26 Java Concurrency Programming Practices #

RocketMQ is an excellent distributed system, and many of its programming techniques are worth learning from. This article picks a few examples from RocketMQ that illustrate concurrent programming.

Use of Read-Write Lock #

In RocketMQ, the routing information of a Topic mainly describes which queues the Topic has on each Broker, and the Broker metadata includes the cluster name and the Broker IP address. Writes come from Brokers: each Broker reports its routing information to the Nameserver every 30 seconds. Reads come from message clients (message senders and message consumers), which periodically query the Nameserver for the routing information of a Topic. A Broker report is a bulk request covering all of its Topics, whereas client queries are issued per Topic, and a single application may contain thousands of consumer groups. As a result, query requests far outnumber write requests.

[Figure 1]

In the RocketMQ Nameserver, this routing metadata is stored in the three HashMaps shown above. HashMap is not thread-safe: concurrent modification can corrupt its internal structure, and in older JDKs a concurrent resize could leave a thread spinning at 100% CPU. Because a Broker report has to update all three HashMaps, a lock is required. Since the workload is read-mostly and writes are rare, the JDK's ReadWriteLock is used to protect reads and writes of the data. The example code is as follows:

[Figure 2]

A write lock is acquired when adding data to the HashMaps mentioned above.

[Figure 3]

A read lock is acquired when reading data from the HashMaps.
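As a minimal sketch of this pattern (not RocketMQ's actual code), the class below guards two plain HashMaps with one ReentrantReadWriteLock. The class name `RouteTableSketch`, the two map names, and the method names are assumptions made for illustration; the real Nameserver keeps more tables than this.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical, simplified route table; not RocketMQ's actual class.
class RouteTableSketch {
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    // Two plain HashMaps that must always change together.
    private final Map<String, List<String>> topicToBrokers = new HashMap<>();
    private final Map<String, String> brokerToAddress = new HashMap<>();

    // Write path: a Broker reports its address and the topics it hosts.
    void registerBroker(String brokerName, String address, List<String> topics) {
        lock.writeLock().lock();
        try {
            brokerToAddress.put(brokerName, address);
            for (String topic : topics) {
                List<String> brokers =
                        topicToBrokers.computeIfAbsent(topic, k -> new ArrayList<>());
                if (!brokers.contains(brokerName)) {
                    brokers.add(brokerName);
                }
            }
        } finally {
            lock.writeLock().unlock(); // always unlock, even on exceptions
        }
    }

    // Read path: a client asks for the addresses serving one topic.
    List<String> findAddresses(String topic) {
        lock.readLock().lock();
        try {
            List<String> addresses = new ArrayList<>();
            List<String> brokers =
                    topicToBrokers.getOrDefault(topic, Collections.<String>emptyList());
            for (String broker : brokers) {
                addresses.add(brokerToAddress.get(broker));
            }
            return addresses; // a copy, so callers never touch the guarded maps
        } finally {
            lock.readLock().unlock();
        }
    }

    public static void main(String[] args) {
        RouteTableSketch table = new RouteTableSketch();
        table.registerBroker("broker-a", "192.168.0.1:10911", Arrays.asList("orders"));
        System.out.println(table.findAddresses("orders")); // prints [192.168.0.1:10911]
    }
}
```

Both maps are updated inside a single write-lock section, so a reader sees either the state before the report or the state after it, never a mix of the two.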

A read-write lock lets any number of readers hold the read lock at the same time, while the write lock is exclusive: once the write lock is held, all read lock requests block, and while any read lock is held, a write lock request blocks. This preserves the concurrency of read requests, and because writes are rare, the time spent waiting on the lock is small.

In the context of route registration, suppose a Broker sends a heartbeat to the Nameserver while 100 clients are querying routing information. The write request blocks until those 100 reads finish, and only then does the routing update run. What if another 10 clients start routing queries while the write request is still waiting: do they run immediately or block? With ReentrantReadWriteLock they generally block as well. Once a writer is waiting at the front of the queue, later read requests wait behind it, which keeps the writer from being starved.

Thought question: Why doesn’t the Nameserver use ConcurrentHashMap or other concurrent containers?

One important reason lies in the business logic itself. RocketMQ keeps a lot of routing information, and its data structure is spread across multiple HashMaps, as shown in the figure below:

[Figure 4]

A single write operation may need to modify several of these maps together, and a lock is needed to keep them consistent with one another. ConcurrentHashMap guarantees thread safety only for operations on one map; it cannot make an update that spans several maps atomic. From this perspective, ReadWriteLock is the natural choice.

Of course, readers may ask, if we only consider ReadWriteLock + HashMap vs ConcurrentHashMap, how should we choose? This depends on the JDK version.

Before JDK 8, ConcurrentHashMap was organized as an array of Segments, each a ReentrantLock guarding its own hash table, so the lock granularity was the Segment. Writes that fell within the same Segment were serialized on that Segment's lock. Reads normally did not take the lock and relied on volatile reads instead, but every lookup had to hash twice (first to the Segment, then to the bucket), and the number of Segments was fixed when the map was created. Its structure was therefore comparatively heavy, and for a read-mostly workload it held no clear structural advantage over ReadWriteLock + HashMap before JDK 8.

However, in JDK 8 and later versions, ConcurrentHashMap was redesigned. Its storage structure is similar to that of HashMap, and it uses CAS (compare-and-swap), together with locking on individual buckets, to handle concurrent updates. In this respect, I think ConcurrentHashMap has certain advantages, because reads need no lock and there is no separate lock structure to maintain.

Semaphore Usage Techniques #

A classic use case for the JDK's Semaphore is limiting concurrency. In RocketMQ's asynchronous sending, a Semaphore caps the number of pending asynchronous sends: once the configured concurrency is exceeded, flow control kicks in and new tasks cannot be submitted. The typical usage of Semaphore is as follows:

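import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// The class name is illustrative; the original listing shows only the two methods.
public class SemaphoreDemo {
    public static void main(String[] args) {
        // At most 10 threads may run the business logic at the same time.
        Semaphore semaphore = new Semaphore(10);
        for (int i = 0; i < 100; i++) {
            Thread t = new Thread(new Runnable() {
                @Override
                public void run() {
                    doSomething(semaphore);
                }
            });
            t.start();
        }
    }

    private static void doSomething(Semaphore semaphore) {
        boolean acquired = false;
        try {
            // Wait up to 3 seconds for a permit.
            acquired = semaphore.tryAcquire(3000, TimeUnit.MILLISECONDS);
            if (acquired) {
                System.out.println("Executing business logic");
            } else {
                System.out.println("Logic when semaphore is not acquired");
            }
        } catch (Throwable e) {
            e.printStackTrace();
        } finally {
            // Release only if a permit was actually acquired.
            if (acquired) {
                semaphore.release();
            }
        }
    }
}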

The above example code is very simple. It uses a Semaphore to control the concurrency of doSomething(). A few key points are as follows:

  • tryAcquire: This method tries to acquire a permit. If no permit becomes available within the specified waiting time, it returns false. release() must therefore be called only when tryAcquire() returned true; otherwise, permits will be over-released.
  • release: This method returns a permit to the semaphore.

The scenario above is relatively simple. If doSomething is an asynchronous method, this code becomes far less effective, and if the method has many branches or hands off to further asynchronous work, returning the permit becomes quite complex. The key rule for a Semaphore is that each acquired permit must be released exactly once. If release() is called more than once, the actual concurrency of the application can exceed the configured number of permits. Let's see the following test code:

[Figure 5]

Because of a logic error in one thread, release() is called one time too many, so where only one more thread should have been able to acquire a permit, two now can. The actual concurrency becomes 6, which exceeds the intended 5. Note that calling release() too many times does not raise an error; each extra call simply adds another permit. The number passed to the constructor is only the initial permit count, not an upper bound.
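The effect of an extra release() can be reproduced in a few lines. This is a minimal sketch rather than the test code from the article; the class and method names are made up for illustration.

```java
import java.util.concurrent.Semaphore;

// Hypothetical demo class: shows that release() is not capped at the initial count.
class SemaphoreOverRelease {
    // Returns the permit count after one acquire followed by two releases.
    static int permitsAfterDoubleRelease(int initialPermits) {
        Semaphore semaphore = new Semaphore(initialPermits);
        semaphore.acquireUninterruptibly(); // permits: initialPermits - 1
        semaphore.release();                // permits: initialPermits
        semaphore.release();                // extra release: initialPermits + 1
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterDoubleRelease(5)); // prints 6
    }
}
```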

Therefore, it is crucial to avoid calling the release method repeatedly in practice, and RocketMQ provides the following solution.

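import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

public class SemaphoreReleaseOnlyOnce {
    // Flips to true on the first release; later calls become no-ops.
    private final AtomicBoolean released = new AtomicBoolean(false);
    private final Semaphore semaphore;

    public SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    public void release() {
        if (this.semaphore != null) {
            // Only the caller that wins the CAS releases the permit.
            if (this.released.compareAndSet(false, true)) {
                this.semaphore.release();
            }
        }
    }

    public Semaphore getSemaphore() {
        return semaphore;
    }
}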

This class wraps the Semaphore, and the wrapper is what gets passed into the business method. In the doSomething method above, for example, whether or not it spawns further threads, the code calls the release method of SemaphoreReleaseOnlyOnce whenever the permit needs to be returned. That method is guarded so that calling it several times is harmless. Since each acquired permit is wrapped in its own SemaphoreReleaseOnlyOnce instance, the permit is released only once, however many code paths call release.

The release method of SemaphoreReleaseOnlyOnce is also very simple and relies on CAS (compare-and-swap). The first call flips the released flag from false to true with a CAS operation and releases the underlying Semaphore. On any later call the CAS fails because the flag is already true, so the Semaphore's release method is not called again, which solves the problem.
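A short usage sketch, assuming the wrapper shown above: one permit is acquired, and release is then called from two places, as might happen with a success callback and a timeout handler. The wrapper is repeated as a nested class only so that the snippet compiles on its own; `ReleaseOnceDemo` and its method name are hypothetical.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical demo class around the release-once wrapper.
class ReleaseOnceDemo {
    // Same logic as the wrapper listed above, repeated to keep the sketch self-contained.
    static final class SemaphoreReleaseOnlyOnce {
        private final AtomicBoolean released = new AtomicBoolean(false);
        private final Semaphore semaphore;

        SemaphoreReleaseOnlyOnce(Semaphore semaphore) {
            this.semaphore = semaphore;
        }

        void release() {
            if (semaphore != null && released.compareAndSet(false, true)) {
                semaphore.release();
            }
        }
    }

    // Acquires one permit, then "releases" it from two code paths.
    static int permitsAfterRedundantRelease(int initialPermits) {
        Semaphore semaphore = new Semaphore(initialPermits);
        semaphore.acquireUninterruptibly();
        SemaphoreReleaseOnlyOnce once = new SemaphoreReleaseOnlyOnce(semaphore);
        once.release(); // e.g. from the success callback: returns the permit
        once.release(); // e.g. from a timeout handler: ignored
        return semaphore.availablePermits();
    }

    public static void main(String[] args) {
        System.out.println(permitsAfterRedundantRelease(5)); // prints 5
    }
}
```

A new wrapper instance has to be created for every acquired permit; reusing one instance across acquisitions would swallow the second, legitimate release.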

Synchronous to Asynchronous Programming Technique #

In concurrent programming, there is a classic design pattern called Future. In this pattern, the main thread submits a task to another thread (typically a thread pool) and receives a token, the Future. The main thread does not block at this point and can continue with other work, such as sending some Dubbo requests. When it needs the result of the asynchronous execution, it calls the get() method of the Future. If the task has already completed, get() returns the result immediately; otherwise, the main thread blocks until the result is available.
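For reference, the Future pattern with the JDK's own classes looks roughly like the sketch below. The task body (a short sleep returning 42) and the class name are placeholders, not anything taken from RocketMQ.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

// Hypothetical demo class for the Future pattern.
class FutureSketch {
    static int submitAndCollect() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // Submit the task; submit() returns the Future immediately.
            Future<Integer> token = pool.submit(() -> {
                Thread.sleep(100); // stand-in for slow work
                return 42;
            });
            // The submitting thread is free to do other work here.
            int otherWork = 1;
            // get() blocks only if the task has not finished yet.
            return token.get() + otherWork;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(submitAndCollect()); // prints 43
    }
}
```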

The JDK's Future model usually requires a thread pool and a task object. RocketMQ's synchronous flush implementation also hands the work to a separate thread for decoupling and achieves a similar asynchronous effect, but it does so without the Future design pattern. Instead, it cleverly uses CountDownLatch.

[Figure 6]

In this diagram, the GroupCommitService thread provides synchronous flushing. It exposes a putRequest method for submitting a synchronous flush request (GroupCommitRequest). putRequest does not block; the main thread then calls the waitForFlush method of GroupCommitRequest to wait for the flush to complete. GroupCommitService runs as a separate thread and is responsible only for the flushing itself. Let's take a look at the implementation of waitForFlush and learn the key points of the synchronous-to-asynchronous transformation:

[Figure 7]

It uses the await method of CountDownLatch to block for at most the specified timeout. When is it woken up? When the flush finishes, the flushing thread calls countDown() on the latch, which unblocks await. The implementation is actually very simple:

[Figure 8]

Isn’t this design elegant? It is more lightweight compared to Future.
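The hand-off described in this section can be sketched as follows. This is a simplified stand-in, not RocketMQ's GroupCommitRequest: the class name `FlushRequestSketch`, the `wakeup` method, and the inline flusher thread are all assumptions for illustration.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical, simplified flush request built on a one-shot latch.
class FlushRequestSketch {
    private final CountDownLatch latch = new CountDownLatch(1);
    private volatile boolean flushOk = false;

    // Called by the flushing thread once the flush has finished.
    void wakeup(boolean ok) {
        this.flushOk = ok;
        latch.countDown(); // unblocks the thread waiting in waitForFlush
    }

    // Called by the submitting thread; returns false on failure or timeout.
    boolean waitForFlush(long timeoutMillis) {
        try {
            boolean finished = latch.await(timeoutMillis, TimeUnit.MILLISECONDS);
            return finished && flushOk;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    // Submits a request to a stand-in flusher thread and waits for it.
    static boolean demo() {
        FlushRequestSketch request = new FlushRequestSketch();
        Thread flusher = new Thread(() -> {
            // A real flusher would force data to disk before signalling.
            request.wakeup(true);
        }, "flush-thread");
        flusher.start();
        return request.waitForFlush(5000);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints true
    }
}
```

The latch is single-use, which fits this case: each flush request is created, waited on once, and then discarded.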

CompletableFuture Programming Technique #

Starting from JDK 8, true asynchronous programming has become possible with the advent of CompletableFuture. True asynchronous programming means that after the main thread initiates an asynchronous request, it does not need to call Future.get() explicitly in the code, even though the result of that request is eventually needed, so the main thread is never blocked. We call this "true asynchronous". Let me explain with the example of RocketMQ's master-slave replication.

Before RocketMQ 4.7.0, the synchronous replication model was as shown in the following diagram:

[Figure 9]

In this model, the thread that writes messages calls SendMessageProcessor's putMessage method to write the message. We call this the main thread. After writing the message to the master node, it needs to replicate the data to the slave node. During this process, the SendMessageProcessor main thread blocks and waits for the replication result from the slave node, and only then sends the result back to the message sender client over the network. In other words, the main thread that handles send requests is not programmed asynchronously in this model, which is not very efficient.

The optimization idea: decouple the message sending processing logic from returning the response to the client. Use a thread pool to process the result of synchronous replication, and then write the response result to the client via another thread, so that SendMessageProcessor does not need to wait for the replication result, reducing blocking and improving the message processing speed of the main thread.

Starting from JDK 8, with the introduction of CompletableFuture, this optimization idea becomes even easier. Therefore, in RocketMQ 4.7.0, using CompletableFuture, SendMessageProcessor’s true asynchronous processing is achieved, and it does not violate the semantics of synchronous replication. The flow chart is as follows:

[Figure 10]

The key point of the implementation is to return a CompletableFuture when synchronous replication is triggered. Once replication finishes, the asynchronous thread completes the CompletableFuture with the result, and the result is then sent to the client over the network.

The example code is as follows:

[Figure 11]

Then, in SendMessageProcessor, the key code for processing CompletableFuture is as follows:

[Figure 12]

The thenApply method registers a callback on the CompletableFuture. Inside the callback, the result is sent to the client over the network, which decouples the thread that handles the send request from the step that returns the result.
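The overall shape of this flow can be sketched as below. It is an illustration under assumptions, not RocketMQ's code: `replicate`, `handle`, the `Status` enum, and the one-off replication thread are hypothetical stand-ins for the HA service and for SendMessageProcessor.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of "return a future now, complete it later".
class AsyncReplicationSketch {
    enum Status { PUT_OK, FLUSH_SLAVE_TIMEOUT }

    // Stand-in for triggering synchronous replication: returns at once,
    // and a separate thread completes the future when replication is done.
    static CompletableFuture<Status> replicate(String message) {
        CompletableFuture<Status> result = new CompletableFuture<>();
        Thread replication = new Thread(
                () -> result.complete(Status.PUT_OK), "replication-thread");
        replication.start();
        return result;
    }

    // The processing thread registers a callback instead of waiting.
    static CompletableFuture<String> handle(String message) {
        return replicate(message)
                .thenApply(status -> "response for " + message + ": " + status);
    }

    public static void main(String[] args) {
        // join() is used only so that the demo prints before exiting.
        System.out.println(handle("m1").join()); // prints response for m1: PUT_OK
    }
}
```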

Question: In which thread does the thenApply method of CompletableFuture execute?

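thenApply does not use a thread pool of its own. If the CompletableFuture is not yet complete when thenApply is called, the callback runs in whichever thread later completes it; if it is already complete, the callback runs immediately in the calling thread. Only the Async variants, such as thenApplyAsync called without an explicit Executor, hand the callback to the JDK's common ForkJoinPool.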
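A small experiment makes the thread choice visible. The sketch below uses only JDK classes; the class name, method names, and the thread name "completer" are arbitrary choices for illustration.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class: reports which thread runs a thenApply callback.
class ThenApplyThreadSketch {
    // Callback registered before completion: it runs on the completing thread.
    static String callbackThreadWhenCompletedLater() {
        CompletableFuture<String> source = new CompletableFuture<>();
        CompletableFuture<String> seen =
                source.thenApply(v -> Thread.currentThread().getName());
        // The completer is started only after the callback has been registered.
        Thread completer = new Thread(() -> source.complete("done"), "completer");
        completer.start();
        return seen.join();
    }

    // Callback registered after completion: it runs on the registering thread.
    static boolean runsOnCallerWhenAlreadyComplete() {
        CompletableFuture<String> source = CompletableFuture.completedFuture("done");
        String name = source.thenApply(v -> Thread.currentThread().getName()).join();
        return name.equals(Thread.currentThread().getName());
    }

    public static void main(String[] args) {
        System.out.println(callbackThreadWhenCompletedLater()); // prints completer
        System.out.println(runsOnCallerWhenAlreadyComplete());  // prints true
    }
}
```

In practice this means a slow thenApply callback occupies the thread that completes the future, so heavy work is usually better passed to thenApplyAsync with a dedicated Executor.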