22 What’s the underlying principle of AtomicInteger and how to apply CAS operations in your own product code #

In today’s lecture, I will analyze the components inside the concurrent package and take a look at the principles underlying the design and implementation of various synchronization structures and thread pools.

Today, I want to ask you a question: What is the underlying implementation principle of AtomicInteger? How can you apply CAS operations in your own product code?

Typical Answer #

AtomicInteger is a wrapper for the int type that provides atomic access and update operations. Its atomic operations are implemented based on CAS (compare-and-swap) technology.

CAS refers to a sequence of operations: read the current value, compute a new value, and attempt the update with a CAS instruction. If the current value is unchanged, no other thread has modified it concurrently, and the update succeeds. Otherwise, the caller has a choice: either retry, or simply report whether the update succeeded.
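The read, compute, and CAS sequence described above can be sketched with the public AtomicInteger API. This is a minimal illustration, not JDK code: the class CasMaxExample and its updateMax method are hypothetical names introduced here.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical helper, not part of the JDK: raises a shared maximum with a CAS retry loop.
final class CasMaxExample {
    // Returns true if this call raised the stored maximum to candidate.
    static boolean updateMax(AtomicInteger max, int candidate) {
        while (true) {
            int current = max.get();          // 1. read the current value
            if (candidate <= current) {
                return false;                 // nothing to update
            }
            // 2. install the new value; succeeds only if nobody changed it since the read
            if (max.compareAndSet(current, candidate)) {
                return true;
            }
            // 3. another thread won the race: loop and re-read
        }
    }
}
```

The loop makes the "retry" choice; a caller that prefers the other choice would return after the first failed compareAndSet.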

From the internal attributes of AtomicInteger, it can be seen that it relies on the underlying capabilities provided by Unsafe for low-level operations, and the value field with the volatile modifier is used to store the value to ensure visibility.

private static final jdk.internal.misc.Unsafe U = jdk.internal.misc.Unsafe.getUnsafe();
private static final long VALUE = U.objectFieldOffset(AtomicInteger.class, "value");
private volatile int value;

For the specific details of atomic operations, you can refer to any atomic update method, such as getAndIncrement.

Unsafe uses the memory address offset of the value field to complete the operation directly.

public final int getAndIncrement() {
    return U.getAndAddInt(this, VALUE, 1);
}

Because getAndIncrement must complete the update and return the previous value, it needs retry logic for the case where the CAS fails.

public final int getAndAddInt(Object o, long offset, int delta) {
    int v;
    do {
        v = getIntVolatile(o, offset);
    } while (!weakCompareAndSetInt(o, offset, v, v + delta));
    return v;
}

On the other hand, functions like compareAndSet that return a boolean type do not require retry because their return value represents success or failure.

public final boolean compareAndSet(int expectedValue, int newValue)
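As a small sketch of the no-retry case, the following uses a single compareAndSet call so that only the first caller wins. The class StartOnce and its state constants are hypothetical names used for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical one-time starter: only the first caller moves the state from NEW to STARTED.
final class StartOnce {
    private static final int NEW = 0;
    private static final int STARTED = 1;
    private final AtomicInteger state = new AtomicInteger(NEW);

    // No retry loop: a failed CAS simply means another caller already started it.
    boolean tryStart() {
        return state.compareAndSet(NEW, STARTED);
    }
}
```

Here the boolean result is the whole answer, so looping would add nothing.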

CAS is the basis of lock-free mechanisms in Java concurrency.

Analysis of Key Points #

Today’s question is a bit more focused on the underlying mechanisms of Java concurrency. Although we may not necessarily be involved in the implementation level of CAS (Compare and Swap) in development, it is still very necessary to understand its mechanism and how to apply this technology in Java, especially as it is a hot topic in concurrent programming interviews.

Some students have reported that interviewers will ask about how CAS is implemented at a lower level. This depends on the specific instructions provided by the CPU, and there are significant differences depending on the architecture. For example, the x86 CPU provides the cmpxchg instruction, while in reduced instruction set architectures, it is usually implemented by a pair of instructions (such as “load and reserve” and “store conditional”). CAS is a very lightweight operation on most processors, and this is also its advantage.

In most cases, this level of understanding is sufficient. I do not believe every Java engineer needs to understand things at the instruction level. We abstract and divide our work precisely so that developers at different levels can shield themselves from irrelevant details as much as possible during development.

If I were the interviewer, I would likely delve into these directions:

  • In what scenarios can CAS be used? Calling the Unsafe class directly is not the best choice for most scenarios, so is there a more recommended approach? After all, mastering a technology is not the goal in itself, let alone doing so just for interviews. We still hope it delivers value in actual products.
  • How well do you understand the underlying implementation techniques of concurrent structures such as ReentrantLock and CyclicBarrier?

Knowledge Expansion #

When it comes to the usage of CAS (Compare and Swap), let’s consider the following scenario: In a database product, one common choice to ensure the consistency of an index is to allow only one thread to exclusively modify a partition of the index. How can we implement this at the database abstraction level?

One way to achieve this is to add a logical lock to the index partition object. For example, we can use the current thread ID as the lock value and use atomic operations to set and release the lock. The pseudo code for this approach is as follows:

public class AtomicBTreePartition {
    private volatile long lock;
    public void acquireLock(){}
    public void releaseLock(){}
}

Now, how do we implement the lock operations in Java code? Unsafe is no longer a good choice. For example, I noticed that products such as Cassandra ran into trouble when Unsafe.monitorEnter()/monitorExit() were removed in Java 9, which prevented them from upgrading smoothly to newer JDK versions. Currently, Java provides two commonly used public APIs for CAS operations. One of them is java.util.concurrent.atomic.AtomicLongFieldUpdater, which is based on reflection, so we need to make sure the type and field name are correct.

private static final AtomicLongFieldUpdater<AtomicBTreePartition> lockFieldUpdater =
        AtomicLongFieldUpdater.newUpdater(AtomicBTreePartition.class, "lock");

private void acquireLock(){
    long t = Thread.currentThread().getId();
    while (!lockFieldUpdater.compareAndSet(this, 0L, t)){
        // Wait for a while, as database operations may be slow
        ...
    }
}
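For reference, here is a self-contained sketch of the field-updater approach that also includes a release path. The class name UpdaterPartitionLock, the one-microsecond back-off, and the ownership check in releaseLock are assumptions made for illustration; the original leaves the waiting strategy open.

```java
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
import java.util.concurrent.locks.LockSupport;

// Sketch only: the back-off and release logic are assumptions, not from the original.
class UpdaterPartitionLock {
    private static final AtomicLongFieldUpdater<UpdaterPartitionLock> LOCK_UPDATER =
            AtomicLongFieldUpdater.newUpdater(UpdaterPartitionLock.class, "lock");

    // 0 means unlocked; otherwise the owner's thread ID. The updater requires a volatile long field.
    private volatile long lock;

    void acquireLock() {
        long t = Thread.currentThread().getId();
        while (!LOCK_UPDATER.compareAndSet(this, 0L, t)) {
            LockSupport.parkNanos(1_000L); // assumed back-off: pause briefly before retrying
        }
    }

    void releaseLock() {
        long t = Thread.currentThread().getId();
        // Only the owner may release: the CAS fails if the field does not hold our ID.
        if (!LOCK_UPDATER.compareAndSet(this, t, 0L)) {
            throw new IllegalMonitorStateException("lock not held by current thread");
        }
    }

    boolean isHeldByCurrentThread() {
        return lock == Thread.currentThread().getId();
    }
}
```

Note that this sketch is not reentrant: a thread that calls acquireLock twice without releasing would wait forever.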

The Atomic package provides the most commonly used atomic data types and even atomic types and update operation utilities for references, arrays, and other related types. It is the preferred choice for many thread-safe programs.

In a previous lecture, I introduced the use of primitive fields together with Atomic*FieldUpdater to build more compact counters as an alternative to AtomicLong. Optimization is always tailored to specific requirements and goals. My focus there was on introducing possible approaches; the details depend on your needs. If only one or two objects are created, there is no need for that optimization. However, if there are thousands of objects or more, compactness starts to matter. Moreover, under heavy contention, the LongAdder provided by the atomic package may be a better choice than AtomicLong, although in essence it trades space for time.
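As a rough illustration of the LongAdder option, the sketch below has several threads increment one adder and then reads the total. The class LongAdderDemo and its countWith method are hypothetical names; the thread counts are arbitrary.

```java
import java.util.concurrent.atomic.LongAdder;

final class LongAdderDemo {
    // Starts `threads` workers that each increment the adder `perThread` times, then returns the total.
    static long countWith(int threads, int perThread) {
        LongAdder adder = new LongAdder();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    adder.increment();   // contended updates are spread across internal cells
                }
            });
            workers[i].start();
        }
        for (Thread w : workers) {
            try {
                w.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException(e);
            }
        }
        return adder.sum();              // sum() folds all cells into one value
    }
}
```

The extra cells are the space cost mentioned above; sum() is only guaranteed to be exact once the writers have finished, as they have here after join().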

Returning to the topic, if you are using Java 9 or later, you can completely implement this functionality in a different way using the Variable Handle API, which originated from JEP 193. This API provides atomic and ordered operations at various granularities. The modified code for the previous example is as follows:

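// Requires java.lang.invoke.MethodHandles and java.lang.invoke.VarHandle
private static final VarHandle HANDLE;
static {
    try {
        // "lock" is an instance field, so use findVarHandle (not findStaticVarHandle) and pass the field type
        HANDLE = MethodHandles.lookup().findVarHandle(AtomicBTreePartition.class, "lock", long.class);
    } catch (ReflectiveOperationException e) {
        throw new ExceptionInInitializerError(e);
    }
}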

private void acquireLock(){
    long t = Thread.currentThread().getId();
    while (!HANDLE.compareAndSet(this, 0L, t)){
        // Wait for a while, as database operations may be slow
        ...
    }
}
}

The process is very intuitive. First, obtain the corresponding variable handle and then directly call its CAS method.
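A complete, compilable variant of the two steps might look like the following. The class name VarHandlePartitionLock and the non-blocking tryAcquireLock method are assumptions for illustration; the lookup uses findVarHandle with the field type because lock is an instance field.

```java
import java.lang.invoke.MethodHandles;
import java.lang.invoke.VarHandle;

// Sketch only: a non-blocking variant of the partition lock built on VarHandle (Java 9+).
class VarHandlePartitionLock {
    private static final VarHandle LOCK;
    static {
        try {
            // Step 1: obtain the handle for the instance field "lock" of type long.
            LOCK = MethodHandles.lookup()
                    .findVarHandle(VarHandlePartitionLock.class, "lock", long.class);
        } catch (ReflectiveOperationException e) {
            throw new ExceptionInInitializerError(e);
        }
    }

    private volatile long lock; // 0 = free, otherwise the owner's thread ID

    // Step 2: call the CAS method on the handle. Returns false instead of waiting.
    boolean tryAcquireLock() {
        return LOCK.compareAndSet(this, 0L, Thread.currentThread().getId());
    }

    void releaseLock() {
        if (!LOCK.compareAndSet(this, Thread.currentThread().getId(), 0L)) {
            throw new IllegalMonitorStateException("lock not held by current thread");
        }
    }
}
```

A blocking acquireLock can be layered on top by calling tryAcquireLock in a loop with whatever waiting strategy suits the workload.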

Generally speaking, for CAS operations like these, I recommend using the Variable Handle API. It offers fine-grained, public low-level APIs. I emphasize “public” because, unlike internal APIs, a public API does not change unpredictably, which gives a solid foundation for future product maintenance and upgrades. To be frank, much of the extra workload in practice comes from solving problems with hacks rather than proper solutions.

CAS is not without side effects. Consider the commonly used failure-retry mechanism, which implicitly assumes that contention is short-lived. In most application scenarios this holds, and a retry usually succeeds after a single attempt. However, unexpected situations always exist, so where appropriate you should consider limiting the number of spin attempts to avoid excessive CPU consumption.
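One way to cap the spinning is a bounded retry loop such as the sketch below. The class BoundedSpin, the method tryAdd, and the idea of letting the caller choose maxAttempts are assumptions for illustration.

```java
import java.util.concurrent.atomic.AtomicInteger;

final class BoundedSpin {
    // Tries to add delta using at most maxAttempts CAS attempts.
    // Returns false if every attempt lost the race (or if maxAttempts is 0).
    static boolean tryAdd(AtomicInteger counter, int delta, int maxAttempts) {
        for (int attempt = 0; attempt < maxAttempts; attempt++) {
            int current = counter.get();
            if (counter.compareAndSet(current, current + delta)) {
                return true;
            }
            Thread.onSpinWait(); // hint that we are busy-waiting (Java 9+)
        }
        return false;            // the caller decides: back off, block, or report failure
    }
}
```

Returning false hands the decision back to the caller instead of burning CPU indefinitely.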

Another issue is the well-known ABA problem, which usually surfaces only in lock-free algorithms. As mentioned earlier, CAS compares against the previously read value when updating. If the value happens to be the same again, for example because an A -> B -> A update occurred in the meantime, a check of the value alone cannot detect the intermediate change and may allow an incorrect modification. To address this, Java provides the AtomicStampedReference utility class, which pairs the reference with a version number (stamp) so that the CAS checks both. For specific usage, refer to its API documentation.
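The following sketch replays an A -> B -> A sequence on a single thread to show how the stamp rejects a stale update. The class AbaDemo and the choice to bump the stamp by one on every change are illustrative assumptions.

```java
import java.util.concurrent.atomic.AtomicStampedReference;

final class AbaDemo {
    // Returns true if a CAS based on a stale stamp is accepted; with stamps it should return false.
    static boolean staleCasSucceeds() {
        String a = "A";
        String b = "B";
        AtomicStampedReference<String> ref = new AtomicStampedReference<>(a, 0);
        int staleStamp = ref.getStamp();   // an observer remembers value A with stamp 0

        // Meanwhile the value goes A -> B -> A; each change bumps the stamp.
        ref.compareAndSet(a, b, 0, 1);
        ref.compareAndSet(b, a, 1, 2);

        // The reference is A again, but the stamp is now 2, so the stale CAS is rejected.
        return ref.compareAndSet(a, "C", staleStamp, staleStamp + 1);
    }
}
```

A plain AtomicReference would have accepted the last update, because it sees only that the value is A again.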

I have introduced the scenarios and implementation of CAS. Fortunately, in most cases, Java developers do not need to directly use CAS code to implement thread-safe containers. Instead, they can indirectly enjoy the benefits of lock-free mechanisms in terms of scalability through the concurrent package and others.

Now let me introduce AbstractQueuedSynchronizer (AQS), which serves as the foundation for implementing various synchronization structures and some other components (such as Workers in thread pools) in the Java concurrent package.

When learning about AQS, if you start by looking at its series of methods (as shown in the figure below), you will likely confuse yourself. Being in a state of seeming understanding without truly understanding it does not have much practical significance.

[Figure: AQS methods]

My suggestion is to simplify it as much as possible and understand why AQS is needed, how to use AQS, at least what needs to be done, and then further combine it with practical examples in the JDK source code to understand the principles and applications of AQS.

Doug Lea once explained the original intention behind the design of AQS. In principle, one synchronization structure can often be implemented on top of another. For example, in lecture 19 of this column, I mentioned that a mutex can be implemented using a Semaphore. However, favoring one particular synchronization structure as the basis leads to complex and obscure implementation logic. Therefore, Doug Lea chose to abstract the basic synchronization operations into AbstractQueuedSynchronizer, so that AQS gives us a template for building synchronization structures.

The internal data and methods of AQS can be simply divided into:

  • A volatile integer member representing the state, which provides setState and getState methods.
private volatile int state;
  • A first-in-first-out (FIFO) waiting thread queue to implement thread competition and waiting. This is one of the cores of the AQS mechanism.
  • Various CAS-based basic operation methods, as well as various acquire/release methods expected to be implemented by specific synchronization structures.

To implement a synchronization structure using AQS, at least two basic types of methods need to be implemented. The first is the acquire operation, which obtains the exclusive rights to resources; the second is the release operation, which releases the exclusive rights to a certain resource.
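As a minimal sketch of those two method types, here is a non-reentrant mutex built on AQS, in the spirit of the example in the AbstractQueuedSynchronizer documentation. The class name SimpleMutex and the convention that state 0 means free and 1 means held are choices made for this illustration.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of a non-reentrant mutex: state 0 = free, 1 = held.
final class SimpleMutex {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected boolean tryAcquire(int unused) {
            // Acquire: CAS the state from 0 to 1 and record the owner.
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        @Override
        protected boolean tryRelease(int unused) {
            // Release: only the owner may reset the state to 0.
            if (getExclusiveOwnerThread() != Thread.currentThread()) {
                throw new IllegalMonitorStateException();
            }
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        boolean isLocked() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    void lock()        { sync.acquire(1); }      // queues and parks if tryAcquire fails
    boolean tryLock()  { return sync.tryAcquire(1); }
    void unlock()      { sync.release(1); }      // wakes a queued successor, if any
    boolean isLocked() { return sync.isLocked(); }
}
```

Everything related to queuing and parking is inherited from AQS; the subclass only defines what acquiring and releasing mean for its state.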

Taking ReentrantLock as an example, it internally implements the Sync type by extending AQS, using the state of AQS to reflect the lock hold situation.

private final Sync sync;
abstract static class Sync extends AbstractQueuedSynchronizer { }

Below are the corresponding acquire and release operations for ReentrantLock. For CountDownLatch, the counterparts are await()/countDown(), and the implementation details differ.

public void lock() {
    sync.acquire(1);
}

public void unlock() {
    sync.release(1);
}
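To illustrate how a latch-style structure maps onto AQS differently from a lock, here is a simplified one-shot latch that uses the shared-mode methods. It is a sketch modeled on the latch example in the AQS documentation, not the actual CountDownLatch source; the class and method names are hypothetical.

```java
import java.util.concurrent.locks.AbstractQueuedSynchronizer;

// Sketch of a one-shot latch: state 0 = closed, non-zero = open.
final class OneShotLatch {
    private static final class Sync extends AbstractQueuedSynchronizer {
        @Override
        protected int tryAcquireShared(int ignore) {
            return getState() != 0 ? 1 : -1;   // a negative result means "wait in the queue"
        }

        @Override
        protected boolean tryReleaseShared(int ignore) {
            setState(1);                       // open the latch
            return true;                       // true lets AQS wake the waiting threads
        }

        boolean isOpen() {
            return getState() != 0;
        }
    }

    private final Sync sync = new Sync();

    boolean isSignalled()        { return sync.isOpen(); }
    void signal()                { sync.releaseShared(1); }
    void awaitUninterruptibly()  { sync.acquireShared(1); }
    void await() throws InterruptedException { sync.acquireSharedInterruptibly(1); }
}
```

Here "acquire" means waiting for the latch to open and "release" means opening it, so many threads can pass at once, which is why shared mode is used instead of exclusive mode.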

Excluding some details, let’s analyze the logic of the acquire method. Its implementation internally calls tryAcquire and acquireQueued, which are two basic parts that need to be understood.

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

First, let’s take a look at tryAcquire. In ReentrantLock, the logic of tryAcquire is implemented in NonfairSync and FairSync, which provide the non-fair and fair variants respectively. The tryAcquire method inside AQS itself is essentially unimplemented (it simply throws an exception); defining the actual operation is left to the implementer.

We can see how fairness is specified in ReentrantLock during construction, as shown below:

public ReentrantLock() {
    sync = new NonfairSync(); // Default is non-fair
}

public ReentrantLock(boolean fair) {
    sync = fair ? new FairSync() : new NonfairSync();
}

Taking non-fair tryAcquire as an example, its internal implementation shows how to obtain the lock by coordinating the state with CAS. Note that in comparison to the fair version of tryAcquire, it does not check if there are other waiters when the lock is unoccupied, which reflects the semantics of non-fairness.

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) { // 0 means unoccupied, directly modify the state bit using CAS
        if (compareAndSetState(0, acquires)) { // Do not check queue situation, directly compete
            setExclusiveOwnerThread(current); // Set the current thread as the exclusive owner of the lock
            return true;
        }
    } else if (current == getExclusiveOwnerThread()) { // Even if the state is not 0, the current thread may still be the lock holder because this is a reentrant lock
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}
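The reentrant branch and the fairness flag can be observed from the outside through the public ReentrantLock API. The helper class ReentrancyDemo below is a hypothetical name used only for this sketch.

```java
import java.util.concurrent.locks.ReentrantLock;

final class ReentrancyDemo {
    // Acquires the same lock twice on one thread and returns the hold count seen inside.
    static int nestedHoldCount(ReentrantLock lock) {
        lock.lock();                 // first acquire: CAS moves the state from 0 to 1
        try {
            lock.lock();             // reentrant acquire by the owner: state becomes 2
            try {
                return lock.getHoldCount();
            } finally {
                lock.unlock();       // state back to 1
            }
        } finally {
            lock.unlock();           // state back to 0, lock released
        }
    }
}
```

Each lock() call must be matched by an unlock(); the lock only becomes available to other threads when the count returns to zero.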

Next, let’s analyze acquireQueued. If the previous tryAcquire fails, it means that the lock contention failed and enters the queuing competition stage. This is the part where thread-to-thread competition for locks is implemented using the FIFO queue, which can be considered as the core logic of AQS.

The current thread is wrapped into an exclusive mode node (EXCLUSIVE) and added to the queue through the addWaiter method. The logic of acquireQueued, in brief, is that if the node in front of the current node is the head node, it tries to acquire the lock. If everything goes smoothly, it becomes the new head node; otherwise, if necessary, it waits. Please refer to the comments I added for the specific processing logic.

final boolean acquireQueued(final Node node, int arg) {
    boolean interrupted = false;
    try {
        for (;;) { // Loop
            final Node p = node.predecessor(); // Get the previous node
            if (p == head && tryAcquire(arg)) { // If the previous node is the head node, it means that the current node is suitable for tryAcquire
                setHead(node); // If acquire is successful, set the new head node
                p.next = null; // Clear the reference from the previous node to the current node
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node)) { // Check if parking is necessary after failure
                interrupted |= parkAndCheckInterrupt();
            }
        }
    } catch (Throwable t) {
        cancelAcquire(node); // Cancel if an exception occurred
        if (interrupted)
            selfInterrupt();
        throw t;
    }
}

So far, we have walked through how a thread tries to acquire a lock. tryAcquire is the part the developer implements for a specific scenario, while competition between threads is handled by AQS through the waiter queue and acquireQueued. The release method likewise performs the corresponding operations on the queue.
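The queuing and the wake-up on release can be observed with the public ReentrantLock API. In this sketch, the class WaitQueueDemo and the five-second polling deadline are assumptions for illustration.

```java
import java.util.concurrent.locks.ReentrantLock;

final class WaitQueueDemo {
    // Returns true if a second thread was seen in the wait queue and later got the lock.
    static boolean waiterQueuedThenReleased() {
        ReentrantLock lock = new ReentrantLock();
        lock.lock();                                   // the calling thread holds the lock
        Thread waiter = new Thread(() -> {
            lock.lock();                               // tryAcquire fails, so this thread is enqueued and parked
            lock.unlock();
        });
        waiter.start();

        boolean queued = false;
        long deadline = System.nanoTime() + 5_000_000_000L; // give up after 5 seconds
        while (!queued && System.nanoTime() < deadline) {
            queued = lock.hasQueuedThread(waiter);     // inspects the AQS wait queue
            Thread.yield();
        }

        lock.unlock();                                 // release wakes the queued successor
        try {
            waiter.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return queued && !lock.isLocked();
    }
}
```

The waiter finishes only because unlock() triggers the release path, which unparks the thread at the front of the queue.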

Today, I introduced CAS, the technology underlying the atomic data types, and demonstrated through an example how to use CAS in product code. Finally, I introduced AQS, the foundation of the concurrent package. I hope this helps you.

Practice Exercise #

Have you understood the topic we discussed today? Today, I’m assigning a source code reading assignment. What is the purpose of the waitStatus field in the Node class of AQS?

Please write your thoughts on this question in the comments section. I will select the comments that show careful thinking and reward you with a study reward coupon. Feel free to discuss with me.