01 by Using Concurrency Libraries Is Thread Safety Guaranteed

01 By Using Concurrency Libraries Is Thread Safety Guaranteed #

As the first lecture of the course, today I want to talk to you about the topic related to using concurrent utility libraries.

During code review discussions, we sometimes hear some one-sided views and conclusions about thread safety and concurrent tools, such as “We can solve concurrency issues by changing HashMap to ConcurrentHashMap”, or “Let’s try the lock-free CopyOnWriteArrayList, it performs better”. In fact, these statements are not entirely accurate.

Indeed, modern programming languages provide various concurrent utility classes for developers to facilitate multi-threaded programming. However, if we do not fully understand their use cases, the problems they solve, and the best practices, blind usage can lead to pitfalls. These pitfalls can range from minor performance losses to major issues where the correctness of business logic cannot be guaranteed in a multi-threaded context.

I need to clarify that here, concurrent utility classes refer to the libraries used to solve concurrency problems in a multi-threaded environment. Generally, concurrent utility packages consist of synchronizers and containers. In business code, there are more cases where concurrent containers are used, so the examples I will share today will also focus on concurrent containers.

Next, let’s take a look at the most common pitfalls encountered when using concurrent utility libraries, and how to solve and avoid them.

Bug Caused by Unawareness of Thread Reuse Leading to User Information Confusion #

Previously, a colleague from the business team reported a strange issue in the production environment: sometimes, they would retrieve someone else’s user information. After examining the code, I discovered that they used ThreadLocal to cache the retrieved user information.

We know that ThreadLocal is suitable for isolating variables between threads, and for scenarios where data is shared among methods or classes. If retrieving user information is costly (such as querying user information from a database), then caching the data in ThreadLocal is an appropriate approach. But why did this lead to a bug where user information becomes confused?

Let’s take a look at a specific case.

Using Spring Boot, let’s create a web application that uses ThreadLocal to store an Integer value, which will temporarily represent the user information to be stored in the thread. The initial value is null. In the business logic, I first retrieve the value from ThreadLocal, then set the parameter passed in from the outside to ThreadLocal, to simulate the logic of retrieving user information from the current context. Afterwards, I retrieve the value again, and finally output the retrieved values and the thread name twice.

private static final ThreadLocal<Integer> currentUser = ThreadLocal.withInitial(() -> null);

@GetMapping("wrong")
public Map wrong(@RequestParam("userId") Integer userId) {
    // Retrieve the user information from ThreadLocal before setting it
    String before = Thread.currentThread().getName() + ":" + currentUser.get();
    // Set the user information to ThreadLocal
    currentUser.set(userId);
    // Retrieve the user information from ThreadLocal after setting it
    String after = Thread.currentThread().getName() + ":" + currentUser.get();
    // Consolidate and output the results of the two retrievals
    Map result = new HashMap();
    result.put("before", before);
    result.put("after", after);
    return result;
}

In theory, the value retrieved before setting the user information should always be null. However, we need to be aware that the program runs in Tomcat, and the executing thread is Tomcat’s worker thread, which is based on a thread pool.

As the name implies, a thread pool reuses a fixed number of threads. Once a thread is reused, the value retrieved from ThreadLocal for the first time is likely to be the residual value from the previous user’s request. At this point, the user information in ThreadLocal becomes the information of another user.

In order to quickly reproduce this issue, I will set the configuration parameter of Tomcat in the configuration file, and set the maximum number of threads in the worker thread pool to 1, ensuring that requests are always processed by the same thread:

server.tomcat.max-threads=1

After running the program, let user 1 make a request to the interface. We can see that the retrieved user ID for the first and second retrievals are null and 1 respectively, which is as expected:

img

Then let user 2 make a request to the interface, and the bug appears. The retrieved user ID for the first and second retrievals are 1 and 2 respectively, indicating that the first retrieval obtained the user information of user 1. The reason is that Tomcat’s thread pool reused the thread. From the figure, we can see that the two requests were handled by the same thread: http-nio-8080-exec-1.

img

This example tells us that when writing business logic, we need to first understand which threads the code will run on:

  • We may complain that knowing about multithreading is useless because we haven’t explicitly enabled multithreading in our code. However, in the case of running business code on a web server like Tomcat, which runs in a multithreaded environment (otherwise, the interface would not be able to support such high concurrency), we cannot assume that there won’t be any thread safety issues just because we haven’t explicitly enabled multithreading.

  • Because thread creation is costly, web servers often use thread pools to handle requests, which means that threads will be reused. At this point, when using tools like ThreadLocal to store some data, we need to be particularly careful to explicitly clear the data that was set after the code has finished running. This issue can also occur if a custom thread pool is used in the code.

Having understood this knowledge point, the solution to fixing this code is to explicitly clear the data in ThreadLocal in the finally block. This way, even if a previous thread is reused for handling a new request, it won’t retrieve incorrect user information. Here is the modified code:

@GetMapping("right")
public Map right(@RequestParam("userId") Integer userId) {
    String before = Thread.currentThread().getName() + ":" + currentUser.get();
    currentUser.set(userId);
    try {
        String after = Thread.currentThread().getName() + ":" + currentUser.get();
        Map result = new HashMap();
        result.put("before", before);
        result.put("after", after);
        return result;
    } finally {
        // Remove the data from ThreadLocal in the finally block to ensure data consistency
        currentUser.remove();
    }
}

After running the program again, we can verify that the bug where the user information retrieved for the first query was incorrect no longer occurs:

img

ThreadLocal uses an exclusive resource to solve thread safety problems. But what if we really need to have resources shared between threads? In that case, we might need to use thread-safe containers.

Using thread-safe concurrent tools does not solve all thread safety problems #

ConcurrentHashMap introduced in JDK 1.5 is a high-performance thread-safe hash table container. The term “thread-safe” can be easily misunderstood because ConcurrentHashMap only guarantees that the provided atomic read and write operations are thread-safe.

I have seen this misconception in many business codes, such as the following scenario. There is a Map with 900 elements, and now we need to add 100 more elements to it concurrently by 10 threads. The developer mistakenly believes that using ConcurrentHashMap eliminates any thread safety issues, so they write the following code without thinking: In the logic of each thread, they first use the size method to get the current number of elements, calculate how many elements the ConcurrentHashMap still needs to add, and log this value. Then, they use the putAll method to add the missing elements.

For the convenience of observing the problem, we output the initial and final number of elements in this Map.

// Number of threads
private static int THREAD_COUNT = 10;
// Total number of elements
private static int ITEM_COUNT = 1000;
// Helper method to get a ConcurrentHashMap with a specified number of elements
private ConcurrentHashMap<String, Long> getData(int count) {
    return LongStream.rangeClosed(1, count)
            .boxed()
            .collect(Collectors.toConcurrentMap(i -> UUID.randomUUID().toString(), Function.identity(),
                    (o1, o2) -> o1, ConcurrentHashMap::new));
}

@GetMapping("wrong")
public String wrong() throws InterruptedException {
    ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
    // Initial 900 elements
    log.info("init size:{}", concurrentHashMap.size());
    ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
    // Use a thread pool to handle the logic concurrently
    forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
        // Find out how many elements still need to be added
        int gap = ITEM_COUNT - concurrentHashMap.size();
        log.info("gap size:{}", gap);
        // Add the elements
        concurrentHashMap.putAll(getData(gap));
    }));
    // Wait for all tasks to complete
    forkJoinPool.shutdown();
    forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
    // Will the final number of elements be 1000?
    log.info("finish size:{}", concurrentHashMap.size());
    return "OK";
}

The log output after accessing the interface is as follows:

img

From the log, we can see:

  • The initial size is 900, which is as expected, and there is a need to add 100 elements.

  • Worker 1 thread finds that there are still 36 elements to be added, which is not a multiple of 100.

  • Worker 13 thread finds that the number of elements to be added is negative, indicating that it has been overfilled.

  • The final size of the HashMap is 1536, which obviously does not meet the expectation of filling it with 1000 elements.

In this scenario, we can give an illustrative example. ConcurrentHashMap is like a large basket with 900 oranges in it. We expect to fill the basket with 1000 oranges, which means adding another 100 oranges. There are 10 workers doing this task, and they will calculate how many oranges need to be added after they arrive at work, and then add the oranges to the basket.

ConcurrentHashMap itself can ensure that multiple workers do not interfere with each other when adding things to it. However, it cannot guarantee that when worker A sees that there are still 100 oranges to be added but they have not been added yet, worker B cannot see the number of oranges in the basket. What is more important is that the operation of adding 100 oranges to the basket is not atomic, and in the eyes of others, there may be a moment when there are 964 oranges in the basket and still need to add 36 oranges.

Returning to ConcurrentHashMap, we need to pay attention to the limitations of the methods or capabilities provided by ConcurrentHashMap to the outside world:

  • Using ConcurrentHashMap does not mean that the state between its multiple operations is consistent and that no other thread is operating on it. If you need to ensure thread safety, you need to manually lock it.

  • Aggregation methods such as size, isEmpty, and containsValue may reflect intermediate states of ConcurrentHashMap in concurrent situations. Therefore, in concurrent situations, the return values of these methods can only be used as references and cannot be used for flow control. Obviously, using the size method to calculate the difference value is a flow control.

  • Aggregation methods like putAll cannot guarantee atomicity, and getting data during putAll may only get part of the data.

The solution to the code is simple, just lock the whole logic:

@GetMapping("right")
public String right() throws InterruptedException {
    ConcurrentHashMap<String, Long> concurrentHashMap = getData(ITEM_COUNT - 100);
    log.info("init size:{}", concurrentHashMap.size());

    ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
    forkJoinPool.execute(() -> IntStream.rangeClosed(1, 10).parallel().forEach(i -> {
        // Lock this ConcurrentHashMap for the following complex logic
        synchronized (concurrentHashMap) {
            int gap = ITEM_COUNT - concurrentHashMap.size();
            log.info("gap size:{}", gap);
            concurrentHashMap.putAll(getData(gap));
        }
    }));
    forkJoinPool.shutdown();
    forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
    log.info("finish size:{}", concurrentHashMap.size());
    return "OK";
}

After calling the interface again, the log output of the program meets expectations:

img

We can see that only one thread finds that 100 elements need to be added, while the other 9 threads find that no elements need to be added. Finally, the size of the Map is 1000.

At this point, you might ask again, if we have to lock the entire ConcurrentHashMap, why not just use a regular HashMap.

In fact, it is not entirely true.

ConcurrentHashMap provides some atomic simple composite logical methods that, when used properly, can exert its power. This leads to another common problem in code: when using advanced utility classes provided by libraries, developers may still use them in the old way because they are not using their features, so they cannot unleash their power.

Lack of fully understanding the features of concurrent tools, resulting in the inability to fully leverage their power #

Let’s take a look at a scenario where we use a map to count the occurrences of keys. This logic is very common in business code.

  • Use ConcurrentHashMap to count the keys, with a range of 10.

  • Use a maximum of 10 threads, loop 10 million times, and increment a random key each time.

  • If the key does not exist, set the initial value to 1.

The code is as follows:

// Loop count
private static int LOOP_COUNT = 10000000;
// Thread count
private static int THREAD_COUNT = 10;
// Item count
private static int ITEM_COUNT = 10;
private Map<String, Long> normaluse() throws InterruptedException {
    ConcurrentHashMap<String, Long> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
    ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
    forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
        // Get a random key
        String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
        synchronized (freqs) {
            if (freqs.containsKey(key)) {
                // Increment by 1 if key already exists
                freqs.put(key, freqs.get(key) + 1);
            } else {
                // Set initial value to 1 if key does not exist
                freqs.put(key, 1L);
            }
        }
    }));
    forkJoinPool.shutdown();
    forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
    return freqs;
}

Drawing lessons from before, we lock the map directly using a lock and then perform the logic of checking, reading the current accumulated value, incrementing by 1, and saving the incremented value. This code has no problems in terms of functionality, but it cannot fully leverage the power of ConcurrentHashMap. The improved code is as follows:

private Map<String, Long> gooduse() throws InterruptedException {
    ConcurrentHashMap<String, LongAdder> freqs = new ConcurrentHashMap<>(ITEM_COUNT);
    ForkJoinPool forkJoinPool = new ForkJoinPool(THREAD_COUNT);
    forkJoinPool.execute(() -> IntStream.rangeClosed(1, LOOP_COUNT).parallel().forEach(i -> {
        String key = "item" + ThreadLocalRandom.current().nextInt(ITEM_COUNT);
        // Use computeIfAbsent() method to instantiate LongAdder and perform thread-safe counting with LongAdder
        freqs.computeIfAbsent(key, k -> new LongAdder()).increment();
    }));
    forkJoinPool.shutdown();
    forkJoinPool.awaitTermination(1, TimeUnit.HOURS);
    // Since our Value is LongAdder instead of Long, we need to convert it before returning
    return freqs.entrySet().stream()
            .collect(Collectors.toMap(
                    e -> e.getKey(),
                    e -> e.getValue().longValue())
            );
}

In this improved code, we cleverly leverage the following two points:

  • Using the atomic method computeIfAbsent of ConcurrentHashMap to perform compound logical operations: check if the key exists in the value. If not, put the result of the lambda expression into the map as the value, which is to create a new LongAdder object. Finally, return the value.

  • Since the value returned by the computeIfAbsent method is a LongAdder, which is a thread-safe accumulator, we can directly call its increment method to perform the accumulation.

In this way, we achieve the ultimate performance while ensuring thread safety, replacing the previous 7 lines of code with just 1 line.

Let’s compare the performance of the two pieces of code before and after the optimization through a simple test:

@GetMapping("good")
public String good() throws InterruptedException {
    StopWatch stopWatch = new StopWatch();
    stopWatch.start("normaluse");
    Map<String, Long> normaluse = normaluse();
    stopWatch.stop();
    // Verify the number of elements
    Assert.isTrue(normaluse.size() == ITEM_COUNT, "normaluse size error");
    // Verify the total count
    Assert.isTrue(normaluse.entrySet().stream()
                    .mapToLong(item -> item.getValue()).reduce(0, Long::sum) == LOOP_COUNT
            , "normaluse count error");
    stopWatch.start("gooduse");
    Map<String, Long> gooduse = gooduse();
    stopWatch.stop();
    Assert.isTrue(gooduse.size() == ITEM_COUNT, "gooduse size error");
    Assert.isTrue(gooduse.entrySet().stream()
                    .mapToLong(item -> item.getValue())
                    .reduce(0, Long::sum) == LOOP_COUNT
            , "gooduse count error");
    log.info(stopWatch.prettyPrint());
    return "OK";
}

This test code has nothing special. It uses StopWatch to test the performance of the two pieces of code, and finally includes an assertion to verify the number of elements in the map and the sum of all values, to check the correctness of the code. The test results are as follows:

--------------------------------
ms     %     Task name
--------------------------------
02656  100%  normaluse
00019  000%  gooduse
--------------------------------

As you can see, the optimized code is 10 times more performant than using locks to operate ConcurrentHashMap.

You may wonder why computeIfAbsent is so efficient.

The answer lies in the most critical part of the source code, which is the CAS (compare-and-swap) implementation by Java’s built-in Unsafe. It ensures the atomicity of writing data at the virtual machine level, and is much more efficient than locking:

static final <K,V> boolean casTabAt(Node<K,V>[] tab, int i,
                                    Node<K,V> c, Node<K,V> v) {
    return U.compareAndSetObject(tab, ((long)i << ASHIFT) + ABASE, c, v);
}

Advanced concurrent tools like ConcurrentHashMap do provide some advanced APIs, but we can only maximize their power by fully understanding their features, rather than blindly using them just because they are advanced and cool.

Failure to recognize the usage scenarios of concurrent tools leading to performance issues #

In addition to the commonly used concurrent tool ConcurrentHashMap, our toolkit also includes some new faces designed for specific scenarios. Generally speaking, general solutions for common scenarios have acceptable performance in all scenarios and can be considered as “universal remedies.” On the other hand, specialized implementations for specific scenarios may have higher performance than general solutions, but they must be used in the intended scenarios, otherwise, performance issues or even bugs may occur.

Previously, when investigating a production performance issue, we found that a simple non-database operation business logic took more time than expected, and modifying local cache was much slower than writing to the database. Upon examining the code, we discovered that the developer used CopyOnWriteArrayList to cache a large amount of data, and the data changes were relatively frequent.

CopyOnWrite is a trendy technology that is used in both Linux and Redis. In Java, although CopyOnWriteArrayList is a thread-safe ArrayList, its implementation involves copying the entire data each time it is modified. Thus, it has obvious use scenarios, namely, scenarios with more reads than writes or scenarios that require lock-free reads.

If we want to use CopyOnWriteArrayList, it must be because the scenario calls for it, not just to appear cool. If the read-write ratio is balanced or there are a large number of write operations, the performance of CopyOnWriteArrayList will be very poor.

Let’s write a test code to compare the read and write performance of CopyOnWriteArrayList with the conventional locking approach using ArrayList. In this code snippet, we have written separate test methods for concurrent writes and reads, which measure the time taken for a certain number of write or read operations.

// Test the performance of concurrent writes
@GetMapping("write")
public Map testWrite() {
    List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
    List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
    StopWatch stopWatch = new StopWatch();
    int loopCount = 100000;
    stopWatch.start("Write:copyOnWriteArrayList");
    // Perform 100,000 concurrent write operations on CopyOnWriteArrayList
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.add(ThreadLocalRandom.current().nextInt(loopCount)));
    stopWatch.stop();
    stopWatch.start("Write:synchronizedList");
    // Perform 100,000 concurrent write operations on the locked ArrayList
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> synchronizedList.add(ThreadLocalRandom.current().nextInt(loopCount)));
    stopWatch.stop();
    log.info(stopWatch.prettyPrint());
    Map result = new HashMap();
    result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
    result.put("synchronizedList", synchronizedList.size());
    return result;
}

// Helper method to populate the List
private void addAll(List<Integer> list) {
    list.addAll(IntStream.rangeClosed(1, 1000000).boxed().collect(Collectors.toList()));
}

// Test the performance of concurrent reads
@GetMapping("read")
public Map testRead() {
    // Create two test objects
    List<Integer> copyOnWriteArrayList = new CopyOnWriteArrayList<>();
    List<Integer> synchronizedList = Collections.synchronizedList(new ArrayList<>());
    // Populate data
    addAll(copyOnWriteArrayList);
    addAll(synchronizedList);
    StopWatch stopWatch = new StopWatch();
    int loopCount = 1000000;
    int count = copyOnWriteArrayList.size();
    stopWatch.start("Read:copyOnWriteArrayList");
    // Perform 1,000,000 concurrent read operations on CopyOnWriteArrayList
    IntStream.rangeClosed(1, loopCount).parallel().forEach(__ -> copyOnWriteArrayList.get(ThreadLocalRandom.current().nextInt(count)));
    stopWatch.stop();
    stopWatch.start("Read:synchronizedList");
    // Perform 1,000,000 concurrent read operations on the locked ArrayList
    IntStream.range(0, loopCount).parallel().forEach(__ -> synchronizedList.get(ThreadLocalRandom.current().nextInt(count)));
    stopWatch.stop();
    log.info(stopWatch.prettyPrint());
    Map result = new HashMap();
    result.put("copyOnWriteArrayList", copyOnWriteArrayList.size());
    result.put("synchronizedList", synchronizedList.size());
    return result;
}

When running the program, it can be observed that in scenarios with a large number of writes (100,000 add operations), CopyOnWriteArrayList is almost 100 times slower than the synchronized ArrayList:

img

On the other hand, in scenarios with a large number of reads (1,000,000 get operations), CopyOnWriteArrayList is more than 5 times faster than the synchronized ArrayList:

img

You may wonder why CopyOnWriteArrayList is so slow in scenarios with a large number of writes.

The answer lies in the source code. Let’s take the add method as an example. Each time add is called, a new array is created using Arrays.copyOf, resulting in a significant memory consumption due to frequent memory allocation and releasing:

/**
 * Appends the specified element to the end of this list.
 *
 * @param e element to be appended to this list
 * @return {@code true} (as specified by {@link Collection#add})
 */
public boolean add(E e) {
    synchronized (lock) {
        Object[] elements = getArray();
        int len = elements.length;
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        setArray(newElements);
        return true;
    }
}

Key Takeaways #

Today, I mainly shared with you four common mistakes that developers make when using concurrency tools to solve thread safety issues.

First, some developers only know how to use concurrency tools but don’t understand the context of the current thread. They may use ThreadLocal to cache data, thinking that ThreadLocal isolates data between threads and there won’t be any thread safety issues. However, they may encounter data corruption caused by thread reuse. It is important to remember to clean up the data in ThreadLocal before the business logic ends.

Second, some developers mistakenly believe that using concurrency tools can solve all thread safety problems. They expect to solve the issues by replacing thread-unsafe classes with thread-safe ones. For example, they may think that using ConcurrentHashMap can solve thread safety problems, but they neglect to lock compound logic, leading to errors in the business logic. If you want to maintain overall consistency in container operations within a business logic, you need to use locks.

Third, some developers don’t fully understand the features of concurrency tools and use them in the old-fashioned way, thus failing to leverage their performance advantages. For example, they may use ConcurrentHashMap but fail to fully utilize its CAS-based safe methods and still use locks to implement logic. You can read the documentation of ConcurrentHashMap to see if the relevant atomic operation APIs can meet your business needs. If they can, it is recommended to use them.

Fourth, some developers don’t have a clear understanding of the appropriate usage scenarios for certain concurrency tools. As a result, they use the wrong tools in inappropriate scenarios, leading to poor performance. For example, they may not understand the suitable usage scenario for CopyOnWriteArrayList and use it in scenarios with balanced reading and writing or with a lot of write operations, causing performance issues. In such scenarios, you can consider using a regular List.

In fact, the reason why these four common mistakes are easy to make is that when using concurrency tools, we don’t fully understand the potential problems and usage scenarios. Therefore, I would like to share two suggestions with you:

  1. Always read the official documentation (such as the Oracle JDK documentation) carefully. Fully read the official documentation, understand the usage scenarios of the tools and how to use their APIs, and conduct some small experiments. After understanding, you can avoid most of the pitfalls when using the tools.

  2. If your code runs in a multi-threaded environment, there will be concurrency issues, which are not easy to reproduce. You may need to use stress testing to simulate concurrency scenarios and discover any bugs or performance issues.

The code used today has been uploaded to GitHub. You can click the link to view it.

Reflection and Discussion #

  1. Today we have used ThreadLocalRandom multiple times. Do you think it is possible to set its instance to a static variable for reuse in a multi-threaded environment?

  2. ConcurrentHashMap also provides the putIfAbsent method. Can you explain the difference between computeIfAbsent and putIfAbsent methods by referring to the JDK documentation?

Have you encountered any other pitfalls when using concurrency tools? I am Zhu Ye, welcome to leave a comment in the comment section to share your thoughts. You are also welcome to share this article with your friends or colleagues to exchange ideas together.