
Producer-Consumer Problem in Java | Java Thread interview question for Experienced developers

The producer-consumer problem is one of the most frequently encountered problems in multithreaded programming. While it is not as challenging as some other multithreading problems, an incorrect implementation can make a mess of your application: produced items may be left unconsumed, the first items may be skipped, or consumption may depend on whether production started before or after the consumers did. To make matters worse, you might notice these anomalies long after they have happened, and, like almost all multithreaded bugs, they are hard to reproduce and debug.
So in this post I thought I would attempt to solve this problem in Java with the help of Java's awesome java.util.concurrent package and its classes.
First of all, let us see the characteristics of the producer-consumer problem:
  • Producer(s) produce items.
  • Consumer(s) consume the items produced by the producer(s).
  • Producer(s) finish production and let the consumers know that they are done.
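The three roles above map naturally onto a BlockingQueue. As a minimal sketch (not the design used later in this post, which signals the end of production with a flag), here is the common "poison pill" variant: the producer enqueues one sentinel per consumer, and each consumer exits when it takes one. PoisonPillDemo, run and the DONE sentinel are hypothetical names chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class PoisonPillDemo {
    // Hypothetical sentinel. It is a distinct String object compared with ==,
    // so a real item whose text happens to be "DONE" is never mistaken for it.
    private static final String DONE = new String("DONE");

    // Produces every element of 'input' on the calling thread and consumes
    // them on 'consumers' worker threads; returns what the workers consumed.
    static List<String> run(List<String> input, int consumers) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        List<String> consumed = Collections.synchronizedList(new ArrayList<>());
        List<Thread> workers = new ArrayList<>();

        for (int i = 0; i < consumers; i++) {
            Thread worker = new Thread(() -> {
                try {
                    while (true) {
                        String item = queue.take();   // blocks until something arrives
                        if (item == DONE) {
                            return;                   // production is finished
                        }
                        consumed.add(item);           // "consume" the item
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            workers.add(worker);
            worker.start();
        }

        try {
            for (String s : input) {
                queue.put(s);                         // produce
            }
            for (int i = 0; i < consumers; i++) {
                queue.put(DONE);                      // one pill per consumer
            }
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while producing", e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        List<String> out = run(Arrays.asList("a", "b", "c", "d"), 3);
        System.out.println(out.size() + " items consumed");   // prints "4 items consumed"
    }
}
```

The pill works here because the queue is FIFO and the single producer enqueues the pills after its last real item; with several producers, the pills could only be enqueued once all of them had finished.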
Note that in this producer-consumer problem the producer runs on a different thread from the consumers. This setup makes sense in two cases:
  • The consumption of each produced item is independent of the other items.
  • The time to process an item is larger than the time to produce it.
The term "larger" in the second point is used a bit loosely. Suppose the producer reads lines from a file and the "consumption and processing" step merely logs each line, in a special format, back to a file: a producer-consumer solution here would arguably be over-engineering. However, if for each of those lines the "consumption and processing" step is to make an HTTP GET/POST request to a web server and then dump the result somewhere, then we should opt for a producer-consumer solution. Here I am assuming that all the data needed for the GET/POST is available in the line (item) itself and that we do not depend on previous/next lines.
So let us first take a look at the characteristics of the producer-consumer problem solution that I have posted below:
  • There can be multiple producers.
  • There will be multiple consumers.
  • Once the production of new items is done, the producer(s) will let the consumers know, so that the consumers exit after the last item is consumed and processed.
It is interesting to note that, to solve this problem at a generic level, we can address only the consumer side and not the producer side. This is because items might be produced at any time, and there is very little we can do in a generic way to control their production. We can, however, control the consumer's behaviour while it accepts items from the producer(s). Having laid out the rules, let us take a look at the consumer contract:


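    package com.maximus.consumer;

    public interface Consumer
    {
        // Hands an item over for asynchronous processing; returns false if
        // the item was not accepted (e.g. after finishConsumption()).
        public boolean consume(Item j);

        // Signals that production is finished; queued items are still processed.
        public void finishConsumption();
    }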

Here the consumer can be shared between multiple producers of similar items; by similar items I mean producers that produce objects of type "Item". The definition of Item is as follows:


    package com.maximus.consumer;

    public interface Item
    {
        public void process();
    }
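Because Item declares a single abstract method, it is a functional interface, so on Java 8 or later a producer could supply items as lambdas instead of writing a named class such as PrintJob. A minimal sketch, assuming Java 8+; the interface is restated so the example compiles on its own, and LambdaItemDemo and processAll are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Restated from the post so this sketch compiles on its own.
interface Item {
    void process();
}

class LambdaItemDemo {
    // Builds one Item per line with a lambda, then runs each of them in order.
    static List<String> processAll(List<String> lines) {
        List<String> log = new ArrayList<>();
        List<Item> items = new ArrayList<>();
        for (String line : lines) {
            items.add(() -> log.add("processed " + line));   // lambda implements process()
        }
        for (Item item : items) {
            item.process();
        }
        return log;
    }

    public static void main(String[] args) {
        System.out.println(processAll(Arrays.asList("x", "y")));   // [processed x, processed y]
    }
}
```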
Now we take a look at an implementation of the Consumer interface:

    package com.maximus.consumer;

    import java.util.LinkedList;
    import java.util.List;
    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;
    import java.util.concurrent.LinkedBlockingQueue;

    public class ConsumerImpl implements Consumer
    {
        // Unbounded, thread-safe queue shared by all producers and workers.
        private final BlockingQueue<Item> itemQueue =
            new LinkedBlockingQueue<Item>();

        private final ExecutorService executorService =
            Executors.newCachedThreadPool();

        // Keep a handle on every worker so we can tell them to stop later.
        private final List<ItemProcessor> jobList =
            new LinkedList<ItemProcessor>();

        private volatile boolean shutdownCalled = false;

        public ConsumerImpl(int poolSize)
        {
            // Start poolSize workers, all polling the same queue.
            for(int i = 0; i < poolSize; i++)
            {
                ItemProcessor jobThread =
                    new ItemProcessor(itemQueue);

                jobList.add(jobThread);
                executorService.submit(jobThread);
            }
        }

        public boolean consume(Item j)
        {
            // Reject new items once shutdown has been requested.
            if(!shutdownCalled)
            {
                try
                {
                    itemQueue.put(j);
                }
                catch(InterruptedException ie)
                {
                    // Restore the interrupt flag for the caller.
                    Thread.currentThread().interrupt();
                    return false;
                }
                return true;
            }
            else
            {
                return false;
            }
        }

        public void finishConsumption()
        {
            shutdownCalled = true;

            // Workers keep going until the queue is drained, then exit.
            for(ItemProcessor j : jobList)
            {
                j.cancelExecution();
            }

            // Accept no new tasks; running workers are allowed to finish.
            executorService.shutdown();
        }
    }
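finishConsumption() above returns immediately, so the caller cannot tell when the last item has actually been processed. One possible variant, sketched below under the assumption that items are plain Runnables, stops accepting items and then waits with ExecutorService.awaitTermination. WaitingConsumer, workerLoop and the timeout parameters are hypothetical, not part of the original classes:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of ConsumerImpl whose shutdown waits for the workers.
class WaitingConsumer {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final ExecutorService pool;
    private volatile boolean accepting = true;

    WaitingConsumer(int poolSize) {
        pool = Executors.newFixedThreadPool(poolSize);   // exactly poolSize workers
        for (int i = 0; i < poolSize; i++) {
            pool.execute(this::workerLoop);
        }
    }

    // Same exit rule as ItemProcessor: stop only when no more items will be
    // accepted AND the queue is empty.
    private void workerLoop() {
        while (accepting || !queue.isEmpty()) {
            try {
                Runnable item = queue.poll(100, TimeUnit.MILLISECONDS);
                if (item != null) {
                    item.run();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }

    // Like ConsumerImpl.consume, the check and the enqueue are not atomic with
    // finishConsumption, so producers must be done before shutdown is called.
    boolean consume(Runnable item) {
        return accepting && queue.offer(item);   // offer never fails on an unbounded queue
    }

    // Stops accepting items, then waits up to the timeout for the workers.
    // Returns true if all workers exited (queue drained) in time.
    boolean finishConsumption(long timeout, TimeUnit unit) {
        accepting = false;
        pool.shutdown();
        try {
            return pool.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger processed = new AtomicInteger();
        WaitingConsumer consumer = new WaitingConsumer(4);
        for (int i = 0; i < 100; i++) {
            consumer.consume(processed::incrementAndGet);
        }
        boolean finished = consumer.finishConsumption(5, TimeUnit.SECONDS);
        System.out.println(finished + ", processed " + processed.get());
    }
}
```

A fixed pool fits here because the number of workers is known up front, and the short 100 ms poll lets idle workers notice the shutdown quickly instead of waiting out a long timeout.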

Now the only point of interest is the ItemProcessor that the consumer internally uses to process the incoming items. ItemProcessor is coded as follows:

    package com.maximus.consumer;

    import java.util.concurrent.BlockingQueue;
    import java.util.concurrent.TimeUnit;

    public class ItemProcessor implements Runnable
    {
        private final BlockingQueue<Item> jobQueue;

        // Cleared by cancelExecution(); volatile so the worker thread sees it.
        private volatile boolean keepProcessing;

        public ItemProcessor(BlockingQueue<Item> queue)
        {
            jobQueue = queue;
            keepProcessing = true;
        }

        public void run()
        {
            // Keep running while production is ongoing OR items remain queued.
            while(keepProcessing || !jobQueue.isEmpty())
            {
                try
                {
                    // Wait at most 10 seconds so a cleared flag is noticed
                    // even when no more items arrive.
                    Item j = jobQueue.poll(10, TimeUnit.SECONDS);

                    if(j != null)
                    {
                        j.process();
                    }
                }
                catch(InterruptedException ie)
                {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
        }

        public void cancelExecution()
        {
            this.keepProcessing = false;
        }
    }
The only tricky part above is the condition in the while loop. It is written so that consumption continues even after the producer(s) have finished production and have notified the consumer: a worker exits only when it has been cancelled and the queue is empty. This ensures that all items are consumed before the threads exit, which matters when producers run faster than consumers and items are still queued at shutdown. Note that after cancellation an idle worker may take up to the 10-second poll timeout to notice and exit.
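To see why the second half of the condition matters, the sketch below runs a single worker loop on the calling thread after the flag has already been cleared, with a few items still queued. It is a deliberately simplified, single-threaded model of ItemProcessor (DrainConditionDemo and runWorker are hypothetical names) that uses a non-blocking poll() because no other thread adds items:

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

class DrainConditionDemo {
    // Simulates one worker whose flag was already cleared (production is over)
    // while 'queuedItems' items are still waiting. Returns how many it processed.
    static int runWorker(int queuedItems, boolean drainQueue) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        for (int i = 0; i < queuedItems; i++) {
            queue.add(i);
        }
        boolean keepProcessing = false;   // cancelExecution() has already been called
        int processed = 0;

        // drainQueue == true mirrors ItemProcessor's condition;
        // false shows what a plain "while (keepProcessing)" loop would do.
        while (drainQueue ? (keepProcessing || !queue.isEmpty()) : keepProcessing) {
            Integer item = queue.poll();  // non-blocking is fine: single thread here
            if (item != null) {
                processed++;
            }
        }
        return processed;
    }

    public static void main(String[] args) {
        System.out.println("with drain condition: " + runWorker(5, true));   // 5
        System.out.println("flag only: " + runWorker(5, false));             // 0
    }
}
```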

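The above consumer is thread-safe and can be shared by multiple producers, so each producer may call consumer.consume() concurrently without worrying about synchronization and other multithreading caveats. One caveat remains: call finishConsumption() only after every producer has stopped calling consume(), because an item enqueued concurrently with shutdown could arrive after all workers have exited and would never be processed. Producers just need to submit an implementation of the Item interface whose process() method contains the logic of how the consumption will be done.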
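The sketch below shows several producer threads sharing one queue without any extra locking, together with a shutdown order that avoids losing items: join every producer first, then signal the end of production. It mirrors the ItemProcessor exit rule but uses a single AtomicBoolean instead of per-worker flags; MultiProducerDemo and its parameters are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

class MultiProducerDemo {
    // Returns how many items the consumers processed.
    static int run(int producers, int itemsPerProducer, int consumers) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>();
        AtomicBoolean productionDone = new AtomicBoolean(false);
        AtomicInteger consumed = new AtomicInteger();

        ExecutorService consumerPool = Executors.newFixedThreadPool(consumers);
        for (int c = 0; c < consumers; c++) {
            consumerPool.execute(() -> {
                // Same drain-then-exit rule as ItemProcessor.
                while (!productionDone.get() || !queue.isEmpty()) {
                    try {
                        if (queue.poll(50, TimeUnit.MILLISECONDS) != null) {
                            consumed.incrementAndGet();
                        }
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
        }

        // Several producers share the queue with no extra locking.
        List<Thread> producerThreads = new ArrayList<>();
        for (int p = 0; p < producers; p++) {
            Thread t = new Thread(() -> {
                for (int i = 0; i < itemsPerProducer; i++) {
                    queue.add(i);              // never blocks: the queue is unbounded
                }
            });
            producerThreads.add(t);
            t.start();
        }

        try {
            // Join every producer BEFORE signalling the end of production,
            // so no item can be enqueued after the consumers have exited.
            for (Thread t : producerThreads) {
                t.join();
            }
            productionDone.set(true);
            consumerPool.shutdown();
            consumerPool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return consumed.get();
    }

    public static void main(String[] args) {
        System.out.println(run(4, 250, 3));
    }
}
```

With these arguments, run(4, 250, 3) should return 1000 (4 producers x 250 items each).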

As a bonus for reading the post I put forward a test program that demonstrates how to use the above classes:



    package com.maximus.consumer;

    import java.io.BufferedReader;
    import java.io.FileReader;

    public class Test
    {
        public static void main(String[] args) throws Exception
        {
            Consumer consumer = new ConsumerImpl(10);

            // try-with-resources closes the reader even if reading fails.
            try(BufferedReader br = new BufferedReader(new FileReader(args[0])))
            {
                String line;

                // The main thread acts as the (single) producer.
                while((line = br.readLine()) != null)
                {
                    System.out.println("Producer producing: " + line);
                    consumer.consume(new PrintJob(line));
                }
            }

            // All items submitted; workers drain the queue and then exit.
            consumer.finishConsumption();
        }
    }

    class PrintJob implements Item
    {
        private final String line;

        public PrintJob(String s)
        {
            line = s;
        }

        public void process()
        {
            System.out.println(
                Thread.currentThread().getName() + " consuming: " + line);
        }
    }
The above consumer can be tweaked in a host of ways to make it more flexible. We can define what the consumer does when production is done, or adapt it to allow batch processing, but I leave that to the reader. Feel free to use it and twist it in whatever way you want.
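For the batch-processing tweak, one option (an assumption, not something the post implements) is to wait for the first item with poll and then grab whatever else is already queued with BlockingQueue.drainTo(Collection, int). A minimal, single-threaded sketch; BatchDrainDemo, nextBatch and batchSizes are hypothetical names:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class BatchDrainDemo {
    // Waits up to 'timeoutMillis' for the first item, then takes whatever else is
    // already queued, up to 'maxBatch' items in total (maxBatch must be >= 1).
    // An empty list means the wait timed out.
    static <T> List<T> nextBatch(BlockingQueue<T> queue, int maxBatch, long timeoutMillis) {
        List<T> batch = new ArrayList<>(maxBatch);
        try {
            T first = queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                queue.drainTo(batch, maxBatch - 1);   // non-blocking bulk removal
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return batch;
    }

    // Single-threaded demo: queue all items, then pull batches until none are left.
    static List<Integer> batchSizes(List<String> items, int maxBatch) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>(items);
        List<Integer> sizes = new ArrayList<>();
        while (true) {
            List<String> batch = nextBatch(queue, maxBatch, 10);
            if (batch.isEmpty()) {
                break;
            }
            sizes.add(batch.size());   // a real consumer would process the batch here
        }
        return sizes;
    }

    public static void main(String[] args) {
        List<String> items = Arrays.asList("a", "b", "c", "d", "e", "f", "g");
        System.out.println(batchSizes(items, 3));   // [3, 3, 1]
    }
}
```

In a real consumer, each worker would call nextBatch in its loop and process the whole list at once.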

Thread.State in Java? BLOCKED vs WAITING | What is Thread.State in Java? What's it used for?

Thread.State - This is a static nested class of the Thread class (read more about nested classes in the article "Nested Classes & Inner Classes in Java"). It is one of the additions of Java 5, and it actually extends the abstract class Enum, which is the common base class of all Java language enumeration types; that is, Thread.State is an enumeration type.
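Since Thread.State is an ordinary enum, the standard enum machinery works on it. A small sketch (ThreadStateEnumDemo is a hypothetical name) that lists the constants and checks where the type is declared:

```java
import java.util.Arrays;

class ThreadStateEnumDemo {
    static String constants() {
        // values() is generated for every enum, Thread.State included.
        return Arrays.toString(Thread.State.values());
    }

    public static void main(String[] args) {
        System.out.println(constants());
        // [NEW, RUNNABLE, BLOCKED, WAITING, TIMED_WAITING, TERMINATED]
        System.out.println(Thread.State.class.isEnum());                       // true
        System.out.println(Enum.class.isAssignableFrom(Thread.State.class));   // true
        System.out.println(Thread.State.class.getEnclosingClass().getName());  // java.lang.Thread
        System.out.println(Thread.State.valueOf("WAITING").ordinal());         // 3
    }
}
```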

The Thread.State enumeration contains the possible states of a Java thread in the JVM. These are virtual machine states and do not reflect Operating System thread states. The possible values of Thread.State are:

    NEW - this state represents a new thread which is not yet started.
    RUNNABLE - this state represents a thread that is executing in the JVM. Executing in the JVM doesn't mean that the thread is always executing in the OS as well - it may be waiting for a resource from the Operating System, such as the processor, while in this state.
    BLOCKED - this state represents a thread that is blocked waiting for a monitor lock, either to enter a synchronized block/method or to re-enter one after returning from Object.wait (for example, after being notified).
    WAITING - this state represents a thread that is waiting indefinitely for some other thread to perform a particular action. A thread gets into this state by calling Object.wait (without timeout), Thread.join (without timeout), or LockSupport.park.
    TIMED_WAITING - this state represents a thread that waits for at most a specified time. A thread gets into this state by calling one of these methods: Thread.sleep, Object.wait (with timeout), Thread.join (with timeout), LockSupport.parkNanos, LockSupport.parkUntil.
    TERMINATED - this state represents a thread that has completed its execution, either by returning normally from its run() method or because an exception propagated out of run() and terminated the thread.
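Some of these states are easy to observe with Thread.getState(). The sketch below, a hypothetical demo rather than anything from the JDK, captures a thread as NEW, then TIMED_WAITING while it is inside Thread.sleep, then TERMINATED after join(). Because states change asynchronously, it polls with a small helper (awaitState) instead of assuming a thread has reached a state immediately; the getState() Javadoc itself says the method is meant for monitoring, not for synchronization control.

```java
import java.util.ArrayList;
import java.util.List;

class ThreadLifecycleDemo {
    // Polls t.getState() until it equals 'expected' or the timeout passes.
    static boolean awaitState(Thread t, Thread.State expected, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (t.getState() != expected) {
            if (System.nanoTime() - deadline > 0) {
                return false;
            }
            Thread.yield();
        }
        return true;
    }

    static List<Thread.State> observe() {
        Thread sleeper = new Thread(() -> {
            try {
                Thread.sleep(10_000);              // TIMED_WAITING while asleep
            } catch (InterruptedException e) {
                // Woken early by interrupt() below; simply let run() return.
            }
        });

        List<Thread.State> seen = new ArrayList<>();
        seen.add(sleeper.getState());              // NEW: created but not started
        sleeper.start();
        awaitState(sleeper, Thread.State.TIMED_WAITING, 5_000);
        seen.add(sleeper.getState());              // TIMED_WAITING inside Thread.sleep
        sleeper.interrupt();                       // cut the sleep short
        try {
            sleeper.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        seen.add(sleeper.getState());              // TERMINATED: run() has returned
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe());             // [NEW, TIMED_WAITING, TERMINATED]
    }
}
```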

Difference between BLOCKED state and WAITING / TIMED_WAITING states?


When a thread calls Object.wait on an object, it releases the monitor of that object (only that one; any other monitors it holds stay locked) and is put into the WAITING state (or TIMED_WAITING if it calls one of the timeout versions of wait). When another thread calls notify() or notifyAll() on the same object, the waiting ends and the thread starts trying to re-acquire that object's monitor, which it must hold again before wait returns. At any moment several threads may be trying to re-acquire (or acquire for the first time) the same monitor. Only one of them, selected by the JVM, is granted the monitor, and all the others are in the BLOCKED state until it becomes available. Got the difference? WAITING means waiting for an action by another thread, such as a notification; BLOCKED means waiting for a monitor lock.
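The sketch below makes the difference visible. A waiter thread calls lock.wait() and is reported as WAITING; while the calling thread holds the monitor, a second thread trying to enter synchronized (lock) is reported as BLOCKED; after notifyAll(), the waiter must re-acquire the monitor, which the Thread.State Javadoc also classifies as BLOCKED. How promptly a particular JVM reports that last transition is an implementation detail (HotSpot typically shows BLOCKED), so treat the third entry as illustrative. BlockedVsWaitingDemo and awaitState are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

class BlockedVsWaitingDemo {
    private final Object lock = new Object();
    private boolean released = false;              // guarded by 'lock'

    // Polls t.getState() until it equals 'expected' or the timeout passes.
    static boolean awaitState(Thread t, Thread.State expected, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (t.getState() != expected) {
            if (System.nanoTime() - deadline > 0) {
                return false;
            }
            Thread.yield();
        }
        return true;
    }

    List<Thread.State> observe() {
        Thread waiter = new Thread(() -> {
            synchronized (lock) {
                while (!released) {                // re-check guards against spurious wakeups
                    try {
                        lock.wait();               // gives up 'lock' until notified
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            }
        });
        Thread blocker = new Thread(() -> {
            synchronized (lock) {
                // Empty on purpose: this thread only needs to try to enter the monitor.
            }
        });

        List<Thread.State> seen = new ArrayList<>();
        waiter.start();
        awaitState(waiter, Thread.State.WAITING, 5_000);
        seen.add(waiter.getState());               // WAITING inside lock.wait()

        synchronized (lock) {                      // the calling thread now owns the monitor
            blocker.start();
            awaitState(blocker, Thread.State.BLOCKED, 5_000);
            seen.add(blocker.getState());          // BLOCKED: the monitor is taken

            released = true;
            lock.notifyAll();                      // waiter leaves the wait set...
            awaitState(waiter, Thread.State.BLOCKED, 5_000);
            seen.add(waiter.getState());           // ...but must re-acquire 'lock' first
        }                                          // monitor released here

        try {
            waiter.join();
            blocker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(new BlockedVsWaitingDemo().observe());
    }
}
```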

Difference between WAITING and TIMED_WAITING states?

The difference between the two is quite obvious. A thread in the TIMED_WAITING state waits at most for the specified timeout period, whereas a thread in the WAITING state keeps waiting indefinitely. For example, if a thread has called Object.wait (without a timeout) to put itself into the WAITING state, it keeps waiting until another thread calls notify() or notifyAll() on the same object, or until it is interrupted. Similarly, if a thread has put itself into the WAITING state by calling Thread.join, it keeps waiting until the specified thread terminates.

We can easily see that a thread in the WAITING state always depends on an action performed by some other thread, whereas a thread in TIMED_WAITING does not depend entirely on another thread, because its wait ends automatically once the timeout period expires.
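The same contrast shows up with Thread.join: a thread calling join() on a still-running thread is reported as WAITING, while one calling join(millis) is reported as TIMED_WAITING. A sketch (JoinStatesDemo and its helpers are hypothetical names), where the target thread just sleeps until it is interrupted:

```java
import java.util.ArrayList;
import java.util.List;

class JoinStatesDemo {
    // Polls t.getState() until it equals 'expected' or the timeout passes.
    static boolean awaitState(Thread t, Thread.State expected, long timeoutMillis) {
        long deadline = System.nanoTime() + timeoutMillis * 1_000_000L;
        while (t.getState() != expected) {
            if (System.nanoTime() - deadline > 0) {
                return false;
            }
            Thread.yield();
        }
        return true;
    }

    static void joinQuietly(Thread t) {                // untimed: Thread.join()
        try {
            t.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static void joinQuietly(Thread t, long millis) {   // timed: Thread.join(millis)
        try {
            t.join(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<Thread.State> observe() {
        Thread target = new Thread(() -> {
            try {
                Thread.sleep(60_000);                  // keeps both joiners waiting
            } catch (InterruptedException e) {
                // Interrupted by observe() so the demo finishes quickly.
            }
        });
        Thread untimed = new Thread(() -> joinQuietly(target));
        Thread timed = new Thread(() -> joinQuietly(target, 30_000));

        target.start();
        untimed.start();
        timed.start();
        awaitState(untimed, Thread.State.WAITING, 5_000);
        awaitState(timed, Thread.State.TIMED_WAITING, 5_000);

        List<Thread.State> seen = new ArrayList<>();
        seen.add(untimed.getState());                  // WAITING: no timeout
        seen.add(timed.getState());                    // TIMED_WAITING: bounded wait

        target.interrupt();                            // target ends, so both joins return
        joinQuietly(untimed);
        joinQuietly(timed);
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(observe());                 // [WAITING, TIMED_WAITING]
    }
}
```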