05 How Many Ways to Implement Producer-Consumer Patterns #

In this lesson, we mainly learn how to use wait/notify/Condition/BlockingQueue to implement the producer-consumer pattern.

Producer-Consumer Pattern #

Let’s first look at what the producer-consumer pattern is. It is a very common design pattern, widely used for decoupling, message queues, and similar scenarios. In the real world, we call the side that produces goods the producer and the side that consumes them the consumer. Sometimes the producer turns out goods faster than the consumer can use them, which is commonly known as “overcapacity”. Or there may be several producers and several consumers all working at once and getting in each other’s way. How can they cooperate better? An intermediary is needed between the producers and the consumers to coordinate them. That is how the producer-consumer pattern came about.

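[Figure: producer threads (1) and consumer threads (2) exchanging data through a blocking queue, with notifications (3, 4) passing between them]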

Using the producer-consumer pattern usually requires adding a blocking queue as a medium between the two sides, which acts as a buffer. The overall design is shown in the diagram above. The top is the blocking queue, 1 on the right side represents the producer thread, which puts data into the blocking queue after producing it, and 2 on the left side represents the consumer thread, which gets data from the blocking queue. Numbers 3 and 4 in the middle represent the communication between the producer and the consumer. A thread can block on the queue in two cases, a producer when the queue is full and a consumer when it is empty, so the blocked thread needs to be awakened at the appropriate time.

So when should a blocked thread be awakened? There are two situations. The first situation is when the consumer sees that the blocking queue is empty and starts to wait. At this time, once the producer puts data into the queue, it will notify all consumers and wake up the blocked consumer thread. The other situation is when the producer finds that the queue is already full and is blocked. Once the consumer gets the data, it means that there is one empty position in the queue, and the consumer will notify all the blocked producers to produce. This is a brief introduction to the producer-consumer pattern.
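The two wake-up situations can be sketched with the JDK class ArrayBlockingQueue. This is only a minimal illustration under stated assumptions: the class name WakeUpDemo, its method names, the capacity of 1, and the 100 ms sleeps are all chosen for the demo and are not part of the lesson's own code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

class WakeUpDemo {

    // Case 1: the consumer blocks on an empty queue until a producer adds data.
    static String consumerWaitsForData() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        String[] received = new String[1];
        Thread consumer = new Thread(() -> {
            try {
                received[0] = queue.take(); // blocks while the queue is empty
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        consumer.start();
        Thread.sleep(100);   // give the consumer time to start waiting
        queue.put("item");   // adding data wakes the waiting consumer
        consumer.join();     // join() makes the consumer's write visible here
        return received[0];
    }

    // Case 2: the producer blocks on a full queue until a consumer frees a slot.
    static int producerWaitsForSpace() throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        queue.put("first");  // the queue is now full
        Thread producer = new Thread(() -> {
            try {
                queue.put("second"); // blocks while the queue is full
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        Thread.sleep(100);   // give the producer time to start waiting
        queue.take();        // freeing a slot wakes the waiting producer
        producer.join();
        return queue.size(); // "second" is in the queue once the producer finishes
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(consumerWaitsForData());  // prints "item"
        System.out.println(producerWaitsForSpace()); // prints 1
    }
}
```

The sleeps only make it likely that the thread is already waiting when the other side acts; the results are the same either way, because take() cannot return before data exists and put() cannot succeed before a slot is free.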

How to Use BlockingQueue to Implement the Producer-Consumer Pattern #

Next, let’s see how to use wait/notify/Condition/BlockingQueue to implement the producer-consumer pattern. Let’s start with the simplest BlockingQueue:

public static void main(String[] args) {
    // Bounded buffer shared by all producers and consumers
    BlockingQueue<Object> queue = new ArrayBlockingQueue<>(10);

    Runnable producer = () -> {
        try {
            while (true) {
                queue.put(new Object()); // blocks while the queue is full
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
        }
    };
    new Thread(producer).start();
    new Thread(producer).start();

    Runnable consumer = () -> {
        try {
            while (true) {
                queue.take(); // blocks while the queue is empty
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the flag and exit
        }
    };
    new Thread(consumer).start();
    new Thread(consumer).start();
}

As shown in the code, we first create a BlockingQueue of type ArrayBlockingQueue (both from java.util.concurrent) named queue and set its capacity to 10. Then we create a simple producer whose while(true) loop calls queue.put() to add data to the queue, and start two producer threads. The consumer is just as simple: its while(true) loop calls queue.take() to consume data, and two consumer threads are created and started. Because put() and take() can throw InterruptedException, each loop sits inside a try/catch that restores the interrupt flag and lets the thread exit; beyond that, the code is kept minimal to highlight the design. This is the code for implementing the producer-consumer pattern using BlockingQueue. Although the code is very simple, ArrayBlockingQueue does a lot of work behind the scenes: it blocks a producer thread when the queue is full and wakes it once space is available, and it blocks a consumer thread when the queue is empty and wakes it once data arrives.
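Since the loops above never terminate, it can help to see a bounded run whose result can be checked. The following is a hedged sketch, not the lesson's own code: the class name BoundedRunDemo, the run method, and its parameters are hypothetical names introduced for illustration. It starts equal numbers of producers and consumers, each handling a fixed number of items, and counts what was consumed.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicInteger;

class BoundedRunDemo {

    // Starts `pairs` producers and `pairs` consumers; each thread handles
    // `itemsPerThread` items. Returns the total number of items consumed.
    static int run(int pairs, int itemsPerThread) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(10);
        AtomicInteger consumed = new AtomicInteger();
        List<Thread> threads = new ArrayList<>();

        for (int p = 0; p < pairs; p++) {
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerThread; i++) {
                        queue.put(i); // blocks while the queue is full
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
            threads.add(new Thread(() -> {
                try {
                    for (int i = 0; i < itemsPerThread; i++) {
                        queue.take(); // blocks while the queue is empty
                        consumed.incrementAndGet();
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }));
        }
        for (Thread t : threads) {
            t.start();
        }
        for (Thread t : threads) {
            t.join(); // wait until every producer and consumer has finished
        }
        return consumed.get();
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(run(2, 1000)); // prints 2000
    }
}
```

The queue holds at most 10 items, so the producers are repeatedly blocked and released during the run, yet every item produced is consumed exactly once.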

How to Use Condition to Implement the Producer-Consumer Pattern #

Implementing the producer-consumer pattern with BlockingQueue looks simple, but much of the work is hidden inside the queue. Having mastered this method, we should also understand the more involved implementations underneath it. Next, let’s see how to use Condition to implement the producer-consumer pattern. Its principle is very similar to that of BlockingQueue; in effect, we implement a simplified BlockingQueue ourselves:

import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class MyBlockingQueueForCondition {

    private Queue<Object> queue;
    private int max = 16;
    private ReentrantLock lock = new ReentrantLock();
    private Condition notEmpty = lock.newCondition();
    private Condition notFull = lock.newCondition();

    public MyBlockingQueueForCondition(int size) {
        this.max = size;
        queue = new LinkedList<>();
    }

    public void put(Object o) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == max) {
                notFull.await();   // full: release the lock and wait for space
            }
            queue.add(o);
            notEmpty.signalAll();  // wake consumers waiting for data
        } finally {
            lock.unlock();
        }
    }

    public Object take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == 0) {
                notEmpty.await();  // empty: release the lock and wait for data
            }
            Object item = queue.remove();
            notFull.signalAll();   // wake producers waiting for space
            return item;
        } finally {
            lock.unlock();
        }
    }
}

As shown in the code, we first define a queue variable queue with a default maximum capacity of 16, which the constructor can override. Then, we define a Lock object of type ReentrantLock, and create two Conditions based on this lock: notEmpty, which represents the condition that the queue is not empty, and notFull, which represents the condition that the queue is not full. Finally, we declare two core methods: put and take.

Because the producer-consumer pattern usually involves multiple threads, it requires synchronization to ensure thread safety. In the put method, we first acquire the lock with lock.lock(). Then, in a while loop, we check whether the queue is full. If it is full, we call notFull.await() to block the producer thread and release the lock. If it is not full, we add data to the queue and call notEmpty.signalAll() to wake up all waiting consumers. Finally, we release the lock with lock.unlock() in the finally block. Putting the unlock call in a finally block is a basic rule: it guarantees that the lock is released even if an exception is thrown; otherwise the lock might never be released.

Next, let’s take a look at the take method. The take method is actually the counterpart of the put method. Similarly, we use a while loop to check whether the queue is empty. If it is empty, the consumer starts to wait. If it is not empty, we retrieve data from the queue and notify the producer that there is space available in the queue. Finally, we unlock the lock.

It is worth noting that the take method checks the queue state with a while loop rather than an if statement. Why is that? Producers and consumers usually run in a multi-threaded environment, so suppose there are two consumer threads. The first consumer tries to retrieve data, finds the queue empty, and starts waiting. Because a waiting thread releases the lock, the second consumer can enter, evaluate if (queue.size() == 0), find the queue empty as well, and start waiting too. If the producer now adds one item, both consumers are awakened, but only one of them can acquire the lock and execute queue.remove; the other stays stuck at the point where it was awakened because it has not yet acquired the lock. When the first thread finishes and releases the lock in the finally block, the second thread acquires the lock and continues. With an if statement it would not check the condition again, and since the queue is empty once more, queue.remove would throw a NoSuchElementException, which is not the behavior we want.

With a while loop, the second thread re-evaluates the condition after it wakes up and reacquires the lock. If queue.size() == 0 still holds, it calls await again, which avoids retrieving null data or throwing an exception.
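The two-consumer scenario can be exercised with a small self-contained sketch. Everything named here is an assumption made for illustration: WhileGuardDemo, SketchQueue, and consumeWithTwoConsumers are hypothetical, and SketchQueue is a stripped-down Condition-based queue with a capacity of 1 so that the consumers compete for nearly every item.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

class WhileGuardDemo {

    // Hypothetical minimal queue, written only for this illustration.
    static class SketchQueue {
        private final Queue<Integer> items = new ArrayDeque<>();
        private final int max;
        private final ReentrantLock lock = new ReentrantLock();
        private final Condition notEmpty = lock.newCondition();
        private final Condition notFull = lock.newCondition();

        SketchQueue(int max) {
            this.max = max;
        }

        void put(int value) throws InterruptedException {
            lock.lock();
            try {
                while (items.size() == max) {
                    notFull.await();
                }
                items.add(value);
                notEmpty.signalAll();
            } finally {
                lock.unlock();
            }
        }

        int take() throws InterruptedException {
            lock.lock();
            try {
                // Re-checked after every wake-up: another consumer may have
                // emptied the queue before this thread reacquired the lock.
                while (items.isEmpty()) {
                    notEmpty.await();
                }
                int value = items.remove();
                notFull.signalAll();
                return value;
            } finally {
                lock.unlock();
            }
        }
    }

    // One producer and two consumers. Returns how many items were consumed.
    static int consumeWithTwoConsumers(int itemsPerConsumer) throws InterruptedException {
        SketchQueue queue = new SketchQueue(1);
        AtomicInteger consumed = new AtomicInteger();
        Runnable consumer = () -> {
            try {
                for (int i = 0; i < itemsPerConsumer; i++) {
                    queue.take();
                    consumed.incrementAndGet();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < 2 * itemsPerConsumer; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread first = new Thread(consumer);
        Thread second = new Thread(consumer);
        first.start();
        second.start();
        producer.start();
        first.join();
        second.join();
        producer.join();
        return consumed.get();
    }
}
```

Calling WhileGuardDemo.consumeWithTwoConsumers(1000) returns 2000: no consumer ever calls remove() on an empty queue. If the while in take() were changed to an if, an awakened consumer could reach items.remove() after the other consumer had already taken the only item, and the call would throw NoSuchElementException.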

How to Implement the Producer-Consumer Pattern Using wait and notify #

Lastly, let’s look at how to implement the producer-consumer pattern with wait/notify. The principle is very similar to that of the Condition version; the two are close siblings.

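import java.util.LinkedList;

class MyBlockingQueue {

   private int maxSize;
   private LinkedList<Object> storage;

   public MyBlockingQueue(int size) {
       this.maxSize = size;
       storage = new LinkedList<>();
   }

   // Adds one element, waiting while the queue is full
   public synchronized void put() throws InterruptedException {
       while (storage.size() == maxSize) {
           wait(); // releases the monitor and waits to be notified
       }
       storage.add(new Object());
       notifyAll(); // wakes threads waiting in put() or take()
   }

   // Removes and prints one element, waiting while the queue is empty
   public synchronized void take() throws InterruptedException {
       while (storage.size() == 0) {
           wait(); // releases the monitor and waits to be notified
       }
       System.out.println(storage.remove());
       notifyAll(); // wakes threads waiting in put() or take()
   }

}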

As shown in the code, the most important parts are the take() and put() methods. Let’s first look at put(). It is protected by the synchronized keyword and checks whether the queue is full in a while loop. If the queue is full, it calls wait(), which releases the monitor and blocks the thread; once the queue is not full, it adds data to the queue and wakes up other threads with notifyAll(). Similarly, take() is also synchronized and checks whether the queue is empty. If it is empty, the thread waits; once it is not empty, it retrieves data and wakes up other threads.

Here is the producer-consumer code implemented using this MyBlockingQueue:

/**
 * Description: Implementation of producer-consumer pattern using wait/notify
 */
public class WaitStyle {
   public static void main(String[] args) {
       MyBlockingQueue myBlockingQueue = new MyBlockingQueue(10);
       Producer producer = new Producer(myBlockingQueue);
       Consumer consumer = new Consumer(myBlockingQueue);
       new Thread(producer).start();
       new Thread(consumer).start();
   }
}

class Producer implements Runnable {
   private MyBlockingQueue storage;

   public Producer(MyBlockingQueue storage) {
       this.storage = storage;
   }

   @Override
   public void run() {
       for (int i = 0; i < 100; i++) {
           try {
               storage.put();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}

class Consumer implements Runnable {
   private MyBlockingQueue storage;

   public Consumer(MyBlockingQueue storage) {
       this.storage = storage;
   }

   @Override
   public void run() {
       for (int i = 0; i < 100; i++) {
           try {
               storage.take();
           } catch (InterruptedException e) {
               e.printStackTrace();
           }
       }
   }
}
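The MyBlockingQueue above creates its own objects in put() and only prints them in take(). As a hedged variation on the same wait/notify technique, the sketch below passes caller-supplied items through the queue and returns them from take(). The names ItemQueueDemo, ItemQueue, and transfer are hypothetical and introduced only for this example.

```java
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;

class ItemQueueDemo {

    // Hypothetical variant of the wait/notify queue that carries real items.
    static class ItemQueue<T> {
        private final int maxSize;
        private final LinkedList<T> storage = new LinkedList<>();

        ItemQueue(int maxSize) {
            this.maxSize = maxSize;
        }

        synchronized void put(T item) throws InterruptedException {
            while (storage.size() == maxSize) {
                wait(); // full: release the monitor and wait for space
            }
            storage.add(item);
            notifyAll();
        }

        synchronized T take() throws InterruptedException {
            while (storage.isEmpty()) {
                wait(); // empty: release the monitor and wait for data
            }
            T item = storage.remove(); // removes the head, so order is FIFO
            notifyAll();
            return item;
        }
    }

    // One producer sends 0..count-1; one consumer collects what it receives.
    static List<Integer> transfer(int count) throws InterruptedException {
        ItemQueue<Integer> queue = new ItemQueue<>(10);
        List<Integer> received = new ArrayList<>();
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread consumer = new Thread(() -> {
            try {
                for (int i = 0; i < count; i++) {
                    received.add(queue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        producer.start();
        consumer.start();
        producer.join();
        consumer.join(); // after join(), reading `received` here is safe
        return received;
    }
}
```

With a single producer and a single consumer, ItemQueueDemo.transfer(100) returns the numbers 0 through 99 in order, because items leave the queue in the order they entered it.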

Those are three ways to implement the producer-consumer pattern. The first, using BlockingQueue, is the simplest, but the principles behind it are what the second and third methods spell out. Those two methods essentially implement the core logic of a BlockingQueue by hand, for producers and consumers to use.