29 Producer Consumer Pattern E Commerce Inventory Design Optimization

29 Producer-consumer pattern E-commerce inventory design optimization #

Hello, I’m Liu Chao.

The producer-consumer pattern is something we have used in some previous cases, and I believe you have some understanding of it. This pattern is a classic multi-threaded concurrent collaboration pattern, where producers and consumers solve the strong coupling relationship through an intermediate container, achieving the effect of buffering by allowing different production and consumption speeds.

Using the producer-consumer pattern can improve system performance and throughput. Today, let’s take a look at several implementations of this pattern and its application in e-commerce inventory management.

Implementing the producer-consumer pattern using Object’s wait/notify/notifyAll #

In [Lesson 16], I introduced the use of Object’s wait/notify/notifyAll to implement the producer-consumer pattern. This approach relies on Object’s wait/notify/notifyAll and object monitors to achieve thread synchronization and communication.

Furthermore, in [Lesson 12], I explained in detail how monitors work. Based on that knowledge, we can understand that this implementation of the producer-consumer pattern is based on the kernel and may lead to a significant amount of context switching, resulting in suboptimal performance.

Implementation of producer-consumer pattern using await/signal/signalAll in Condition of Lock #

Compared to the producer-consumer pattern implemented using the wait/notify/notifyAll methods provided by the Object class, I recommend using the producer-consumer pattern implemented using the await/signal/signalAll methods in the Condition interface provided by the java.util.concurrent package.

The Condition interface defines the await/signal/signalAll methods, which are similar in function to the wait/notify/notifyAll methods in Object. This interface works in conjunction with the explicit lock Lock to implement thread blocking and waking operations.

In [Lecture 13], I explained in detail about explicit locks. The explicit locks ReentrantLock or ReentrantReadWriteLock are both implemented based on the AQS (AbstractQueuedSynchronizer), and the ConditionObject internal class in AQS implements the Condition interface.

As we know, AQS has a synchronization queue (CLH queue). When a thread fails to acquire a lock, it is put into the synchronization queue and becomes blocked. If it is awakened and acquires the lock, it is removed from the synchronization queue.

In addition to the synchronization queue, AQS also has a condition queue. By using the addWaiter method, we can put the thread called by the await() method into the condition queue, causing the thread to enter a waiting state. When the signal or signalAll methods are called, the thread will be awakened and removed from the condition queue, then it will enter the synchronization queue. The condition queue is implemented with a singly linked list, so Condition supports multiple waiting queues.

From the above, it can be seen that the producer-consumer pattern implemented using await/signal/signalAll in Condition of Lock is implemented at the Java code level, so it has advantages in terms of performance and scalability.

Let’s take a look at an example. We will use a piece of code to implement the production and consumption of inventory.

public class LockConditionTest {
    private LinkedList<String> product = new LinkedList<String>();

    private int maxInventory = 10; // maximum inventory

    private Lock lock = new ReentrantLock(); // resource lock
    private Condition condition = lock.newCondition(); // conditions for non-full and non-empty inventory

    /**
     * Add product to the inventory
     * @param e
     */
    public void produce(String e) {
        lock.lock();
        try {
            while (product.size() == maxInventory) {
                condition.await();
            }

            product.add(e);
            System.out.println("Added a product to the inventory, total inventory size: " + product.size());
            condition.signalAll();

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            lock.unlock();
        }
    }

    /**
     * Consume product from the inventory
     * @return
     */
    public String consume() {
        String result = null;
        lock.lock();
        try {
            while (product.size() == 0) {
                condition.await();
            }

            result = product.removeLast();
            System.out.println("Consumed a product, total inventory size: " + product.size());
            condition.signalAll();

        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            lock.unlock();
        }

        return result;
    }

    /**
     * Producer
     */
    private class Producer implements Runnable {

        public void run() {
            for (int i = 0; i < 20; i++) {
                produce("Product " + i);
            }
        }

    }

    /**
     * Consumer
     */
    private class Consumer implements Runnable {

        public void run() {
            for (int i = 0; i < 20; i++) {
                consume();
            }
        }
    }

    public static void main(String[] args) {
        LockConditionTest lc = new LockConditionTest();
        new Thread(lc.new Producer()).start();
        new Thread(lc.new Consumer()).start();
        new Thread(lc.new Producer()).start();
        new Thread(lc.new Consumer()).start();
    }
}

After going through the example, please consider if there is any room for optimization.

From the code, we can notice that the producer and consumer are competing for the same lock, but in fact, they are not synchronized with each other. Since Condition supports multiple waiting queues and does not respond to interruptions, we can separate the waiting conditions and lock resources for the producer and consumer, which can further optimize the concurrency performance of the system. The modified code is as follows:

private LinkedList<String> product = new LinkedList<String>();
private AtomicInteger inventory = new AtomicInteger(0); // real-time inventory

private int maxInventory = 10; // maximum inventory
    private Lock consumerLock = new ReentrantLock(); // Resource lock
    private Lock productLock = new ReentrantLock(); // Resource lock

    private Condition notEmptyCondition = consumerLock.newCondition(); // Not empty condition
    private Condition notFullCondition = productLock.newCondition(); // Not full condition

    /**
     * Add a new item to the inventory
     * @param e
     */
    public void produce(String e) {
        productLock.lock();
        try {
            while (inventory.get() == maxInventory) {
                notFullCondition.await();
            }

            product.add(e);
            
            System.out.println("Added an item to the inventory, total inventory: " + inventory.incrementAndGet());
            
            if(inventory.get() < maxInventory) {
                notFullCondition.signalAll();
            }

        } catch (Exception ex) {
            ex.printStackTrace();
        } finally {
            productLock.unlock();
        }
        
        if(inventory.get() > 0) {
            try {
                consumerLock.lockInterruptibly();
                notEmptyCondition.signalAll();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            } finally {
                consumerLock.unlock();
            }
        }
    }

    /**
     * Consume an item from the inventory
     * @return
     */
    public String consume() {
        String result = null;
        consumerLock.lock();
        try {
            while (inventory.get() == 0) {
                notEmptyCondition.await();
            }

            result = product.removeLast();
            System.out.println("Consumed an item, total inventory: " + inventory.decrementAndGet());
            
            if(inventory.get() > 0) {
                notEmptyCondition.signalAll();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            consumerLock.unlock();
        }
        
        if(inventory.get() < maxInventory) {
            
            try {
                productLock.lockInterruptibly();
                notFullCondition.signalAll();
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            } finally {
                productLock.unlock();
            }
        }
        return result;
    }
    
    /**
     * Producer class
     */
    private class Producer implements Runnable {

        public void run() {
            for (int i = 0; i < 20; i++) {
                produce("Item " + i);
            }
        }
    }

    /**
     * Customer class
     */
    private class Customer implements Runnable {

        public void run() {
            for (int i = 0; i < 20; i++) {
                consume();
            }
        }
    }

    public static void main(String[] args) {

        LockConditionTest2 lc = new LockConditionTest2();
        new Thread(lc.new Producer()).start();
        new Thread(lc.new Customer()).start();

    }
}

We create two locks, productLock and consumerLock, to control the parallel operation of producer and consumer threads respectively. We also set two condition variables, notEmptyCondition to control the state of consumer threads, and notFullCondition to control the state of producer threads. With this optimization, we can reduce the competition between consumers and producers and achieve concurrent execution of both.

We use a LinkedList to store and access our inventory. Although LinkedList is not thread-safe, we add items to the head and remove items from the tail, so there shouldn’t be any thread safety issues in theory. The actual quantity of the inventory, inventory, is implemented using AtomicInteger (CAS lock), which ensures atomicity and visibility between consumers and producers.

Implementing Producer-Consumer using BlockingQueue #

Compared to the previous two implementations, the BlockingQueue implementation is the simplest and most straightforward.

Since BlockingQueue is thread-safe, when retrieving or removing elements from the queue, if the queue is empty, the operation will wait until the queue is not empty. Similarly, if you try to add an element to the queue and there is no available space, the addition operation will also wait. Therefore, BlockingQueue is ideal for implementing the Producer-Consumer pattern. Let’s take a look at an example of its optimization, with the code as follows:

public class BlockingQueueTest {
 
    private int maxInventory = 10; // maximum inventory
 
    private BlockingQueue<String> product = new LinkedBlockingQueue<>(maxInventory); // buffer queue
 
    /**
     * Add new inventory
     * @param e
     */
    public void produce(String e) {
        try {
            product.put(e);
            System.out.println("Added one inventory, total inventory: " + product.size());
        } catch (InterruptedException e1) {
            // TODO Auto-generated catch block
            e1.printStackTrace();
        }
    }
 
    /**
     * Consume product
     * @return
     */
    public String consume() {
        String result = null;
        try {
            result = product.take();
            System.out.println("Consumed one product, total inventory: " + product.size());
        } catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
 
        return result;
    }
 
    /**
     * Producer
     */
    private class Producer implements Runnable {
 
        public void run() {
            for (int i = 0; i < 20; i++) {
                produce("Product " + i);
            }
        }
 
    }
 
    /**
     * Consumer
     */
    private class Customer implements Runnable {
 
        public void run() {
            for (int i = 0; i < 20; i++) {
                consume();
            }
        }
    }
 
    public static void main(String[] args) {
 
        BlockingQueueTest bq = new BlockingQueueTest();
        new Thread(bq.new Producer()).start();
        new Thread(bq.new Customer()).start();
        new Thread(bq.new Producer()).start();
        new Thread(bq.new Customer()).start();
 
    }
}

In this example, we create a LinkedBlockingQueue and set the queue size. Then, we create a consume() method, which calls the take() method of LinkedBlockingQueue to allow the consumer to retrieve a product. If the number of products in the queue is zero, the consumer will enter a waiting state. Next, we create a produce() method, which calls the put() method of LinkedBlockingQueue to allow the producer to add a product to the queue. If the queue is full, the producer will enter a waiting state.

Optimizing E-commerce Inventory Design with the Producer-Consumer Pattern #

After understanding several common implementation methods of the Producer-Consumer pattern, let’s take a closer look at how this pattern can optimize the inventory design in an e-commerce system.

In e-commerce systems, there are often rush-buying activities where the inventory of the products being rushed to purchase is actually stored in an inventory table. In order to improve the performance of rush-buying, we usually store the inventory in a cache and achieve precise deduction of inventory through the inventory stored in the cache. After submitting the order and making the payment, we also need to deduct the inventory from the database. However, if we still need to operate the database in the face of instantaneous high concurrency, the database may become a performance bottleneck, especially in the case of a single table and a single database.

If we want to implement sharding of the inventory table, it will inevitably increase the complexity of the business logic. Just imagine the inventory of a product being stored in tables of different databases, how can we determine which database to deduct the inventory from?

If we deduct the inventory from the tables arbitrarily, it will result in some tables being deducted completely while others still have inventory, which is obviously unreasonable. In this case, we need to add additional logic to resolve the issue.

In the case of not sharding the database, in order to improve the performance and throughput of deducting inventory for orders, we can use the Producer-Consumer pattern to optimize the system performance.

Creating an order is equivalent to being a producer, and the queue that holds the orders is the buffer container, while consuming the orders from the queue is the operation of deducting inventory from the database. The queue that holds the orders can greatly buffer the pressure on the database brought by high concurrency.

We can also implement the Producer-Consumer pattern based on message queues. Nowadays, RabbitMQ and RocketMQ have both implemented transactions. We only need to submit the order to the message queue through transactions, and the consumer that deducts the inventory only needs to consume the message queue to gradually operate the database.

Summary #

Using the producer-consumer pattern to buffer the pressure of deducting inventory in high-concurrency databases, there are actually many similar examples.

For example, we often use message queues to smooth out high-concurrency traffic, which is based on the same principle. When purchasing a product, if all purchase requests directly enter the logic business of checking inventory and freezing cached inventory, the resource consumption from these logic business operations may overwhelm the application service. In order to ensure the rationality of system resource usage, we can use a message queue to buffer the transient high-concurrency requests.

In addition to optimizing system performance, the producer-consumer pattern can also be applied to scenarios where some tasks take a long time to execute.

For example, in the report export business, when a user exports a relatively large report, they usually have to wait a long time, which provides a very poor user experience. Usually, we can pre-export certain report content, such as users frequently needing to export yesterday’s sales report today, or exporting last month’s report at the beginning of the month. We can pre-export the report to the local or memory in advance so that users can directly download the report in a very short time.

Thought question #

We can use the producer-consumer pattern to implement momentary high-concurrency flow peak shaving. However, while this approach relieves the pressure on the consumer side, the producer side will experience a large number of thread bloc