Wednesday, January 26, 2022

How to Solve Producer Consumer Problem in Java using BlockingQueue [Example]

The Producer-Consumer problem is one of the classic multi-threading problems in computer science and the multi-threading world. It's tricky because it involves inter-thread communication, but it's important because most of the multi-threading problems fit into this category. Because of its importance, it's also known as Producer Consumer design patterns. There are many ways to solve the producer-consumer problem in Java, like you can solve this by using the wait() and notify() method, as discussed here, or you can use the Semaphore to solve this problem. 

In this article, you will learn a third way to solve the producer-consumer problem by using BlockingQueue in Java. It is arguably the simplest way to solve this problem in any programming language because blocking queue data structure not only provides storage but also provides flow control and thread safety, which makes the code really simple. 

Brian Goetz has also explained this key class and pattern in his classic Java Concurrency in the Practice book, a must-read for serious Java developers.




Producer-Consumer Pattern using BlockingQueue

Java provides a built-in blocking queue data structure in java.utill.concurrent package. It was added on JDK with multiple concurrent utilities e.g. CountDownLatch, CyclicBarrier, and Callable and Future classes.

The java.util.concurrent.BlockingQueue is an interface and comes with two ready-made implementations then ArrayLinkedBlockingQueue and LinkedBlockingQueue. As the name suggests, one is backed by an array while the other is backed by a linked list.

In order to solve the producer-consumer problem, we will create two threads that will simulate producer and consumer, and instead of the shared object, we will use the shared BlockingQueue. Our code will be simple, the producer will add an element into the queue and the consumer will remove the element.

BlockingQueue provides a put() method to store the element and a take() method to retrieve the element. Both are blocking methods, which means put() will block if the queue has reached its capacity and there is no place to add a new element. 

Similarly, the take() method will block if the blocking queue is empty. So, you can see that the critical requirement of the producer-consumer pattern is met right there, you don't need to put any thread synchronization code.




Producer-Consumer Example in Java using BlockingQueue

Here is our sample Java program to solve the classical producer-consumer problem using BlockingQueue in Java:
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

/**
 * Producer Consumer Problem solution using BlockingQueue in Java.
 * BlockingQueue not only provide a data structure to store data
 * but also gives you flow control, require for inter thread communication.
 * 
 * @author Javin Paul
 */
public class ProducerConsumerSolution {

    public static void main(String[] args) {
        BlockingQueue<Integer> sharedQ = new LinkedBlockingQueue<Integer>();
        
        Producer p = new Producer(sharedQ);
        Consumer c = new Consumer(sharedQ);
        
        p.start();
        c.start();
    }
}

class Producer extends Thread {
    private BlockingQueue<Integer> sharedQueue;

    public Producer(BlockingQueue<Integer> aQueue) {
        super("PRODUCER");
        this.sharedQueue = aQueue;
    }

    public void run() {
        // no synchronization needed
        for (int i = 0; i < 10; i++) {
            try {
                System.out.println(getName() + " produced " + i);
                sharedQueue.put(i);
                Thread.sleep(200);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }

        }
    }
}

class Consumer extends Thread {
    private BlockingQueue<Integer> sharedQueue;

    public Consumer(BlockingQueue<Integer> aQueue) {
        super("CONSUMER");
        this.sharedQueue = aQueue;
    }

    public void run() {
        try {
            while (true) {
                Integer item = sharedQueue.take();
                System.out.println(getName() + " consumed " + item);
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

Output
PRODUCER produced 0
CONSUMER consumed 0
PRODUCER produced 1
CONSUMER consumed 1
PRODUCER produced 2
CONSUMER consumed 2
PRODUCER produced 3
CONSUMER consumed 3
PRODUCER produced 4
CONSUMER consumed 4
PRODUCER produced 5
CONSUMER consumed 5
PRODUCER produced 6
CONSUMER consumed 6
PRODUCER produced 7
CONSUMER consumed 7
PRODUCER produced 8
CONSUMER consumed 8
PRODUCER produced 9
CONSUMER consumed 9



Explanation of code

If you look at the above code example, you will find that we have created and started two threads and named them Producer and Consumer. The Producer thread executes the code inside the run() method, which adds 10 Integer objects starting from 0.

After adding each element, the Producer thread is sleeping for 200 milliseconds by calling Thread.sleep() method. This gives time to the Consumer thread to consume elements from Queue, that's why our code never blocks.

You can see that our Producer and Consumer threads are working in sync because of Thread.sleep() we have introduced after the put() call. You can further experiment with the code by removing the code to pause the Producer thread or inserting pause on the Consumer thread to create scenarios where Queue is full or empty.


Benefits of using BlockingQueue to solve Producer Consumer
  • Simple code, much more readable
  • less error-prone as you don't have to deal with any external synchronization

Producer Consumer Example using BlockingQueue in Java



That's all about how to solve the producer-consumer problem using BlockingQueue in Java. In production code, you should always use BlockingQueue, using wait() and notify() is neither easy nor desirable given you have better tools available.

Other Java Multithreading Articles you may like
If you like this tutorial and are hungry to learn more about the thread, synchronization, and multi-threading then check out the following articles as well:
  • Difference between notify() and notifyAll() in Java? (answer)
  • The difference between synchronized blocks and methods in Java? (answer)
  • How to do inter-thread communication in Java using wait-notify? (answer)
  • Difference between extends Thread vs implements Runnable in Java? (answer)
  • When to use the volatile variable in Java? (answer)
  • 5 Courses to Learn Java Multithreading in-depth (courses)
  • 10 Java Multithreading and Concurrency Best Practices (article)
  • How to avoid deadlock in Java? (answer)
  • Understanding the flow of data and code in Java program (answer)
  • How to do inter-thread communication in Java using wait-notify? (answer)
  • 10 Tips to become a better Java Developer (tips)
  • The difference between Callable and Runnable in Java? (answer)
  • Difference between volatile, synchronized, and atomic (answer)
  • How to pause a Thread in Java? (solution)
  • Top 50 Multithreading and Concurrency Questions in Java (questions)
  • Top 5 Books to Master Concurrency in Java (books)
  • Executor, Executors, and ExecutorService in Java (answer)
  • ForkJoinPool and Executor Framework in Java(answer)
  • How to join two threads in Java? (answer)
  • What is Happens Before in Java Concurrency? (answer)

Thanks for reading this article so far. If you found this tutorial useful then please share it with your friends and colleagues. If you have any questions or feedback then please drop a note. 

P. S. - Even Joshua Bloch has advised in Effective Java to prefer higher concurrency utilities and libraries instead of writing your own code. Remember, the code is only written once but read numerous times for maintenance, troubleshooting, and support purposes.

9 comments:

  1. So Blocking Queues internally implement wait and notify methods ?

    ReplyDelete
  2. yes.internally blocking queue implement wait and notify methods in un ordered way.

    ReplyDelete
  3. Why is Thread.sleep() necessary here? If you are using a BlockingQueue, it is synchronized by default, and therefore won't block...

    ReplyDelete
  4. How can I set consumer and producer conditions to let say put 20 integers in the que and the consumer takes out a random number between 1 and 4 out of the queue

    ReplyDelete
    Replies
    1. Instead of picking random number its better that you put random number and consumer thread picks from one end. This would result in same effect but simpler code.

      Delete
  5. there are many loopholes in the program above, like what happens in case of exception, in
    consumer while loop is set to true without any condition, this code will never come back to
    main thread, in case of any producer/consumer fails, the program will run in infinite loop...

    ReplyDelete
    Replies
    1. Hello Anonymous, the purpose of this program is to explain producer consumer design pattern and how it work. How one thread produces data and how consumer takes it. For production usage, I recommend using library classes as they are tested for many different cases including the ones you have mentioned. Of course you can also handle those here but its not worth it, you never get the amount of testing coverage libraries enjoys, so better use them on production.

      Delete
    2. import java.util.concurrent.ArrayBlockingQueue;
      import java.util.concurrent.BlockingQueue;

      public class consumeprodue {
      private static BlockingQueue queue = new ArrayBlockingQueue<>(10);
      public static void main(String[] args) {
      Thread p = new Thread() {
      public void run() {
      for (int i = 0; i < 10; i++) {
      try {
      System.out.println(getName() + " produced :" + i);
      queue.put(i);
      Thread.sleep(200);
      } catch (InterruptedException e) {
      e.printStackTrace();
      }
      }

      }

      };
      Thread c = new Thread() {

      public void run() {

      try {
      while (true) {
      System.out.println(getName() + "consume :" + queue.take());
      }

      } catch (InterruptedException e) {

      e.printStackTrace();
      }
      }

      };

      p.start();
      c.start();

      }

      }

      Delete
  6. import java.util.concurrent.ArrayBlockingQueue;
    import java.util.concurrent.BlockingQueue;

    public class consumeprodue {
    private static BlockingQueue queue = new ArrayBlockingQueue<>(10);
    public static void main(String[] args) {
    Thread p = new Thread() {
    public void run() {
    for (int i = 0; i < 10; i++) {
    try {
    System.out.println(getName() + " produced :" + i);
    queue.put(i);
    Thread.sleep(200);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }

    }

    };
    Thread c = new Thread() {

    public void run() {

    try {
    while (true) {
    System.out.println(getName() + "consume :" + queue.take());
    }

    } catch (InterruptedException e) {

    e.printStackTrace();
    }
    }

    };

    p.start();
    c.start();

    }

    }

    ReplyDelete

Feel free to comment, ask questions if you have any doubt.