The producer-consumer problem is one of the most frequently encountered
problems in multi-threaded programming. While it is not as challenging
as some of the other problems in multi-threaded programming, an
incorrect implementation of it can make a mess of your application:
produced items are left unconsumed, items at the start are skipped,
consumption depends on whether production began earlier or later than
the consumption attempts, and so on. To add to this, you might notice
these anomalies long after they actually happened and, like almost all
multi-threaded programs, this one is hard to debug and reproduce too.
So in this post I thought I would attempt to solve this problem in Java
with the help of Java's awesome java.util.concurrent package and its
classes.
First of all, let us look at the characteristics of the
producer-consumer problem:
- Producer(s) produce items.
- Consumer(s) consume the items produced by the producer(s).
- Producer(s) finish production and let the consumers know that they are done.
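Before looking at my solution, here is a minimal sketch of these three characteristics using a plain `BlockingQueue`. It signals the end of production with a "poison pill" (a sentinel item), which is a different technique from the flag-based shutdown used later in this post. `PoisonPillDemo`, `run` and the sentinel value -1 are hypothetical, and the sketch assumes real items are never negative:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class PoisonPillDemo
{
    // Hypothetical end-of-production marker; assumes real items are never negative.
    private static final int POISON = -1;

    // One producer (the calling thread) feeds itemCount items to consumerCount
    // consumer threads, then signals the end. Returns how many items were consumed.
    static int run(int itemCount, int consumerCount) throws InterruptedException
    {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicInteger consumed = new AtomicInteger();
        Thread[] consumers = new Thread[consumerCount];
        for (int c = 0; c < consumerCount; c++)
        {
            consumers[c] = new Thread(() -> {
                try
                {
                    while (true)
                    {
                        int item = queue.take(); // blocks until an item is available
                        if (item == POISON)
                        {
                            return; // the producer is done
                        }
                        consumed.incrementAndGet();
                    }
                }
                catch (InterruptedException ie)
                {
                    Thread.currentThread().interrupt();
                }
            });
            consumers[c].start();
        }
        for (int i = 0; i < itemCount; i++)
        {
            queue.put(i); // produce
        }
        for (int c = 0; c < consumerCount; c++)
        {
            queue.put(POISON); // one end marker per consumer
        }
        for (Thread t : consumers)
        {
            t.join();
        }
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException
    {
        System.out.println("consumed " + run(100, 4) + " items");
    }
}
```

Because the queue is FIFO and every pill is queued after the last real item, a consumer only reaches a pill once all real items have been taken, so `run(100, 4)` returns 100.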
Note that in this producer-consumer problem the producer runs on a
different thread from the consumers. This setup makes sense in two
cases:
- The consumption of each produced item is independent of the other items.
- The time to process an item is larger than the time to produce it.
The term "larger" in the second point is used a bit loosely. Consider
the case where the producer reads a line from a file and the
"consumption and processing" step just logs the line back to a file in
a special format: using a producer-consumer solution there would be
over-engineering. However, if for each of those lines the "consumption
and processing" step is to make an HTTP GET/POST request to a web server
and then dump the result somewhere, we should opt for a
producer-consumer solution. Here I am assuming that all the data needed
for the GET/POST is available in the line (item) itself and that we do
not depend on previous or next lines.
Now let us take a look at the characteristics of the producer-consumer solution that I have posted below:
- There can be multiple producers.
- There will be multiple consumers.
- Once the production of new items is done, the producer(s) let the consumers know, so that the consumers exit after the last item has been consumed and processed.
It is interesting to note that, to solve this problem at a generic
level, we need to address only the consumer side and not the producer
side. This is because items might be produced at any time, and there is
very little we can do in a generic way to control their production. We
can, however, control the consumer's behaviour while it accepts items
from the producer(s). Having laid out the rules, let us take a look at
the consumer contract:
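```java
package com.maximus.consumer;

public interface Consumer
{
    // Queues an item for processing; returns false if it was not accepted.
    public boolean consume(Item j);

    // Signals that production is over; already-queued items are still processed.
    public void finishConsumption();
}
```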
Here the consumer can be shared between multiple producers of similar
items; by similar items I mean producers that produce objects of type
"Item". The definition of Item is as follows:
```java
package com.maximus.consumer;

public interface Item
{
    public void process();
}
```
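Since Item has a single abstract method, on Java 8 or later a producer does not need a named class for every kind of job: a lambda works too. A small sketch follows; `LambdaItemDemo` and `processAll` are hypothetical names, and the Item interface is repeated (without the package declaration) so that the block compiles on its own:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Same shape as the Item interface above: one abstract method, so on
// Java 8+ it can be implemented with a lambda instead of a named class.
interface Item
{
    void process();
}

class LambdaItemDemo
{
    // Wraps each line in an Item, then processes the items in order.
    static List<String> processAll(List<String> lines)
    {
        List<String> log = new ArrayList<>();
        List<Item> items = new ArrayList<>();
        for (String line : lines)
        {
            // Each lambda captures its own line; nothing happens until process().
            items.add(() -> log.add("processed: " + line));
        }
        for (Item item : items)
        {
            item.process();
        }
        return log;
    }

    public static void main(String[] args)
    {
        System.out.println(processAll(Arrays.asList("a", "b")));
        // [processed: a, processed: b]
    }
}
```

In the real consumer the lambdas would be passed to consume() and run on a worker thread, so anything they touch (like the log list here) would need to be thread-safe.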
Now we take a look at an implementation of the Consumer interface:
```java
package com.maximus.consumer;

import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;

public class ConsumerImpl implements Consumer
{
    // Unbounded queue shared by all ItemProcessor workers.
    private final BlockingQueue<Item> itemQueue =
        new LinkedBlockingQueue<Item>();
    private final ExecutorService executorService =
        Executors.newCachedThreadPool();
    private final List<ItemProcessor> jobList =
        new LinkedList<ItemProcessor>();
    // Guarded by "this": consume() and finishConsumption() are synchronized
    // so that no item can be queued after the workers were told to stop.
    private boolean shutdownCalled = false;

    public ConsumerImpl(int poolSize)
    {
        // Start poolSize workers that all take items from the same queue.
        for (int i = 0; i < poolSize; i++)
        {
            ItemProcessor jobThread = new ItemProcessor(itemQueue);
            jobList.add(jobThread);
            executorService.submit(jobThread);
        }
    }

    public synchronized boolean consume(Item j)
    {
        if (shutdownCalled)
        {
            return false; // production was already declared finished
        }
        try
        {
            // Does not wait for space: the queue is unbounded.
            itemQueue.put(j);
        }
        catch (InterruptedException ie)
        {
            Thread.currentThread().interrupt();
            return false;
        }
        return true;
    }

    public synchronized void finishConsumption()
    {
        shutdownCalled = true;
        // Workers keep running until the queue is drained (see ItemProcessor).
        for (ItemProcessor j : jobList)
        {
            j.cancelExecution();
        }
        // Accept no new tasks; the running workers finish normally.
        executorService.shutdown();
    }
}
```
Now the only point of interest is the ItemProcessor that the consumer
internally uses to process the incoming items. ItemProcessor is coded as
follows:
```java
package com.maximus.consumer;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class ItemProcessor implements Runnable
{
    private final BlockingQueue<Item> jobQueue;
    // Cleared by cancelExecution(); volatile so the worker thread sees the change.
    private volatile boolean keepProcessing;

    public ItemProcessor(BlockingQueue<Item> queue)
    {
        jobQueue = queue;
        keepProcessing = true;
    }

    public void run()
    {
        // Stop only when told to stop AND nothing is left in the queue.
        while (keepProcessing || !jobQueue.isEmpty())
        {
            try
            {
                // Wait at most 10 seconds, then re-check the loop condition.
                Item j = jobQueue.poll(10, TimeUnit.SECONDS);
                if (j != null)
                {
                    j.process();
                }
            }
            catch (InterruptedException ie)
            {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    public void cancelExecution()
    {
        this.keepProcessing = false;
    }
}
```
The only tricky part above is the condition in the while loop. It is
written so that consumption continues even after the producer(s) have
finished production and have notified the consumer that production is
finished. The loop therefore ensures that all queued items are consumed
before the threads exit. This matters when producers run faster than
consumers, since items will still be waiting in the queue when
finishConsumption() is called.
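The effect of the `|| !jobQueue.isEmpty()` part is easiest to see in isolation. The following self-contained sketch (the class `DrainDemo` and its `run` method are hypothetical, and `Runnable` stands in for Item) queues a backlog, treats the stop flag as already cleared before a single worker starts, and counts how many items the worker processes with and without the drain condition:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class DrainDemo
{
    // Queues a backlog, then starts one worker whose stop flag is already
    // cleared. Returns how many queued items the worker processed.
    static int run(int backlog, boolean drainOnExit) throws InterruptedException
    {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        AtomicInteger processed = new AtomicInteger();
        for (int i = 0; i < backlog; i++)
        {
            queue.put(processed::incrementAndGet); // producers ran ahead
        }
        // Stands in for cancelExecution() having been called already.
        final boolean keepProcessing = false;
        Thread worker = new Thread(() -> {
            try
            {
                // drainOnExit == true mirrors ItemProcessor's loop condition;
                // false behaves like a naive "while (keepProcessing)" loop.
                while (keepProcessing || (drainOnExit && !queue.isEmpty()))
                {
                    Runnable job = queue.poll(100, TimeUnit.MILLISECONDS);
                    if (job != null)
                    {
                        job.run();
                    }
                }
            }
            catch (InterruptedException ie)
            {
                Thread.currentThread().interrupt();
            }
        });
        worker.start();
        worker.join();
        return processed.get();
    }

    public static void main(String[] args) throws InterruptedException
    {
        System.out.println("naive loop:    " + run(5, false)); // 0
        System.out.println("drain on exit: " + run(5, true));  // 5
    }
}
```

With the drain condition all five queued items are processed; with the naive loop none are, which is exactly the "produced items left unconsumed" symptom mentioned at the start of the post.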
The above consumer is thread-safe and can be shared among multiple
producers, so that each producer may call consumer.consume()
concurrently without worrying about synchronization and other
multi-threading caveats. Producers just need to submit an
implementation of the Item interface whose process() method contains
the logic of how the consumption will be done.
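To illustrate the sharing, here is a self-contained sketch with several producer threads feeding one consumer. `SharedConsumer` is a hypothetical, condensed stand-in for ConsumerImpl and ItemProcessor (a fixed pool and a 100 ms poll timeout instead of 10 seconds), and `MultiProducerDemo` is a hypothetical driver. In this version consume() and finishConsumption() are synchronized, so checking the shutdown flag and queueing the item happen as one step and no item can slip in after the workers have been told to stop. The extra awaitTermination() method, also an assumption, lets the caller block until every queued item has been processed:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

interface Item
{
    void process();
}

// Hypothetical, condensed stand-in for ConsumerImpl + ItemProcessor.
class SharedConsumer
{
    private final BlockingQueue<Item> queue = new LinkedBlockingQueue<>();
    private final ExecutorService pool;
    private volatile boolean keepProcessing = true;
    private boolean shutdownCalled = false; // guarded by "this"

    SharedConsumer(int poolSize)
    {
        pool = Executors.newFixedThreadPool(poolSize);
        for (int i = 0; i < poolSize; i++)
            pool.submit(this::work);
    }

    // synchronized: an item can never be queued after finishConsumption().
    synchronized boolean consume(Item item)
    {
        return !shutdownCalled && queue.offer(item); // unbounded, so offer() succeeds
    }

    synchronized void finishConsumption()
    {
        shutdownCalled = true;
        keepProcessing = false;
        pool.shutdown();
    }

    boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException
    {
        return pool.awaitTermination(timeout, unit);
    }

    // Same loop shape as ItemProcessor.run(), with a shorter poll timeout.
    private void work()
    {
        try
        {
            while (keepProcessing || !queue.isEmpty())
            {
                Item item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item != null)
                {
                    item.process();
                }
            }
        }
        catch (InterruptedException ie)
        {
            Thread.currentThread().interrupt();
        }
    }
}

class MultiProducerDemo
{
    // Starts several producer threads that share one consumer and returns
    // how many items were processed once everything has shut down.
    static int run(int producers, int itemsEach) throws InterruptedException
    {
        SharedConsumer consumer = new SharedConsumer(4);
        AtomicInteger processed = new AtomicInteger();
        Thread[] threads = new Thread[producers];
        for (int p = 0; p < producers; p++)
        {
            threads[p] = new Thread(() -> {
                for (int i = 0; i < itemsEach; i++)
                {
                    consumer.consume(processed::incrementAndGet); // no caller-side locking
                }
            });
            threads[p].start();
        }
        for (Thread t : threads)
        {
            t.join(); // production is finished only when every producer is
        }
        consumer.finishConsumption();
        consumer.awaitTermination(30, TimeUnit.SECONDS);
        return processed.get();
    }
}
```

With four producers submitting 250 items each, `MultiProducerDemo.run(4, 250)` returns 1000 regardless of how the producer threads interleave.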
As a bonus for reading the post, here is a test program that demonstrates how to use the above classes:
```java
package com.maximus.consumer;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileInputStream;
import java.io.InputStreamReader;

public class Test
{
    public static void main(String[] args) throws Exception
    {
        Consumer consumer = new ConsumerImpl(10);
        // try-with-resources closes the file once production is done.
        try (BufferedReader br = new BufferedReader(
                new InputStreamReader(
                    new FileInputStream(new File(args[0])))))
        {
            String line;
            while ((line = br.readLine()) != null)
            {
                System.out.println("Producer producing: " + line);
                consumer.consume(new PrintJob(line));
            }
        }
        // No more items are coming; already-queued items are still processed.
        consumer.finishConsumption();
    }
}

class PrintJob implements Item
{
    private final String line;

    public PrintJob(String s)
    {
        line = s;
    }

    public void process()
    {
        System.out.println(
            Thread.currentThread().getName() + " consuming: " + line);
    }
}
```
The above consumer can be tweaked in a host of different ways to make it
more flexible. We can define what the consumer does when production is
done, and it can be extended to support batch processing, but I leave
that to the reader. Feel free to use it and twist it in whatever way
you want.
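For batch processing, one option (a sketch, not part of the classes above) is `BlockingQueue.drainTo(Collection, int)`, which moves up to a given number of already-queued elements into a collection without waiting. `BatchDrainDemo`, `drainInBatches` and `MAX_BATCH` are hypothetical names, and, purely for the demo, a 100 ms poll timeout with nothing in the queue is treated as the end of production:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchDrainDemo
{
    // Hypothetical batch-size limit; tune it for the real workload.
    static final int MAX_BATCH = 4;

    // Takes items off the queue in batches of at most MAX_BATCH and
    // returns the size of each batch, in order.
    static List<Integer> drainInBatches(BlockingQueue<String> queue) throws InterruptedException
    {
        List<Integer> batchSizes = new ArrayList<>();
        while (true)
        {
            // Wait briefly for the first element of a batch...
            String first = queue.poll(100, TimeUnit.MILLISECONDS);
            if (first == null)
            {
                return batchSizes; // nothing arrived in time: treated as done
            }
            List<String> batch = new ArrayList<>();
            batch.add(first);
            // ...then grab whatever else is already queued, without waiting.
            queue.drainTo(batch, MAX_BATCH - 1);
            // A real consumer would process the whole batch here,
            // e.g. one bulk write instead of one write per item.
            batchSizes.add(batch.size());
        }
    }

    public static void main(String[] args) throws InterruptedException
    {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < 10; i++)
        {
            queue.put("line-" + i);
        }
        System.out.println(drainInBatches(queue)); // [4, 4, 2]
    }
}
```

Taking the first element with a timed poll() keeps the worker from spinning on an empty queue, while drainTo() picks up whatever has accumulated behind it, so batches grow when producers outpace the consumer.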