12 Case Analysis: Parallel Computation to Make Code Take Off #

Modern computers are often equipped with multiple cores, and even smartphones nowadays come with parallel processors. By utilizing multiprocessing and multithreading, tasks can be executed simultaneously, speeding up their execution.

Java provides a rich set of APIs to support multithreading development. For Java programmers, multithreading is an essential skill for interviews and work. But how is it applied to real-world scenarios? And what are the things to consider? This lesson will start with an example of parallel data retrieval and gradually explain one of the most frequently asked topics in interviews.

Parallel Data Retrieval #

Consider the following scenario. A user data interface must return within 50ms. Its calling logic is complex: it has to aggregate data from more than 20 other interfaces. Even if each of these interfaces takes as little as 20ms, calling them one after another takes 20 * 20 = 400ms in the best case.

As shown in the diagram below, the only way to solve this problem is through parallel computing - using multiple threads to retrieve and compute results, and then combining them.

Drawing 0.png

However, this programming model is complex. If one were to use the raw thread API or functions like wait and notify, the code complexity could be quite high. Fortunately, most concurrent programming scenarios in Java can now be implemented using some utility classes from the concurrent package.

In this scenario, we can use the CountDownLatch to achieve our goal. CountDownLatch is essentially a counter that we initialize with the same value as the number of tasks to be executed. When a task completes, we decrement the counter. When the counter reaches 0, it means all tasks are completed, and the threads waiting on await can continue execution.

The code snippet below is a utility class I specifically created for this scenario. It takes two parameters: the number of jobs to be computed and the timeout for the entire task in milliseconds.

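import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ParallelFetcher {
    final long timeout;
    final CountDownLatch latch;
    final ThreadPoolExecutor executor = new ThreadPoolExecutor(100, 200, 1,
            TimeUnit.HOURS, new ArrayBlockingQueue<>(100));
    public ParallelFetcher(int jobSize, long timeoutMill) {
        latch = new CountDownLatch(jobSize);
        timeout = timeoutMill;
    }
    public void submitJob(Runnable runnable) {
        executor.execute(() -> {
            try {
                runnable.run();
            } finally {
                // Count down even if the job throws, so await() is not held up until the timeout.
                latch.countDown();
            }
        });
    }
    // Waits until all jobs have finished or the timeout elapses, whichever comes first.
    public void await() {
        try {
            this.latch.await(timeout, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // restore the interrupt flag for callers
            throw new IllegalStateException(e);
        }
    }
    public void dispose() {
        this.executor.shutdown();
    }
}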

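When the jobs take longer than the time limit of the whole task, the caller stops waiting and moves on. This is what the await function does. Note that jobs that have not finished are not terminated; they keep running in the thread pool, but the caller no longer waits for their results.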

The following example demonstrates how this class is used. SlowInterfaceMock is a test class that simulates slow remote services by waiting 0~60 milliseconds per call. After the program runs, the results are collected in the result map.

public static void main(String[] args) { 
    final String userid = "123"; 
    final SlowInterfaceMock mock = new SlowInterfaceMock(); 
    ParallelFetcher fetcher = new ParallelFetcher(20, 50); 
    final Map<String, String> result = new HashMap<>();  
    fetcher.submitJob(() -> result.put("method0", mock.method0(userid))); 
    fetcher.submitJob(() -> result.put("method1", mock.method1(userid))); 
    fetcher.submitJob(() -> result.put("method2", mock.method2(userid))); 
    // ... (methods 3 to 19) ...
    fetcher.await();  
    System.out.println(fetcher.latch); 
    System.out.println(result.size()); 
    System.out.println(result);  
    fetcher.dispose(); 
}

Using this approach, our interface can return within a fixed amount of time. The concurrent package provides many similar tools to CountDownLatch. While enjoying their convenience, let’s take a look at some things to be aware of in this code.

First, the latch’s remaining count plus the size of result should add up to 20. When running the code, however, this is very unlikely to be the case, because some data is lost. The reason is that the main method uses HashMap, which is not thread-safe, so concurrent writes lead to inconsistent and incorrect results. This problem can be solved by replacing HashMap with ConcurrentHashMap.
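The replacement described above can be sketched as follows. This is a minimal, self-contained illustration rather than the lesson's original code: the class name SafeResultDemo and the method collect are hypothetical, and a simple put stands in for the remote calls made through SlowInterfaceMock.

```java
import java.util.Map;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: many jobs write into one shared, thread-safe map.
class SafeResultDemo {

    // Runs `jobs` concurrent writers and returns the map they filled.
    static Map<String, String> collect(int jobs) {
        Map<String, String> result = new ConcurrentHashMap<>();
        CountDownLatch latch = new CountDownLatch(jobs);
        ThreadPoolExecutor executor = new ThreadPoolExecutor(jobs, jobs, 0,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(jobs));
        try {
            for (int i = 0; i < jobs; i++) {
                final int id = i;
                executor.execute(() -> {
                    try {
                        // Stand-in for a remote call such as mock.method0(userid).
                        result.put("method" + id, "value" + id);
                    } finally {
                        latch.countDown();
                    }
                });
            }
            // Generous timeout so that every job finishes in this demo.
            if (!latch.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("jobs did not finish in time");
            }
            return result;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(collect(20).size()); // prints 20
    }
}
```

Because every put completes before the matching countDown, all 20 entries are present once await returns without timing out.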

From this small issue, we can see that concurrent programming is rarely straightforward, and a careless approach leads to pitfalls. If you are unsure how to use collections in concurrent scenarios, using the thread-safe classes directly reduces the chance of errors.

Let’s look at the thread pool settings again. There are many parameters, and the maximum pool size is 200. So how many threads should be configured? Each request needs 20 threads, so 200 threads can serve 10 concurrent requests. Assuming the worst case of 50ms per request, the minimum QPS this interface supports is 1000 / 50 * 10 = 200. If traffic grows beyond that, the number of threads can be increased.

In our usual business, there are two types of tasks: compute-intensive tasks and I/O-intensive tasks.

  • I/O-intensive tasks

Most common internet services are I/O-intensive: they spend their time waiting for database I/O, network I/O, and so on. In this case, the best performance is achieved when the number of threads equals the number of concurrent I/O tasks. Thread context switching adds some overhead, but it is acceptable compared with slow I/O.

The scenario above uses synchronous I/O, where one task basically corresponds to one thread. Asynchronous NIO can speed this up, which will be explained in detail in “Chapter 15: Case Study: From BIO to NIO to AIO”.

  • Compute-intensive tasks

Compute-intensive tasks are the opposite, for example time-consuming algorithmic logic. To achieve the highest CPU utilization and throughput, switching between tasks should be minimized, so the most efficient setting is a number of threads equal to the number of CPU cores.

Understanding these characteristics of tasks can help increase the performance of the service by adjusting the number of threads. For example, in the high-performance network toolkit Netty, the default number of threads for EventLoop is twice the number of processors. If our business I/O is time-consuming, it may cause task blocking. There are two ways to solve this: increase the size of the worker thread pool or run time-consuming operations in another thread pool.
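The sizing rule for compute-intensive tasks can be sketched as follows. This is an illustrative example under simple assumptions, not a tuning recipe: the class CpuBoundSum and its parallelSum method are hypothetical, and the workload (summing a range of numbers) is chosen only because it involves no I/O.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical example: sum 1..n with one worker thread per CPU core.
class CpuBoundSum {

    static long parallelSum(long n) {
        int threads = Runtime.getRuntime().availableProcessors();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(threads, threads, 0,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(threads));
        try {
            long chunk = (n + threads - 1) / threads; // ceiling division: at most `threads` chunks
            List<Future<Long>> parts = new ArrayList<>();
            for (long start = 1; start <= n; start += chunk) {
                final long from = start;
                final long to = Math.min(n, start + chunk - 1);
                parts.add(pool.submit(() -> {
                    long sum = 0;
                    for (long i = from; i <= to; i++) {
                        sum += i;
                    }
                    return sum;
                }));
            }
            long total = 0;
            for (Future<Long> part : parts) {
                total += part.get(); // wait for each partial result
            }
            return total;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(parallelSum(1_000_000L)); // prints 500000500000
    }
}
```

Each core gets exactly one long-running chunk, so the threads rarely have to be switched out in favor of one another.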

Exploring Thread Pool from the Perspective of Object Pooling #

Threads are relatively expensive resources, and creating and destroying them frequently hurts system performance. As described in “Chapter 09: Case Study: Application Scenarios of Object Pooling”, such resources are well suited to pooling.

The design approach of thread pool is similar to other object pools, but there are some subtle differences. Let’s take a look at the constructor with the most complete parameters of the thread pool:

public ThreadPoolExecutor(int corePoolSize, 
    int maximumPoolSize, 
    long keepAliveTime, 
    TimeUnit unit, 
    BlockingQueue<Runnable> workQueue, 
    ThreadFactory threadFactory, 
    RejectedExecutionHandler handler)

There is nothing special about the first few parameters. Because the threads in the pool are always valid, a thread pool needs far fewer idle-related configuration parameters than an ordinary object pool. Let’s focus on workQueue and handler.

How a submitted task is handled is a question that comes up in almost every multithreading interview. As shown in the following figure, after a task is submitted, the pool first checks whether the number of threads has reached the core size (corePoolSize). If it has not, a new thread is created to run the task. If it has, the task is placed in the task queue. If the queue is also full, the pool checks whether the number of threads has reached the maximum (maximumPoolSize). If it has not, another thread is created; if it has, the task is handed to the rejection policy (handler).

Drawing 2.png
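The submission flow can be observed with a deliberately tiny pool. The sketch below is illustrative only: the class PoolFlowDemo is hypothetical, and the sizes (1 core thread, 2 threads at most, a queue of capacity 1) are chosen so that four blocking tasks hit each branch of the flow in turn.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of the order: core threads -> queue -> extra threads -> rejection.
class PoolFlowDemo {

    // Submits four blocking tasks and records what the pool did with each one.
    static String trace() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 2, 1,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keep the worker busy until the demo is over
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        StringBuilder trace = new StringBuilder();
        try {
            for (int i = 1; i <= 4; i++) {
                try {
                    pool.execute(blocker);
                    trace.append("task").append(i)
                         .append(": threads=").append(pool.getPoolSize())
                         .append(" queued=").append(pool.getQueue().size())
                         .append('\n');
                } catch (RejectedExecutionException e) {
                    // Default handler (AbortPolicy) rejects once threads and queue are exhausted.
                    trace.append("task").append(i).append(": rejected\n");
                }
            }
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
        return trace.toString();
    }

    public static void main(String[] args) {
        System.out.print(trace());
        // task1: threads=1 queued=0   (core thread created)
        // task2: threads=1 queued=1   (core size reached, task queued)
        // task3: threads=2 queued=1   (queue full, extra thread created)
        // task4: rejected             (maximum size reached)
    }
}
```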

Let’s take a look at some of the ready-made thread pools in the Executors factory class.

1. Fixed-size thread pool

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
        0L, TimeUnit.MILLISECONDS,
        new LinkedBlockingQueue<Runnable>());
}

The core and maximum sizes of the FixedThreadPool are equal, and setting them to different values would have no effect anyway. The reason is that the task queue used here, LinkedBlockingQueue, is unbounded, so it never fills up and the code never reaches the logic that grows the pool toward the maximum size. The keepAliveTime parameter is also meaningless, because it only applies to threads beyond corePoolSize, and this pool never has any. The problem with this thread pool is that, because the queue is unbounded, a large number of tasks can lead to uncontrollable memory usage and to tasks waiting in the queue for a long time.

2. Unbounded thread pool

public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
        60L, TimeUnit.SECONDS,
        new SynchronousQueue<Runnable>());
}

CachedThreadPool is the other extreme: the minimum number of threads is 0, and threads that stay idle for 60 seconds are recycled. Tasks are submitted through a SynchronousQueue, which does not cache tasks at all: a task is handed to an idle thread if one is waiting, and otherwise a new thread is created. This approach has its own problem: the number of threads is unbounded, so a burst of tasks can create too many threads and exhaust memory.

In general, these two factory methods are not recommended in a production environment. Instead, we should build thread pools with ThreadPoolExecutor directly according to specific requirements, which is also the approach recommended in Alibaba’s development specifications.

  • If the tasks can tolerate a certain amount of delay, it is reasonable to use a LinkedBlockingQueue with an explicit capacity limit and cache a portion of the tasks.
  • If the tasks have strict real-time requirements, as in RPC services, it is preferable to use a SynchronousQueue, which hands tasks over directly instead of caching them.

3. Rejection Policies

The default rejection policy is AbortPolicy, which throws a RejectedExecutionException. Its counterpart is DiscardPolicy, which silently drops the task without even throwing an exception. Using it is highly discouraged.

There is also a policy called CallerRunsPolicy, which runs the task on the caller’s thread when the thread pool is saturated. For example, if a thread pool used by a Controller is full, the task runs on the Tomcat request thread that submitted it and blocks that thread. This can easily exhaust the request threads and make users wait a long time for business processing. Whether to use this policy depends on how long your users can tolerate waiting.

The last policy is called DiscardOldestPolicy. When the pool is saturated, it removes the oldest task from the queue and then tries to submit the current task again.
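The effect of CallerRunsPolicy is easy to observe in isolation. The following is a minimal sketch with hypothetical names (CallerRunsDemo, overflowThread): a pool with a single thread and a SynchronousQueue is saturated by one blocking task, so the second task has nowhere to go and is run by the submitting thread.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical demo: with CallerRunsPolicy, an overflow task runs on the submitting thread.
class CallerRunsDemo {

    // Returns the name of the thread that executed the overflow task.
    static String overflowThread() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0, TimeUnit.SECONDS,
                new SynchronousQueue<>(), new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            // First task occupies the only worker thread.
            pool.execute(() -> {
                try {
                    release.await();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            // The pool is now saturated, so this task is executed by the caller.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }

    public static void main(String[] args) {
        System.out.println(overflowThread()); // prints "main" when started from the main thread
    }
}
```

In a web application the "caller" would be a request-handling thread, which is why this policy slows down request processing when the pool is full.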

How to use asynchronous programming in Spring Boot? #

In Spring Boot, it is very easy to implement asynchronous tasks.

First, we need to add the @EnableAsync annotation to the startup class, and then add the @Async annotation to the methods that need to be executed asynchronously. In general, our tasks can run in the background without returning any data. However, some tasks may need to return some data, in which case we can use Future to return a proxy for other code to use.

The key code is as follows:

@Async
public Future<String> asyncMethod() {
    // Code to be executed asynchronously
    return new AsyncResult<String>("Result of the asynchronous method");
}

By default, Spring runs asynchronous tasks on a default executor whose resource usage is effectively unbounded and therefore uncontrollable. For this reason, I strongly recommend configuring a thread pool of a suitable size in code.

@Bean 
public ThreadPoolTaskExecutor getThreadPoolTaskExecutor() { 
    ThreadPoolTaskExecutor taskExecutor = new ThreadPoolTaskExecutor(); 
    taskExecutor.setCorePoolSize(100); 
    taskExecutor.setMaxPoolSize(200); 
    taskExecutor.setQueueCapacity(100); 
    taskExecutor.setKeepAliveSeconds(60); 
    taskExecutor.setThreadNamePrefix("test-"); 
    taskExecutor.initialize(); 
    return taskExecutor; 
}

Inventory of Multithreaded Resources #

1. Thread-Safe Classes

We have mentioned HashMap and ConcurrentHashMap above, with the latter being thread-safe. There are many details in multithreading, so let’s take stock of some common thread-safe classes.

Note that each of the following comparisons is a knowledge point in interviews. To gain a deeper understanding, you need to read the source code of the JDK.

  • StringBuilder corresponds to StringBuffer. The latter mainly uses the synchronized keyword to achieve thread synchronization. It is worth noting that when the object is used only within a single method, there is practically no difference between the two, because JIT optimization (lock elision) removes the effect of the synchronized keyword.
  • HashMap corresponds to ConcurrentHashMap. ConcurrentHashMap is a big topic. Here is a reminder that the implementation changed between JDK 1.7 and 1.8. In 1.8, segmented locking (lock striping) was removed, and synchronized is used instead of ReentrantLock.
  • ArrayList corresponds to CopyOnWriteArrayList. The latter is based on the concept of copy-on-write and is suitable for scenarios where reads are far more frequent than writes.
  • LinkedList corresponds to ArrayBlockingQueue. ArrayBlockingQueue does not use a fair lock by default, but passing fair = true to the constructor makes it a fair blocking queue. It is used very frequently in the concurrent package.
  • HashSet corresponds to CopyOnWriteArraySet.

Now let’s take a look at the importance of thread safety with a frequently encountered problem.

SimpleDateFormat is a commonly used date processing class, but it is not thread-safe. It can cause many problems in a multi-threaded environment, and in my previous work, I found a lot of such misuse through Sonar scanning. In interviews, I also specifically ask about SimpleDateFormat to judge whether the interviewee has a basic understanding of multi-threaded programming.

Running the code in the figure above produces output in which the dates are clearly corrupted.

Thu May 01 08:56:40 CST 618104 
Thu May 01 08:56:40 CST 618104 
Mon Jul 26 08:00:04 CST 1 
Tue Jun 30 08:56:00 CST 2020 
Thu Oct 01 14:45:20 CST 16 
Sun Jul 13 01:55:40 CST 20220200 
Wed Dec 25 08:56:40 CST 2019 
Sun Jul 13 01:55:40 CST 20220200

The way to solve this problem is to use ThreadLocal variables, as shown in the code in the figure below, which can effectively solve the thread safety problem.

Drawing 6.png
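The ThreadLocal approach can be sketched as follows. This is not a transcription of the figure: the class SafeDateParser, its methods, and the date pattern are hypothetical and only illustrate the idea of giving each thread its own SimpleDateFormat instance.

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: every thread gets its own SimpleDateFormat instance.
class SafeDateParser {

    // The formatter is created lazily, once per thread, on first use.
    private static final ThreadLocal<SimpleDateFormat> FORMAT =
            ThreadLocal.withInitial(() -> new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"));

    static long parseMillis(String text) {
        try {
            return FORMAT.get().parse(text).getTime();
        } catch (ParseException e) {
            throw new IllegalArgumentException(e);
        }
    }

    // Parses the same text from many pool threads and returns the distinct results.
    static Set<Long> parseConcurrently(String text, int tasks) {
        Set<Long> distinct = ConcurrentHashMap.newKeySet();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(8, 8, 0,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(tasks));
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> distinct.add(parseMillis(text)));
            }
            pool.shutdown(); // already queued tasks still run to completion
            if (!pool.awaitTermination(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("parsing did not finish in time");
            }
            return distinct;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    public static void main(String[] args) {
        // Every thread parses the same text, so exactly one distinct value is expected.
        System.out.println(parseConcurrently("2020-06-30 08:56:00", 200).size()); // prints 1
    }
}
```

On Java 8 and later, java.time.format.DateTimeFormatter is immutable and thread-safe, so it is another way to avoid the problem altogether.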

2. Synchronization Methods for Threads

There are many ways to achieve thread synchronization in Java, which can be roughly divided into the following 8 categories.

  • Use functions like wait, notify, and notifyAll in the Object class. Since this programming model is very complex, it is rarely used now. The key point is that calls to these functions must be placed inside a synchronized block on the same object; otherwise they throw IllegalMonitorStateException.
  • Use the ThreadLocal thread-local variable, with one variable per thread. This is explained in detail later in this lesson.
  • Use the synchronized keyword to modify methods or code blocks. This is the most common way in Java, and it involves lock upgrading.
  • Use the reentrant lock ReentrantLock in the concurrent package. This lock is built on CAS operations.
  • Use the volatile keyword to control the visibility of variables. This keyword guarantees the visibility of variables, but not the atomicity of operations on them.
  • Use thread-safe blocking queues to achieve thread synchronization. For example, use LinkedBlockingQueue to implement a simple producer-consumer.
  • Use atomic variables. The Atomic* classes are also implemented using CAS. We will introduce CAS in the next lesson.
  • Use the join method of the Thread class to make multiple threads execute in a specified order.

The screenshot below shows a simple producer-consumer example implemented using LinkedBlockingQueue. This question appears frequently in online written tests. Note that a variable modified by volatile is also used to decide whether the program keeps running, which is a typical use of volatile.

Drawing 7.png
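A producer-consumer of this kind might look like the sketch below. It is a hypothetical reconstruction of the idea, not the code in the screenshot: the class ProducerConsumerDemo, the queue capacity of 10, and the 10ms poll timeout are all assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: one producer, one consumer, a bounded queue, and a volatile stop flag.
class ProducerConsumerDemo {

    // volatile makes the producer's "finished" signal visible to the consumer thread.
    private volatile boolean running = true;

    // Produces 0..items-1 and returns them in the order the consumer received them.
    List<Integer> run(int items) {
        BlockingQueue<Integer> queue = new LinkedBlockingQueue<>(10); // put() blocks when full
        List<Integer> consumed = new ArrayList<>(); // written only by the consumer thread

        Thread consumer = new Thread(() -> {
            try {
                // Keep draining until the producer is done and the queue is empty.
                while (running || !queue.isEmpty()) {
                    Integer item = queue.poll(10, TimeUnit.MILLISECONDS);
                    if (item != null) {
                        consumed.add(item);
                    }
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        Thread producer = new Thread(() -> {
            try {
                for (int i = 0; i < items; i++) {
                    queue.put(i);
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            } finally {
                running = false; // signal the consumer after the last item was queued
            }
        });

        consumer.start();
        producer.start();
        try {
            producer.join();
            consumer.join(); // after join, reading `consumed` from this thread is safe
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return consumed;
    }

    public static void main(String[] args) {
        System.out.println(new ProducerConsumerDemo().run(100).size()); // prints 100
    }
}
```

The blocking queue handles all hand-off between the two threads, and the volatile flag is used only to tell the consumer when to stop.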

FastThreadLocal #

In everyday programming, ThreadLocal is one of the most commonly used classes. Take Spring, the most widely used framework, as an example. Its transaction propagation mechanism is implemented using ThreadLocal. Since a ThreadLocal value is private to its thread, Spring’s transaction propagation cannot span threads. When asked whether Spring transaction management covers sub-threads, you should now be able to understand the interviewer’s true intention.

/** 
    * Holder to support the {@code currentTransactionStatus()} method, 
    * and to support communication between different cooperating advices 
    * (e.g. before and after advice) if the aspect involves more than a 
    * single method (as will be the case for around advice). 
*/
private static final ThreadLocal<TransactionInfo> transactionInfoHolder = 
        new NamedThreadLocal<>("Current aspect-driven transaction");

Since Java already has the ThreadLocal class, why did Netty create a structure called FastThreadLocal? Let’s first take a look at the implementation of ThreadLocal.

The Thread class has a member variable called threadLocals, which stores all the thread-local values belonging to that thread. The variable is defined in the Thread class, but it is operated on by the ThreadLocal class.

public T get() { 
    Thread t = Thread.currentThread(); 
    ThreadLocalMap map = getMap(t); 
    ... 
} 
ThreadLocalMap getMap(Thread t) { 
    return t.threadLocals; 
}

The problem lies with the ThreadLocalMap class. Although it is named Map, it does not implement the Map interface. As shown in the diagram below, when ThreadLocalMap resolves hash collisions, it does not use the array + linked list + red-black tree structure used by HashMap. Instead, it uses only an array and resolves conflicts with open addressing (searching sequentially until a free position is found). This becomes inefficient when collisions are frequent.

Drawing 8.png

Because Netty uses ThreadLocal very heavily, it built its own optimized version. FastThreadLocal is fast because each variable is assigned a constant index into an underlying array, so a lookup goes straight to the element instead of running the probing algorithm used by the JDK implementation.

Remember the false sharing problem mentioned in Lesson 03? The underlying InternalThreadLocalMap optimizes the cache line accordingly.

Drawing 9.png
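The constant-index idea can be illustrated with a toy version. The sketch below is not Netty's code and deliberately omits everything else Netty does (including the cache-line padding mentioned above): IndexedLocalDemo, SlotThread, and IndexedLocal are hypothetical names, and the per-thread array lives in a custom Thread subclass, loosely mirroring the role FastThreadLocalThread plays in Netty.

```java
import java.util.Arrays;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical, simplified illustration of index-based thread-local storage.
class IndexedLocalDemo {

    // A thread that carries its own slot array; only the thread itself touches it.
    static class SlotThread extends Thread {
        Object[] slots = new Object[8];

        SlotThread(Runnable task) {
            super(task);
        }
    }

    static class IndexedLocal<T> {
        private static final AtomicInteger NEXT_INDEX = new AtomicInteger();

        // Assigned once per variable; lookups need no hashing and no probing.
        private final int index = NEXT_INDEX.getAndIncrement();

        @SuppressWarnings("unchecked")
        T get() {
            Object[] slots = owner().slots;
            return index < slots.length ? (T) slots[index] : null;
        }

        void set(T value) {
            SlotThread thread = owner();
            if (index >= thread.slots.length) {
                // Grow the array so that the constant index fits.
                thread.slots = Arrays.copyOf(thread.slots,
                        Math.max(index + 1, thread.slots.length * 2));
            }
            thread.slots[index] = value;
        }

        private static SlotThread owner() {
            Thread thread = Thread.currentThread();
            if (!(thread instanceof SlotThread)) {
                throw new IllegalStateException("must be called from a SlotThread");
            }
            return (SlotThread) thread;
        }
    }

    // Two threads store different values in the same variable and read back their own.
    static String[] demo() {
        IndexedLocal<String> name = new IndexedLocal<>();
        String[] seen = new String[2];
        Thread first = new SlotThread(() -> { name.set("a"); seen[0] = name.get(); });
        Thread second = new SlotThread(() -> { name.set("b"); seen[1] = name.get(); });
        first.start();
        second.start();
        try {
            first.join();
            second.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(demo())); // prints [a, b]
    }
}
```

The trade-off in this toy version is memory: every thread's array must be large enough for the highest index in use, even if that thread only ever touches one variable.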

What problems have you encountered in multi-threaded programming? #

Based on the knowledge we have summarized above, we can see that multi-threaded programming is a relatively advanced skill. In interviews, interviewers often ask about problems encountered in multi-threaded programming to assess your practical application of this knowledge.

Let’s summarize the examples given in the article:

  • Incorrect usage of thread pool, resulting in uncontrollable resource allocation.
  • In I/O intensive scenarios, the thread pool is too small, resulting in frequent request failures.
  • The thread pool uses the CallerRunsPolicy saturation policy, resulting in blocking of business threads.
  • Time inconsistency caused by SimpleDateFormat.

In addition, I would like to emphasize that when dealing with looping tasks, always remember to catch exceptions. In particular, exceptions like NPE are unchecked exceptions, so IDEs often give no hint that they need handling. I have seen many cases where a task loop stopped because an exception was not handled. Although this problem occurs less frequently, it is difficult to pinpoint, so it is important to maintain good coding habits.

while(!isInterrupted()) {
    try {
        ...
    } catch(Exception ex) {
        ...
    }
}

In a multi-threaded environment, exception logs are very important, but the default behavior of the thread pool is not very practical in this regard. In the code below, when a task throws an exception, nothing is output to the terminal and the exception information is lost, which is not friendly for problem diagnosis.

ExecutorService executor = Executors.newCachedThreadPool();
executor.submit(() -> {
    String s = null;
    s.substring(0);
});
executor.shutdown();

By tracing the execution of a task, we can find the code in ThreadPoolExecutor that handles exceptions thrown by tasks. It passes the exception to the afterExecute method for processing.

Drawing 10.png

Unfortunately, the afterExecute method in ThreadPoolExecutor has no implementation; it is an empty method.

protected void afterExecute(Runnable r, Throwable t) { }

You can override afterExecute to change this default behavior, but it comes with a cost. In fact, a task submitted using the submit method returns a Future object, and its exception surfaces only when the Future’s get method is called, which throws it wrapped in an ExecutionException. Tasks submitted using the submit method never reach the red line in the diagram, so this is the only way to retrieve their exceptions.

Only tasks submitted using the execute method reach this exception handling code. If you want exceptions to be printed by default, it is recommended to submit tasks with the execute method. The difference between the execute and submit methods is not as simple as just the return value.
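Both ways of recovering an exception from a submitted task can be sketched as follows. The class TaskExceptionDemo and its methods are hypothetical; the afterExecute override follows the pattern shown in the ThreadPoolExecutor Javadoc, which inspects the Future when the Throwable argument is null.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo: two ways to see the exception of a task passed to submit().
class TaskExceptionDemo {

    static final Runnable FAILING_TASK = () -> {
        String s = null;
        s.substring(0); // throws NullPointerException inside the worker thread
    };

    // Way 1: call get() on the Future and unwrap the ExecutionException.
    static String causeFromFuture() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        try {
            Future<?> future = pool.submit(FAILING_TASK);
            future.get();
            return "no exception";
        } catch (ExecutionException e) {
            return e.getCause().getClass().getSimpleName();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
    }

    // Way 2: override afterExecute and look inside the Future there.
    static List<String> loggedByAfterExecute() {
        List<String> logged = new CopyOnWriteArrayList<>();
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 1, 0,
                TimeUnit.SECONDS, new ArrayBlockingQueue<>(1)) {
            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                super.afterExecute(r, t);
                // For submit(), t is null and the failure is stored in the Future.
                if (t == null && r instanceof Future<?> && ((Future<?>) r).isDone()) {
                    try {
                        ((Future<?>) r).get();
                    } catch (CancellationException ce) {
                        t = ce;
                    } catch (ExecutionException ee) {
                        t = ee.getCause();
                    } catch (InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    }
                }
                if (t != null) {
                    logged.add(t.getClass().getSimpleName()); // a real pool would log here
                }
            }
        };
        try {
            pool.submit(FAILING_TASK);
            pool.shutdown();
            pool.awaitTermination(5, TimeUnit.SECONDS); // wait until afterExecute has run
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return logged;
    }

    public static void main(String[] args) {
        System.out.println(causeFromFuture());      // prints NullPointerException
        System.out.println(loggedByAfterExecute()); // prints [NullPointerException]
    }
}
```

The second variant shows the cost mentioned above: the hook has to call get on every finished Future just to find out whether the task failed.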

About Asynchronous #

A colleague once asked me, “Async does not reduce the number of execution steps or improve the algorithm, so why do we say that async is faster?”

In fact, this is a misunderstanding of what asynchronous execution does. Async is a programming model that moves time-consuming operations to background threads to reduce blocking of the main business flow, which is why we say that async speeds up the process. However, if your system has already reached its resource limits, async will have no effect; it mainly optimizes scenarios dominated by contention and waiting.

In our previous lessons, we have used asynchronous techniques such as buffering, caching, and pooling. They optimize request-response times by using resources properly, and our system does respond faster as a result. In Lesson 15, we will explore this concept further in a case study.

Asynchronous also enables decoupling of business logic. As shown in the diagram below, it is somewhat similar to the producer-consumer model. The main thread is responsible for producing tasks and storing them in the to-be-executed list, while the consumer thread pool is responsible for consuming tasks and performing the actual business logic.

Drawing 11.png

Summary #

The topic of multi-threading is vast, and this lesson covers quite a bit of content. Let’s summarize the key points.

This lesson assumes that you already have a basic understanding of multi-threading (otherwise, it may be difficult to follow). We started with a practical application scenario of CountDownLatch and discussed two key aspects of thread pools: the blocking queue and the rejection policy.

Next, we learned how to configure asynchronous task execution in common frameworks like Spring Boot. We also reviewed some important concepts in multi-threading, including thread-safe utilities and synchronization methods. Finally, we introduced the most commonly used ThreadLocal and learned about Netty’s optimization of this utility class.

All the topics covered in this lesson are frequently asked in interviews. Apart from the multitude of APIs, one of the difficulties of multi-threaded programming is debugging asynchronous patterns.

We also discussed the challenges of answering experience-based questions, such as “What problems have you encountered and how did you solve them in multi-threaded programming?” These types of questions are highly likely to be asked in interviews.