
37 Principles of Concurrency Safety for Blocking and Non-blocking Queues #

In this lesson, we study the concurrency safety principles of blocking and non-blocking queues.

Previously, we explored the characteristics of common blocking queues. In this lesson, we take ArrayBlockingQueue as an example to analyze the thread safety principles of BlockingQueue, and then examine the concurrency safety principles of its non-blocking counterpart. Through this lesson, you will understand the underlying principles of concurrent queues.

Analysis of ArrayBlockingQueue Source Code #

Let’s first look at the source code of ArrayBlockingQueue. ArrayBlockingQueue has several important attributes:

```java
// Array used for storing elements
final Object[] items;

// The position for the next take operation
int takeIndex;

// The position for the next put operation
int putIndex;

// Number of elements in the queue
int count;
```

The first attribute is the core one: an Object array that stores the elements. The two index variables, takeIndex and putIndex, indicate the positions for the next take and put operations, and the count variable tracks the number of elements in the queue.

In addition, let’s take a look at the following three variables:

```java
// These 3 variables are used for concurrency control
final ReentrantLock lock;
private final Condition notEmpty;
private final Condition notFull;
```

These three variables are also critical. The first is a ReentrantLock, and the two Conditions are created from it via lock.newCondition(). Together they are the most important tools for implementing thread safety.

ArrayBlockingQueue achieves concurrent synchronization with the ReentrantLock and its two Conditions. Both read and write operations must acquire the ReentrantLock's exclusive lock before proceeding. During a read, if the queue is empty, the thread enters the notEmpty condition queue dedicated to reader threads and waits for a writer to add a new element. Similarly, if the queue is full, a writing thread enters the notFull condition queue dedicated to writer threads and waits for a reader to remove an element and free up space.

Next, let’s analyze the most important put method:

```java
public void put(E e) throws InterruptedException {
    checkNotNull(e);
    final ReentrantLock lock = this.lock;
    lock.lockInterruptibly();
    try {
        while (count == items.length)
            notFull.await();
        enqueue(e);
    } finally {
        lock.unlock();
    }
}
```

In the put method, the checkNotNull method first verifies that the inserted element is not null. Then we acquire the ReentrantLock, and the locking method used is lock.lockInterruptibly(). We discussed this method in lesson 23: it can respond to interrupts while waiting to acquire the lock. This is the underlying reason why a blocking queue can respond to interruption during the period when it is trying to acquire the lock but has not yet obtained it.
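We can observe this behavior directly. The following demo (written for this lesson, not taken from the JDK) fills an ArrayBlockingQueue, lets a second thread block in put, and then interrupts it; the blocked put responds with InterruptedException:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;

public class PutInterruptDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<Integer> queue = new ArrayBlockingQueue<>(1);
        queue.put(1); // the queue is now full

        CountDownLatch interrupted = new CountDownLatch(1);
        Thread producer = new Thread(() -> {
            try {
                queue.put(2); // blocks: no space available
            } catch (InterruptedException e) {
                interrupted.countDown(); // the blocked put was interrupted
            }
        });
        producer.start();

        Thread.sleep(200);    // give the producer time to block
        producer.interrupt(); // wake it with an interrupt
        interrupted.await();
        System.out.println("blocked put threw InterruptedException");
    }
}
```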

Then comes a very classic try-finally block: the unlock operation sits in the finally block, and the try block contains a while loop. The loop checks whether the queue is full, that is, whether count equals the length of the array. If so, the thread waits until space becomes available; it then calls the enqueue method to add the element to the queue, and finally releases the lock with the unlock method.
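The read side mirrors this: take() acquires the same lock, loops with notEmpty.await() while the queue is empty, and then dequeues. A small demo (ours, not JDK source) of this blocking-until-available behavior:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

public class TakeAwaitsDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new ArrayBlockingQueue<>(10);

        Thread writer = new Thread(() -> {
            try {
                Thread.sleep(200);  // let the reader block first
                queue.put("hello"); // enqueue() signals notEmpty internally
            } catch (InterruptedException ignored) { }
        });
        writer.start();

        String item = queue.take(); // blocks until the writer puts an element
        System.out.println("took: " + item);
        writer.join();
    }
}
```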

You may find this code familiar. In lesson 5, when we implemented the producer/consumer pattern using Condition, we wrote a put method, as shown below:

```java
public void put(Object o) throws InterruptedException {
    lock.lock();
    try {
        while (queue.size() == max) {
            notFull.await();
        }
        queue.add(o);
        notEmpty.signalAll();
    } finally {
        lock.unlock();
    }
}
```

And we can see that these two methods are almost identical. So, as we mentioned in the 5th lesson, when we implemented the producer/consumer pattern using Condition, we actually implemented a simplified version of BlockingQueue. You can compare the implementations of these two put methods to deepen your understanding of Condition.
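For reference, a self-contained version of that simplified queue might look like the sketch below (class and field names are ours; the JDK's real ArrayBlockingQueue uses a circular array rather than a LinkedList, but the lock-plus-two-Conditions pattern is the same):

```java
import java.util.LinkedList;
import java.util.Queue;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// A minimal blocking queue: one ReentrantLock plus two Conditions.
public class SimpleBlockingQueue<E> {
    private final Queue<E> queue = new LinkedList<>();
    private final int max;
    private final ReentrantLock lock = new ReentrantLock();
    private final Condition notFull = lock.newCondition();
    private final Condition notEmpty = lock.newCondition();

    public SimpleBlockingQueue(int max) { this.max = max; }

    public void put(E e) throws InterruptedException {
        lock.lock();
        try {
            while (queue.size() == max)
                notFull.await();      // wait for a consumer to free space
            queue.add(e);
            notEmpty.signalAll();     // wake blocked readers
        } finally {
            lock.unlock();
        }
    }

    public E take() throws InterruptedException {
        lock.lock();
        try {
            while (queue.isEmpty())
                notEmpty.await();     // wait for a producer to add data
            E item = queue.remove();
            notFull.signalAll();      // wake blocked writers
            return item;
        } finally {
            lock.unlock();
        }
    }

    public static void main(String[] args) throws InterruptedException {
        SimpleBlockingQueue<String> q = new SimpleBlockingQueue<>(2);
        q.put("a");
        q.put("b");
        System.out.println(q.take() + q.take()); // prints "ab"
    }
}
```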

Similar to ArrayBlockingQueue, other blocking queues such as LinkedBlockingQueue, PriorityBlockingQueue, DelayQueue, DelayedWorkQueue, etc., also use ReentrantLock to ensure thread safety internally. The details may vary, for example, LinkedBlockingQueue has two locks internally, one for the head of the queue and one for the tail. This makes it more efficient than sharing the same lock. However, the overall idea is similar.
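To make the two-lock idea concrete, here is a highly simplified, unbounded sketch (ours, not the JDK code): putLock guards the tail and takeLock guards the head, so one producer and one consumer can proceed at the same time. The capacity checks and Condition signalling of the real LinkedBlockingQueue are omitted; like the JDK, it uses an atomic count, whose volatile semantics also make a newly linked node visible to the reading side:

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

public class TwoLockQueue<E> {
    static class Node<E> { E item; Node<E> next; Node(E x) { item = x; } }

    private Node<E> head = new Node<>(null); // dummy head node
    private Node<E> tail = head;
    private final AtomicInteger count = new AtomicInteger();
    private final ReentrantLock putLock = new ReentrantLock();  // guards tail
    private final ReentrantLock takeLock = new ReentrantLock(); // guards head

    public void put(E e) {
        putLock.lock();
        try {
            tail.next = new Node<>(e);
            tail = tail.next;
            count.incrementAndGet();
        } finally {
            putLock.unlock();
        }
    }

    public E poll() {
        if (count.get() == 0) return null; // volatile read pairs with the increment in put
        takeLock.lock();
        try {
            if (count.get() == 0) return null;
            Node<E> first = head.next;
            head = first;              // old dummy node is discarded
            E item = first.item;
            first.item = null;
            count.decrementAndGet();
            return item;
        } finally {
            takeLock.unlock();
        }
    }

    public int size() { return count.get(); }

    public static void main(String[] args) {
        TwoLockQueue<Integer> q = new TwoLockQueue<>();
        q.put(1); q.put(2); q.put(3);
        System.out.println(q.poll() + "," + q.poll() + ",size=" + q.size());
        // prints "1,2,size=1"
    }
}
```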

Non-blocking queue: ConcurrentLinkedQueue #

After discussing blocking queues, let’s take a look at the non-blocking queue: ConcurrentLinkedQueue. As the name suggests, ConcurrentLinkedQueue uses a linked list as its data structure. Let’s take a look at the source code of the key method offer:

```java
public boolean offer(E e) {
    checkNotNull(e);
    final Node<E> newNode = new Node<E>(e);

    for (Node<E> t = tail, p = t;;) {
        Node<E> q = p.next;
        if (q == null) {
            // p is last node
            if (p.casNext(null, newNode)) {
                // Successful CAS is the linearization point
                // for e to become an element of this queue,
                // and for newNode to become "live".
                if (p != t) // hop two nodes at a time
                    casTail(t, newNode);  // Failure is OK.
                return true;
            }
            // Lost CAS race to another thread; re-read next
        }
        else if (p == q)
            // We have fallen off list.  If tail is unchanged, it
            // will also be off-list, in which case we need to
            // jump to head, from which all live nodes are always
            // reachable.  Else the new tail is a better bet.
            p = (t != (t = tail)) ? t : head;
        else
            // Check for tail updates after two hops.
            p = (p != t && t != (t = tail)) ? t : q;
    }
}
```

Instead of analyzing the code line by line, let's focus on the overall structure. After the null check, we see one big for loop, and it is clearly an infinite loop. Inside it is the prominent p.casNext method, which uses CAS. The infinite loop working together with CAS is a typical optimistic-locking pattern. Let's look at the implementation of p.casNext, which is as follows:

```java
boolean casNext(Node<E> cmp, Node<E> val) {
    return UNSAFE.compareAndSwapObject(this, nextOffset, cmp, val);
}
```

From the code, we can see that the UNSAFE.compareAndSwapObject method performs the CAS. compareAndSwapObject is a native method that ultimately relies on the CPU's CAS instruction to guarantee atomicity.

ConcurrentLinkedQueue, a non-blocking queue, uses the CAS non-blocking algorithm and keeps retrying until successful to achieve thread safety. It is suitable for scenarios that do not require blocking and have moderate concurrency.
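The same "infinite loop plus CAS" pattern can be reproduced with the public AtomicReference API instead of Unsafe. The following Treiber-style lock-free stack (an illustration of the pattern, not JDK code) keeps re-reading the top of the stack and retrying compareAndSet until no other thread has raced in between:

```java
import java.util.concurrent.atomic.AtomicReference;

public class LockFreeStack<E> {
    private static class Node<E> {
        final E item; final Node<E> next;
        Node(E item, Node<E> next) { this.item = item; this.next = next; }
    }

    private final AtomicReference<Node<E>> top = new AtomicReference<>();

    public void push(E e) {
        Node<E> oldTop, newTop;
        do {
            oldTop = top.get();
            newTop = new Node<>(e, oldTop);
        } while (!top.compareAndSet(oldTop, newTop)); // optimistic: retry on conflict
    }

    public E pop() {
        Node<E> oldTop, next;
        do {
            oldTop = top.get();
            if (oldTop == null) return null; // stack is empty
            next = oldTop.next;
        } while (!top.compareAndSet(oldTop, next));
        return oldTop.item;
    }

    public static void main(String[] args) {
        LockFreeStack<Integer> stack = new LockFreeStack<>();
        stack.push(1);
        stack.push(2);
        System.out.println(stack.pop() + "," + stack.pop() + "," + stack.pop());
        // prints "2,1,null"
    }
}
```

As in offer, a failed compareAndSet simply means another thread won the race, so the loop re-reads the latest state and tries again instead of blocking.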

Summary #

In conclusion, in this lesson, we analyzed the concurrency safety principles of blocking and non-blocking queues. Blocking queues mainly use ReentrantLock and its Condition to achieve thread safety, while non-blocking queues use CAS for thread safety.