22 Technique Series: The High-Performance Lock-Free Queue Mpsc Queue #

In the earlier source code lessons, the Mpsc Queue showed up in the task queues of both the NioEventLoop thread and the HashedWheelTimer. What kind of “black technology” is Netty using here? Why not use a native JDK queue? And when should Mpsc Queue be used? Let’s find out today!

Native JDK concurrent queues #

Before introducing Mpsc Queue, let’s review how the native JDK queues work. JDK concurrent queues fall into two categories: blocking queues and non-blocking queues. Blocking queues are mostly implemented with locks, while non-blocking queues are implemented with CAS operations. The JDK provides several implementations of each, as shown in the following figure.

Image 1.png

A queue is a FIFO (first-in, first-out) data structure. The JDK defines the java.util.Queue interface for queues; like List and Set, it extends the Collection interface. The JDK also provides a double-ended queue interface, java.util.Deque, which the commonly used LinkedList implements. Below, we briefly introduce the characteristics of each queue in the figure above, with some comparisons and summaries.

Blocking queues #

A blocking queue blocks the consumer when the queue is empty and blocks the producer when the queue is full. Blocking queues are thread-safe, so users don’t need to handle synchronization themselves, which lowers the difficulty of multi-threaded development. The main blocking queues are:

  • ArrayBlockingQueue: The most basic and commonly used blocking queue. It is a bounded queue backed by an array, and its capacity must be specified at construction time. How does ArrayBlockingQueue ensure thread safety? Internally it uses a single reentrant lock, ReentrantLock, together with two Condition objects, notEmpty and notFull, to control concurrent access. A thread that takes from an empty queue blocks until data arrives, and a thread that puts into a full queue blocks until space becomes available.

  • LinkedBlockingQueue: Backed by a linked list. The queue can be bounded or effectively unbounded: the capacity does not have to be specified at construction time and defaults to Integer.MAX_VALUE. Internally, LinkedBlockingQueue uses two reentrant locks, takeLock and putLock, and two condition variables, notEmpty and notFull, to control concurrent access. Because producers and consumers use separate locks, put and take operations do not compete with each other for the same lock. As a result, LinkedBlockingQueue usually achieves higher throughput than ArrayBlockingQueue under contention.

  • PriorityBlockingQueue: A priority queue backed by a min-heap. Elements are ordered by priority, and each dequeue returns the element with the highest priority. Internally it uses a ReentrantLock and a single condition variable, notEmpty, to control concurrent access. A notFull condition is not needed because PriorityBlockingQueue is unbounded, so putting an element never blocks. The min-heap is stored in an array; when the number of elements reaches the array’s current capacity, the array is resized. During resizing, the lock is released first so that other threads can keep dequeuing, and a CAS operation ensures that only one thread performs the resizing.

  • DelayQueue: A blocking queue that supports delayed retrieval of elements. It is commonly used for caching, scheduling timed tasks, and similar scenarios. DelayQueue stores its elements in a PriorityQueue. Every element must implement the Delayed interface and provide the compareTo and getDelay methods. Each element carries its own delay, and an element can be taken from the queue only after that delay has expired.

  • SynchronousQueue: Also called an unbuffered queue. Its distinguishing feature is that it does not store elements internally. Unlike ArrayBlockingQueue and LinkedBlockingQueue, SynchronousQueue uses CAS operations directly to control thread-safe access. Both put and take are blocking: every put must wait for a matching take, and vice versa. SynchronousQueue can therefore be understood as a hand-off point where producers and consumers wait for each other until they are paired. It is used in the JDK thread pool created by Executors.newCachedThreadPool: a newly submitted task is handed to an idle thread if one exists; otherwise a new thread is created to run it.

  • LinkedTransferQueue: A special unbounded blocking queue that can be seen as a combination of LinkedBlockingQueue, SynchronousQueue (fair mode), and ConcurrentLinkedQueue. Unlike SynchronousQueue, LinkedTransferQueue can store data internally. On a put, if a consumer thread is already waiting, the data is handed directly to that thread; otherwise it is placed in the queue. Compared with LinkedBlockingQueue, LinkedTransferQueue improves performance further by using lock-free CAS operations.
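The hand-off behaviour of SynchronousQueue described in the list above can be tried out with a small JDK-only sketch. The class name `HandOffDemo` and the method `handOff` are made up for this illustration; only `SynchronousQueue.put`, `take`, and `size` are real JDK calls.

```java
import java.util.concurrent.SynchronousQueue;

public class HandOffDemo {

    /** Hands one value from a producer thread to the calling (consumer) thread. */
    static String handOff(String value) {
        SynchronousQueue<String> queue = new SynchronousQueue<>();
        Thread producer = new Thread(() -> {
            try {
                queue.put(value); // blocks until a consumer calls take()
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "producer");
        producer.start();
        try {
            String received = queue.take(); // blocks until the producer calls put()
            producer.join();
            return received;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("received: " + handOff("task-1"));
        // Nothing is ever buffered, so size() always reports 0
        System.out.println("size: " + new SynchronousQueue<String>().size());
    }
}
```

Because neither side can proceed alone, the queue never holds an element, which is why the cached thread pool can use it to pass a task straight to an idle worker.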

Non-blocking queues #

Having covered blocking queues, let’s look at non-blocking queues. Non-blocking queues do not block threads with locks, which improves concurrent performance. The commonly used non-blocking queues in the JDK are:

  • ConcurrentLinkedQueue: An unbounded, concurrent, non-blocking queue implemented with a singly linked list. It can be regarded as the thread-safe counterpart of a plain linked-list queue. ConcurrentLinkedQueue achieves thread safety through CAS operations, which are the foundation of non-blocking queue implementations. Compared with ArrayBlockingQueue and LinkedBlockingQueue, it generally offers higher performance.
  • ConcurrentLinkedDeque: Also an unbounded, concurrent, non-blocking queue, implemented with a doubly linked list. Unlike ConcurrentLinkedQueue, it is a double-ended queue that supports both FIFO and FILO modes: elements can be inserted and removed at both the head and the tail, which makes it suitable for scenarios with multiple producers and multiple consumers.
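As a small illustration of the two queues in the list above, the following sketch shows FIFO order with ConcurrentLinkedQueue and FILO order with ConcurrentLinkedDeque. The class name `NonBlockingQueueDemo` and its helper methods are invented for this example.

```java
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ConcurrentLinkedQueue;

public class NonBlockingQueueDemo {

    /** FIFO: elements leave in insertion order; poll() on an empty queue returns null at once. */
    static String fifo() {
        ConcurrentLinkedQueue<Integer> queue = new ConcurrentLinkedQueue<>();
        queue.offer(1);
        queue.offer(2);
        queue.offer(3);
        return "" + queue.poll() + queue.poll() + queue.poll() + queue.poll();
    }

    /** FILO: push() inserts at the head and pop() removes from the head. */
    static String lifo() {
        ConcurrentLinkedDeque<Integer> deque = new ConcurrentLinkedDeque<>();
        deque.push(1);
        deque.push(2);
        deque.push(3);
        return "" + deque.pop() + deque.pop() + deque.pop();
    }

    public static void main(String[] args) {
        System.out.println(fifo()); // prints 123null
        System.out.println(lifo()); // prints 321
    }
}
```

Note that neither call ever blocks: an empty queue is reported by a null return value rather than by waiting.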

That completes our tour of the common queue types. In day-to-day development, the most frequently used of them is BlockingQueue. What basic functions does a blocking queue have to implement? Let’s take a look at the BlockingQueue interface in the following figure.

Image 2.png

We can classify the specific behaviors of the BlockingQueue interface into the following table.

Image 3.png
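The behaviour groups of BlockingQueue (throw an exception, return a special value, block, or time out) can also be observed directly. The sketch below uses a capacity-1 ArrayBlockingQueue; the class name `BlockingQueueBehaviors` and the log format are chosen only for this illustration.

```java
import java.util.NoSuchElementException;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

public class BlockingQueueBehaviors {

    static String run() {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(1);
        StringBuilder log = new StringBuilder();
        try {
            log.append("offer=").append(queue.offer("a"));      // special value: true
            log.append(" offerFull=").append(queue.offer("b")); // special value: false when full
            try {
                queue.add("c");                                  // throws when full
            } catch (IllegalStateException e) {
                log.append(" add=ISE");
            }
            // times out: waits up to 10 ms for space, then gives up
            log.append(" timedOffer=").append(queue.offer("d", 10, TimeUnit.MILLISECONDS));
            log.append(" take=").append(queue.take());           // blocks only while empty
            log.append(" poll=").append(queue.poll());           // special value: null when empty
            log.append(" timedPoll=").append(queue.poll(10, TimeUnit.MILLISECONDS));
            try {
                queue.remove();                                  // throws when empty
            } catch (NoSuchElementException e) {
                log.append(" remove=NSEE");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The blocking variants put() and take() are the ones that make a queue a BlockingQueue; the other three groups let the caller decide how to react when the queue is full or empty.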

The concurrent queues provided by the JDK meet most needs. However, in high-concurrency systems with heavy traffic and strict performance requirements, the JDK offers few non-blocking concurrent queues, and their performance is not outstanding. If you need a lock-free, thread-safe queue built on an array and CAS operations, is there a mature solution? The Java ecosystem does not disappoint: several third-party libraries provide high-performance lock-free queues, and Disruptor and JCTools are the best known.

Disruptor is a high-performance lock-free queue developed by LMAX, often referred to simply as RingBuffer. It was designed to solve the latency problem of in-memory queues. Internally, Disruptor uses a circular array and CAS operations, which gives it excellent performance. Why does Disruptor outperform the native lock-free queues in the JDK? First, the circular array reuses memory, reducing the cost of memory allocation and release. Second, the array length is set to a power of 2, so array indices can be computed quickly with bitwise operations. Third, Disruptor avoids false sharing and is therefore friendlier to the CPU cache. Disruptor is open source and can be found on GitHub at https://github.com/LMAX-Exchange/disruptor.
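To make the power-of-2 trick concrete, here is a minimal sketch of how an ever-increasing sequence number can be mapped onto a circular array with a bit mask. It is not Disruptor code; the class `RingIndexDemo` and the method `indexOf` are hypothetical names used only for this illustration.

```java
public class RingIndexDemo {

    /** Maps a growing sequence number to a slot of a circular array of power-of-two capacity. */
    static int indexOf(long sequence, int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        // For a power-of-two capacity, (sequence & (capacity - 1)) equals (sequence % capacity)
        return (int) (sequence & (capacity - 1));
    }

    public static void main(String[] args) {
        int capacity = 8;
        for (long sequence = 6; sequence <= 9; sequence++) {
            System.out.println("sequence " + sequence
                    + " -> slot " + indexOf(sequence, capacity)
                    + " (modulo gives " + (sequence % capacity) + ")");
        }
    }
}
```

With a capacity of 8 the mask is 7 (binary 111), so sequences 6, 7, 8, 9 land in slots 6, 7, 0, 1: the index wraps around without a division.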

JCTools is also an open-source project; its GitHub address is https://github.com/JCTools/JCTools. It provides tools for concurrent development on the JVM, mainly concurrent data structures that are missing from the JDK, such as a non-blocking Map and non-blocking Queues. The non-blocking queues come in four types for different scenarios:

  • Spsc (Single Producer Single Consumer)
  • Mpsc (Multiple Producers Single Consumer)
  • Spmc (Single Producer Multiple Consumers)
  • Mpmc (Multiple Producers Multiple Consumers)

Netty directly adopts JCTools’ Mpsc Queue. So what advantages does Mpsc Queue have over the native concurrent queues in the JDK? Let’s look into it.

Understanding Mpsc Queue #

Mpsc stands for Multi Producer Single Consumer. An Mpsc Queue lets multiple producers add to the queue safely at the same time, but allows only one consumer to read from it at any given moment. In the Netty Reactor thread, the task queue, taskQueue, must accept tasks submitted by many threads simultaneously, while only the Reactor thread itself takes tasks out and runs them. The Mpsc Queue provided by JCTools therefore fits the Netty Reactor thread model very well.
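The access pattern itself, many submitting threads and one draining thread, can be sketched with JDK classes alone. In the example below a ConcurrentLinkedQueue stands in for the Mpsc Queue, and the class `MpscPatternDemo` with its `runOnce` method is a made-up illustration, not Netty code.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

public class MpscPatternDemo {

    /** Several producer threads submit tasks; only the calling thread runs them. */
    static int runOnce(int producers, int tasksPerProducer) {
        // Stand-in for the Mpsc task queue (assumption: any thread-safe queue shows the pattern)
        Queue<Runnable> taskQueue = new ConcurrentLinkedQueue<>();
        int[] executed = {0}; // touched only by the single consumer, so no synchronization is needed
        int total = producers * tasksPerProducer;

        for (int p = 0; p < producers; p++) {
            new Thread(() -> {
                for (int i = 0; i < tasksPerProducer; i++) {
                    taskQueue.offer(() -> executed[0]++); // many threads enqueue concurrently
                }
            }, "producer-" + p).start();
        }

        // The single consumer, playing the role of the Reactor thread
        while (executed[0] < total) {
            Runnable task = taskQueue.poll();
            if (task != null) {
                task.run();
            } else {
                Thread.yield(); // nothing to do yet
            }
        }
        return executed[0];
    }

    public static void main(String[] args) {
        System.out.println("tasks executed: " + runOnce(4, 1000)); // prints tasks executed: 4000
    }
}
```

Because the tasks run on one thread only, the state they touch needs no locking; a queue specialized for this pattern can also drop all consumer-side synchronization.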

Mpsc Queue has several implementation classes, such as MpscArrayQueue, MpscUnboundedArrayQueue, and MpscChunkedArrayQueue. Let’s start with the basic MpscArrayQueue; once it is understood, the other variants become much easier to learn.

First, let’s look at the inheritance hierarchy of MpscArrayQueue, which can be quite complex, as shown in the following diagram.

Image 4.png

Besides the top-level JDK classes AbstractCollection and AbstractQueue, MpscArrayQueue inherits from many classes named MpscXxxPad and MpscXxxField. An interesting pattern emerges: every class that declares a real field is separated from the next one by a class with the Pad suffix. What is the purpose of MpscXxxPad? Let’s merge the fields of all the classes, from top to bottom, to see the overall structure of MpscArrayQueue.

```java
// All fields of MpscArrayQueue, merged from every class in the hierarchy, top to bottom

// ConcurrentCircularArrayQueueL0Pad
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

// ConcurrentCircularArrayQueue
protected final long mask;
protected final E[] buffer;

// MpscArrayQueueL1Pad
long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16;

// MpscArrayQueueProducerIndexField
private volatile long producerIndex;

// MpscArrayQueueMidPad
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;

// MpscArrayQueueProducerLimitField
private volatile long producerLimit;

// MpscArrayQueueL2Pad
long p00, p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16;

// MpscArrayQueueConsumerIndexField
protected long consumerIndex;

// MpscArrayQueueL3Pad
long p01, p02, p03, p04, p05, p06, p07;
long p10, p11, p12, p13, p14, p15, p16, p17;
```

As you can see, the MpscXxxPad classes declare a large number of long fields whose names carry no meaning; they exist purely as padding. If you have read the source code of Disruptor, you will find that it uses a similar padding technique. The reason Mpsc Queue and Disruptor add these meaningless fields is to solve the problem of false sharing.
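The Pad/Field layering can be reduced to a minimal sketch. The class names below (`LeftPad`, `HotValue`, `RightPad`, `PaddedCounter`) are invented for this illustration. It relies on the assumption that the JVM lays out superclass fields before subclass fields, which is commonly given as the reason for spreading the padding over an inheritance chain instead of declaring everything in one class, where the JVM is free to reorder fields.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Padding in front of the hot field
class LeftPad { long p01, p02, p03, p04, p05, p06, p07; }

// The only field that carries real data
class HotValue extends LeftPad { volatile long value; }

// Padding behind the hot field
class RightPad extends HotValue { long p11, p12, p13, p14, p15, p16, p17; }

public class PaddedCounter extends RightPad {

    /** Intended for a single writer thread; other threads may read value. */
    long increment() {
        value = value + 1;
        return value;
    }

    /** Lists the hierarchy from top to bottom with the number of fields declared per class. */
    static String hierarchy() {
        Deque<String> levels = new ArrayDeque<>();
        for (Class<?> c = PaddedCounter.class; c != Object.class; c = c.getSuperclass()) {
            levels.addFirst(c.getSimpleName() + "[" + c.getDeclaredFields().length + "]");
        }
        return String.join(" -> ", levels);
    }

    public static void main(String[] args) {
        System.out.println(hierarchy());
        System.out.println(new PaddedCounter().increment());
    }
}
```

The printed hierarchy mirrors, on a small scale, the alternating Pad and Field classes in the listing above.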

So what is false sharing? We first need some background. In computer architecture, the CPU is several orders of magnitude faster than main memory. To let the CPU work with memory more efficiently, multiple levels of cache are placed between them, as shown in the following figure.

Drawing 3.png

CPU caches are generally organized into three levels: L1, L2, and L3. The closer a cache is to the CPU, the faster it is, but the smaller its capacity. So in terms of speed, L1 > L2 > L3, and in terms of capacity, L1 < L2 < L3. When the CPU reads data, it first looks in the L1 cache. On a miss it tries L2, then L3, and only if all three miss does it go to main memory; the data read from memory is then placed into the caches on the way back. In addition, when multiple threads share the same data, a thread that modifies it must write the data back to main memory before other threads can read the updated value from there.

From this we can see that multi-level caches exist to maximize CPU utilization. For CPU-intensive computation, the data the CPU loads from memory should stay in the cache as much as possible. How is cache utilization improved? This is where the cache line comes in. A cache line is the smallest unit the CPU cache operates on, and the CPU cache consists of many cache lines. The cache line size depends on the CPU architecture; on today’s mainstream 64-bit architectures it is usually 64 bytes. In Java, a long is 8 bytes, so one cache line holds 8 long variables. When the CPU loads data from memory, it brings the adjacent data into the cache line as well, because adjacent data is the most likely to be accessed next, which avoids frequent round trips between the CPU and memory.

How does false sharing occur? What impact does it have? We use the following figure to explain it.

Drawing 4.pn

Assume that variables A, B, C, and D are loaded into the same cache line and that all of them are modified frequently. When thread 1 modifies variable A on CPU Core1, Core1 notifies the other cores that their copies of this cache line are now invalid. When thread 2 then modifies variable C on CPU Core2, it finds that its copy of the cache line has been invalidated. Core1 must write its data back to memory, and Core2 must reload the cache line from memory, even though the two threads never touch the same variable.

From this we can see that the more threads modify the same cache line, the fiercer the write contention becomes: the data is repeatedly written back to and reloaded from memory, which wastes performance. By the way, how are the cache lines of the individual cores kept consistent in a multicore processor? Interested readers can study the cache coherence protocol MESI. For more details, please refer to https://en.wikipedia.org/wiki/MESI_protocol.

How can we solve the false sharing problem? Disruptor and Mpsc Queue both adopt a space-for-time strategy, which allows objects shared by different threads to be loaded into different cache lines. Let’s illustrate this with a simple example.

```java
public class FalseSharingPadding {

    protected long p1, p2, p3, p4, p5, p6, p7;

    protected volatile long value = 0L;

    protected long p9, p10, p11, p12, p13, p14, p15;

}
```

As the code shows, seven long variables are padded before and after the value variable. With 64-byte cache lines, this keeps value from sharing a cache line with unrelated variables when multiple threads access it, no matter where the object starts in memory, as shown in the following figure.

Image 7.png
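The effect of sharing a cache line can be made visible with a rough experiment. The sketch below is not a rigorous benchmark, and the class `FalseSharingDemo` is a hypothetical name. Two threads each increment their own slot of an AtomicLongArray: slots 0 and 1 are 8 bytes apart and so very likely share a cache line, while slots 0 and 16 are 128 bytes apart and, assuming 64-byte cache lines, cannot. Absolute timings depend on the machine and the JVM, so no expected numbers are given.

```java
import java.util.concurrent.atomic.AtomicLongArray;

public class FalseSharingDemo {

    /** Two threads each increment their own slot; returns the elapsed time in nanoseconds. */
    static long timeNanos(int slotA, int slotB, long iterations) {
        AtomicLongArray counters = new AtomicLongArray(32);
        Thread t1 = new Thread(() -> bump(counters, slotA, iterations));
        Thread t2 = new Thread(() -> bump(counters, slotB, iterations));
        long start = System.nanoTime();
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        long elapsed = System.nanoTime() - start;
        if (counters.get(slotA) != iterations || counters.get(slotB) != iterations) {
            throw new IllegalStateException("lost update");
        }
        return elapsed;
    }

    private static void bump(AtomicLongArray counters, int slot, long iterations) {
        for (long i = 0; i < iterations; i++) {
            counters.incrementAndGet(slot);
        }
    }

    public static void main(String[] args) {
        long iterations = 20_000_000L;
        System.out.println("adjacent slots (0, 1):  " + timeNanos(0, 1, iterations) / 1_000_000 + " ms");
        System.out.println("distant slots  (0, 16): " + timeNanos(0, 16, iterations) / 1_000_000 + " ms");
    }
}
```

On typical multicore hardware the adjacent case tends to be noticeably slower, although the two threads never write the same variable.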

False sharing is generally very subtle. In real projects, not every piece of code deserves the effort of optimizing for it, and cache space is itself a precious resource that padding consumes. We should focus on the design of performance-critical data structures and spend resources where they maximize system performance.

So far, we know that Mpsc Queue pads its fields with a large number of long variables to avoid false sharing, which makes the source code harder to read. Since the padding only serves performance and has nothing to do with the main functionality of Mpsc Queue, we will ignore the padding variables from here on and analyze the basic implementation principles of Mpsc Queue.

Mpsc Queue Source Code Analysis #

Before we start studying the source code, let’s see how MpscArrayQueue is used. Here is an example:

```java
import org.jctools.queues.MpscArrayQueue; // from the JCTools library

public class MpscArrayQueueTest {

    public static final MpscArrayQueue<String> MPSC_ARRAY_QUEUE = new MpscArrayQueue<>(2);

    public static void main(String[] args) {
        // Two producer threads each offer one element
        for (int i = 1; i <= 2; i++) {
            int index = i;
            new Thread(() -> MPSC_ARRAY_QUEUE.offer("data" + index), "thread" + index).start();
        }
        try {
            Thread.sleep(1000L);
            MPSC_ARRAY_QUEUE.add("data3"); // Throws an exception if the queue is full
        } catch (Exception e) {
            e.printStackTrace();
        }
        System.out.println("Queue size: " + MPSC_ARRAY_QUEUE.size() + ", Queue capacity: " + MPSC_ARRAY_QUEUE.capacity());
        System.out.println("Dequeued: " + MPSC_ARRAY_QUEUE.remove()); // Throws an exception if the queue is empty
        System.out.println("Dequeued: " + MPSC_ARRAY_QUEUE.poll()); // Returns null if the queue is empty
    }
}
```

The program output is as follows (the order of data1 and data2 depends on thread scheduling):

```
java.lang.IllegalStateException: Queue full
	at java.util.AbstractQueue.add(AbstractQueue.java:98)
	at MpscArrayQueueTest.main(MpscArrayQueueTest.java:17)
Queue size: 2, Queue capacity: 2
Dequeued: data1
Dequeued: data2
```

## MpscArrayQueue and its basic operations

MpscArrayQueue is, after all, a queue. Its basic usage is similar to that of ArrayBlockingQueue and revolves around two fundamental operations: **enqueue (offer())** and **dequeue (poll())**. Below, we explain both in detail.

### Enqueue (offer)

First, let's review the important attributes of MpscArrayQueue:

```java
// ConcurrentCircularArrayQueue
protected final long mask; // mask used for calculating the array index
protected final E[] buffer; // array that holds the queue data

// MpscArrayQueueProducerIndexField
private volatile long producerIndex; // producer index

// MpscArrayQueueProducerLimitField
private volatile long producerLimit; // maximum value of the producer index

// MpscArrayQueueConsumerIndexField
protected long consumerIndex; // consumer index
```


When you see the `mask` variable, you might immediately guess that the capacity of the underlying array must be a power of 2. Since MpscArrayQueue is a multiple-producer, single-consumer queue, both `producerIndex` and `producerLimit` are marked `volatile`, so that a modification made by one producer thread is visible to the other producer threads. How are these attributes used during enqueue and dequeue? What roles do the producer and consumer indices play? Let's read the source code with these questions in mind.

First, let's go through the source code of the `offer()` method:

```java
public boolean offer(E e) {
    if (null == e) {
        throw new NullPointerException();
    }
    long mask = this.mask;
    long producerLimit = lvProducerLimit(); // read the cached maximum producer index
    long pIndex;
    do {
        pIndex = lvProducerIndex(); // read the producer index
        if (pIndex >= producerLimit) {
            long cIndex = lvConsumerIndex(); // read the consumer index
            producerLimit = cIndex + mask + 1L;
            if (pIndex >= producerLimit) {
                return false; // queue is full
            }
            soProducerLimit(producerLimit); // update producerLimit
        }
    } while (!casProducerIndex(pIndex, pIndex + 1L)); // CAS the producer index; success means this producer has claimed pIndex
    long offset = calcElementOffset(pIndex, mask); // compute the array offset for pIndex
    UnsafeRefArrayAccess.soElement(this.buffer, offset, e); // write the element into the array
    return true;
}
```

Although the offer() method of MpscArrayQueue is relatively short, understanding it requires some low-level knowledge. Don’t worry, we’ll break it down step by step. We begin with the relationship between producerIndex, producerLimit, and consumerIndex, which is one of the distinctive aspects of MpscArrayQueue. Let’s look at how producerLimit is initialized and how lvProducerLimit() reads it:

```java
public MpscArrayQueueProducerLimitField(int capacity) {
    super(capacity);
    this.producerLimit = capacity;
}

protected final long lvProducerLimit() {
    return producerLimit;
}
```

In the initial state, producerLimit equals the capacity of the queue. For the MpscArrayQueueTest example, producerLimit = capacity = 2 and producerIndex = consumerIndex = 0. Thread1 and Thread2 then concurrently store data into the MpscArrayQueue, as shown in the following diagram.

image.png

At this point, both threads read a producerIndex of 0, which is less than producerLimit, so both attempt to update producerIndex with a CAS operation. Exactly one of them succeeds. Assuming Thread1 wins the CAS, Thread2’s attempt fails, and Thread2 goes around the loop again: it re-reads producerIndex and retries the CAS. Because producerIndex is volatile, the value 1 written by Thread1 is visible to Thread2 on that re-read. Once both threads have claimed an index through CAS, they hold pIndex values of 0 and 1 respectively. Each thread then applies a bitwise operation to its pIndex to compute the corresponding position in the array and writes its element there with soElement(), which is backed by the UNSAFE.putOrderedObject() method.
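The claim loop described above can be isolated in a few lines. The following sketch uses the JDK’s AtomicLong in place of the Unsafe-based casProducerIndex(); the class `CasClaimDemo` and its methods are hypothetical names for this illustration. It checks that concurrent threads never obtain the same index.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class CasClaimDemo {

    private final AtomicLong producerIndex = new AtomicLong();

    /** Claims the next free index; a thread that loses the CAS re-reads and retries. */
    long claim() {
        long pIndex;
        do {
            pIndex = producerIndex.get(); // volatile read of the current index
        } while (!producerIndex.compareAndSet(pIndex, pIndex + 1));
        return pIndex; // this index now belongs exclusively to the caller
    }

    /** Runs several claiming threads and returns how many distinct indices were handed out. */
    static int distinctClaims(int threads, int claimsPerThread) {
        CasClaimDemo demo = new CasClaimDemo();
        Set<Long> claimed = ConcurrentHashMap.newKeySet();
        Thread[] workers = new Thread[threads];
        for (int t = 0; t < threads; t++) {
            workers[t] = new Thread(() -> {
                for (int i = 0; i < claimsPerThread; i++) {
                    claimed.add(demo.claim());
                }
            });
            workers[t].start();
        }
        try {
            for (Thread worker : workers) {
                worker.join();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return claimed.size();
    }

    public static void main(String[] args) {
        System.out.println("distinct indices: " + distinctClaims(4, 1000)); // prints distinct indices: 4000
    }
}
```

If two threads ever received the same index, the set would end up smaller than the number of claims, so a full set confirms that every claim was unique.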

### Dequeue (poll)

Now let's turn to the dequeue operation. The source code of the poll() method is as follows:

```java
public E poll() {
    long cIndex = this.lpConsumerIndex(); // Read the consumer index directly
    long offset = this.calcElementOffset(cIndex); // Calculate the array offset
    E[] buffer = this.buffer;
    E e = UnsafeRefArrayAccess.lvElement(buffer, offset); // Read the element at the given offset

    if (null == e) {
        if (cIndex == this.lvProducerIndex()) { // The queue is empty
            return null;
        }
        do {
            e = UnsafeRefArrayAccess.lvElement(buffer, offset);
        } while (e == null); // Wait for the producer to fill the element
    }

    UnsafeRefArrayAccess.spElement(buffer, offset, null); // Set the slot to NULL after successful consumption
    this.soConsumerIndex(cIndex + 1L); // Advance consumerIndex to the next position
    return e;
}
```

Because there is only one consumer thread, the entire poll() process contains no CAS operation. The core idea of poll() is to read the consumer index consumerIndex, compute the corresponding array offset from it, retrieve and return the element at that position, and finally move consumerIndex to the next position in the circular array.

The logic for obtaining the consumer index and calculating the offset corresponding to the array is similar to offer(), so it will not be repeated here. Let's directly look at how to retrieve the element corresponding to the offset in the array by following the source code of lvElement() method.

```java
public static <E> E lvElement(E[] buffer, long offset) {
    return (E) UNSAFE.getObjectVolatile(buffer, offset);
}
```

Reading an array element also goes through the UNSAFE family of methods. getObjectVolatile() has volatile-read semantics, which involve a LoadLoad barrier: for a sequence Load1, LoadLoad, Load2, the barrier guarantees that Load1 completes before Load2 and any subsequent reads. getObjectVolatile() therefore ensures that each read sees the latest value in memory.

Unlike offer(), poll() mainly has to deal with the queue being empty. When lvElement() returns NULL, there are two possibilities: the queue is empty, or a producer has already claimed the slot by advancing producerIndex but its element is not yet visible to the consumer. If consumerIndex equals producerIndex, the queue is empty and poll() returns NULL. If the two indices differ, a producer is in the middle of filling the slot, so the consumer spins until the element appears.

After the element has been consumed, the array slot at the current consumerIndex is set to NULL, and consumerIndex is then advanced to the next position of the array. The logic is straightforward. Let’s look at the spElement() and soConsumerIndex() methods together.

```java
public static <E> void spElement(E[] buffer, long offset, E e) {
    UNSAFE.putObject(buffer, offset, e);
}

protected void soConsumerIndex(long newValue) {
    UNSAFE.putOrderedLong(this, C_INDEX_OFFSET, newValue);
}
```

In these final updates we again see the UNSAFE put family of methods. putObject() uses no memory barrier and simply writes the value at the given offset of the object. putOrderedLong() behaves like putOrderedObject(): both use a StoreStore barrier and the LazySet mechanism, which is cheaper than a full volatile write but does not guarantee that the new value is immediately visible to other threads.
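The same combination, a plain store followed by an ordered store, can be reproduced with public JDK classes. In the sketch below `AtomicLong.lazySet()` plays the role of putOrderedLong() and a plain field write plays the role of putObject(). This mapping is an analogy for illustration rather than JCTools code, and the class `OrderedStoreDemo` is a made-up name.

```java
import java.util.concurrent.atomic.AtomicLong;

public class OrderedStoreDemo {

    private long payload;                                  // written with a plain store
    private final AtomicLong published = new AtomicLong(); // written with an ordered store

    /** Writer side: plain store first, then an ordered (lazySet) store that publishes it. */
    void publish(long value) {
        payload = value;      // no barrier of its own
        published.lazySet(1); // the earlier store cannot be reordered after this one
    }

    /** Reader side: once the flag is seen, the payload written before it is visible too. */
    long awaitPayload() {
        while (published.get() == 0) { // volatile read
            Thread.yield();
        }
        return payload;
    }

    static long demo(long value) {
        OrderedStoreDemo demo = new OrderedStoreDemo();
        new Thread(() -> demo.publish(value), "writer").start();
        return demo.awaitPayload();
    }

    public static void main(String[] args) {
        System.out.println(demo(42L)); // prints 42
    }
}
```

This is the reason poll() can clear the slot with a plain store: the ordered store to consumerIndex that follows makes the cleared slot visible no later than the new index.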

That completes our analysis of the core enqueue and dequeue source code of MpscArrayQueue. JCTools is a concurrency toolkit for the JVM that contains many “black technology” tricks, such as padding to avoid false sharing and direct memory operations through Unsafe, and studying it deepens our understanding of the underlying mechanisms. JCTools also provides other specialized queues such as MpscUnboundedArrayQueue and MpscChunkedArrayQueue. If you are interested, you can study them on your own after class. With the foundation from this lesson, analyzing them should not be difficult.
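To tie the pieces together, here is a simplified, JDK-only sketch of the offer() and poll() logic analyzed in this lesson. It is not the JCTools implementation: AtomicLong and AtomicReferenceArray replace the Unsafe calls, there is no padding, and `SimpleMpscQueue` is a hypothetical class written only for illustration.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReferenceArray;

public class SimpleMpscQueue<E> {

    private final long mask;                      // capacity - 1, capacity is a power of two
    private final AtomicReferenceArray<E> buffer; // stand-in for E[] plus Unsafe access
    private final AtomicLong producerIndex = new AtomicLong();
    private final AtomicLong consumerIndex = new AtomicLong();
    private volatile long producerLimit;          // cached upper bound for producerIndex

    public SimpleMpscQueue(int capacity) {
        if (capacity <= 0 || Integer.bitCount(capacity) != 1) {
            throw new IllegalArgumentException("capacity must be a power of two");
        }
        this.mask = capacity - 1;
        this.buffer = new AtomicReferenceArray<>(capacity);
        this.producerLimit = capacity;
    }

    /** Safe to call from many producer threads. */
    public boolean offer(E e) {
        if (e == null) {
            throw new NullPointerException();
        }
        long limit = producerLimit;
        long pIndex;
        do {
            pIndex = producerIndex.get();
            if (pIndex >= limit) {
                // Only when the cached limit is reached do we read the consumer index
                limit = consumerIndex.get() + mask + 1;
                if (pIndex >= limit) {
                    return false; // the queue really is full
                }
                producerLimit = limit;
            }
        } while (!producerIndex.compareAndSet(pIndex, pIndex + 1));
        buffer.lazySet((int) (pIndex & mask), e); // ordered store, in the spirit of soElement()
        return true;
    }

    /** Must be called from one consumer thread only. */
    public E poll() {
        long cIndex = consumerIndex.get();
        int slot = (int) (cIndex & mask);
        E e = buffer.get(slot); // volatile read, in the spirit of lvElement()
        if (e == null) {
            if (cIndex == producerIndex.get()) {
                return null; // the queue is empty
            }
            do {
                e = buffer.get(slot); // a producer claimed the slot but has not filled it yet
            } while (e == null);
        }
        buffer.lazySet(slot, null);        // clear the slot
        consumerIndex.lazySet(cIndex + 1); // ordered store, in the spirit of soConsumerIndex()
        return e;
    }

    public static void main(String[] args) {
        SimpleMpscQueue<String> queue = new SimpleMpscQueue<>(2);
        System.out.println(queue.offer("data1") + " " + queue.offer("data2") + " " + queue.offer("data3"));
        System.out.println(queue.poll() + " " + queue.poll() + " " + queue.poll());
    }
}
```

As long as producerIndex stays below the cached producerLimit, offer() never touches consumerIndex, so producers and the consumer rarely contend on the same variable.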

Summary #

MpscArrayQueue is just the tip of the iceberg in JCTools, yet it is rich in technical detail. Let’s summarize its key points.

  • False sharing is avoided by padding with a large number of long variables.
  • The capacity of the circular array is a power of 2, so the array index can be located quickly with a bitwise operation.
  • The producerLimit used in the enqueue operation offer() greatly reduces how often producers have to read the consumer index consumerIndex, which improves performance significantly.
  • Both enqueue and dequeue rely heavily on the UNSAFE family of methods, choosing different ones for the producer and consumer scenarios. JCTools pays close attention to these low-level details in order to maximize performance.

With this, the source code analysis part of our course comes to an end. Netty still has plenty of black technology waiting to be explored, and I hope the study of Netty’s core source code in these lessons will help you in your future work with Netty.