03 Thread Pool The Most Common Yet Error-Prone Component in Business Code #

Today, I will talk about some issues to note when using a thread pool.

In programs, we use various pooling techniques to cache objects that are expensive to create, such as thread pools, connection pools, and memory pools. Typically, a certain number of objects are created in advance and placed in the pool, then borrowed and reused when needed. The number of objects kept in the pool can also be grown or shrunk according to some policy.

Creating a large number of threads indiscriminately and without control can result in performance issues due to the high cost of thread creation. Therefore, for short and quick tasks, it is generally recommended to use a thread pool instead of creating threads directly.

Today, we will focus on the topic of thread pools and discuss what should be paid attention to through three real incidents in production.

Manual declaration of thread pools is required #

The Executors class in Java defines some convenient utility methods to help us quickly create thread pools. However, the “Alibaba Java Development Manual” states that it is prohibited to use these methods to create thread pools. Instead, we should manually create thread pools using new ThreadPoolExecutor. This rule is based on numerous real-life incidents, with the most typical being the newFixedThreadPool and newCachedThreadPool methods, which may cause OOM issues due to resource exhaustion.

First, let’s take a look at why newFixedThreadPool may encounter OOM issues.

Let’s write some test code that initializes a single-threaded FixedThreadPool, submits 100 million tasks to it in a loop, and has each task create a fairly large string and then sleep for one hour:

@GetMapping("oom1")
public void oom1() throws InterruptedException {

    ThreadPoolExecutor threadPool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);

    // Print thread pool information, I will explain this code later
    printStats(threadPool);

    for (int i = 0; i < 100000000; i++) {

        threadPool.execute(() -> {
            String payload = IntStream.rangeClosed(1, 1000000)
                    .mapToObj(__ -> "a")
                    .collect(Collectors.joining("")) + UUID.randomUUID().toString();
            try {
                TimeUnit.HOURS.sleep(1);
            } catch (InterruptedException e) {
            }

            log.info(payload);

        });
    }

    threadPool.shutdown();
    threadPool.awaitTermination(1, TimeUnit.HOURS);
}

Soon after executing the program, we can see the following OOM error in the logs:

Exception in thread "http-nio-45678-ClientPoller" java.lang.OutOfMemoryError: GC overhead limit exceeded

Looking at the source code of newFixedThreadPool, we can see that the work queue is a LinkedBlockingQueue created with its default constructor. Its capacity is Integer.MAX_VALUE, so in practice the queue is unbounded:

public static ExecutorService newFixedThreadPool(int nThreads) {

    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());

}

public class LinkedBlockingQueue<E> extends AbstractQueue<E>
        implements BlockingQueue<E>, java.io.Serializable {

    ...

    /**
     * Creates a {@code LinkedBlockingQueue} with a capacity of
     * {@link Integer#MAX_VALUE}.
     */
    public LinkedBlockingQueue() {
        this(Integer.MAX_VALUE);
    }
    ...
}

Although using newFixedThreadPool can control the number of working threads to a fixed amount, the task queue is unbounded. If there are many tasks and they are executed slowly, the queue may quickly accumulate and consume all available memory, resulting in an OOM error.

Let’s slightly modify the previous example to use the newCachedThreadPool method to obtain a thread pool. After running the program for a while, we can also see the following OOM exception:

[11:30:30.487] [http-nio-45678-exec-1] [ERROR] [.a.c.c.C.[.[.[/].[dispatcherServlet]:175 ] - Servlet.service() for servlet [dispatcherServlet] in context with path [] threw exception [Handler dispatch failed; nested exception is java.lang.OutOfMemoryError: unable to create new native thread] with root cause

java.lang.OutOfMemoryError: unable to create new native thread 

From the logs, we can see that the cause of this OOM error is the inability to create new threads. By examining the source code of newCachedThreadPool, we can see that the maximum number of threads in this type of thread pool is Integer.MAX_VALUE, which can be considered as unlimited. The work queue SynchronousQueue is a blocking queue with no storage space. This means that whenever a request arrives, a working thread must be found to handle it. If there are no idle threads available, a new thread will be created.
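The two configurations discussed so far can be checked at runtime without reading the JDK source. The following is a minimal sketch; the class and method names are hypothetical, and the cast to ThreadPoolExecutor relies on the same implementation detail as the test code above.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;

// Hypothetical helper that inspects the pools returned by the Executors factory methods.
class ExecutorsDefaults {

    /** Capacity left in the (empty) work queue behind newFixedThreadPool(1). */
    static int fixedPoolQueueCapacity() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
        try {
            return pool.getQueue().remainingCapacity();
        } finally {
            pool.shutdown();
        }
    }

    /** Upper bound on the number of threads of newCachedThreadPool(). */
    static int cachedPoolMaxThreads() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getMaximumPoolSize();
        } finally {
            pool.shutdown();
        }
    }

    /** True if newCachedThreadPool() hands tasks over through a SynchronousQueue. */
    static boolean cachedPoolUsesSynchronousQueue() {
        ThreadPoolExecutor pool = (ThreadPoolExecutor) Executors.newCachedThreadPool();
        try {
            return pool.getQueue() instanceof SynchronousQueue;
        } finally {
            pool.shutdown();
        }
    }
}
```

Both numeric values come back as Integer.MAX_VALUE: the fixed pool bounds its threads but not its queue, and the cached pool bounds neither.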

Since our tasks take one hour to complete, a large number of tasks will create a large number of threads. We know that threads require a certain amount of memory allocation as thread stacks, such as 1MB. Therefore, creating threads without restrictions will inevitably lead to an OOM error:

public static ExecutorService newCachedThreadPool() {

    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());

}

In fact, most Java developers are aware of the characteristics of these two thread pools, but they may be overly confident and believe that using thread pools for lightweight tasks will not cause queue backlogs or create many threads.

However, reality is often harsh. I have encountered an incident in the past: after a user registers, we call an external service to send a text message. When the text message service is normal, it can respond within 100 milliseconds and there is a registration volume of 100 TPS. The CachedThreadPool can stably meet the demand with about 10 threads. At a certain point in time, the external text message service became unavailable, and the timeout for calling this service was particularly long, such as 1 minute. During this minute, 6000 users may have registered, creating 6000 tasks to send messages, which would require 6000 threads. It didn’t take long for an OOM error to occur due to the inability to create threads, causing the entire application to crash.
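The thread counts in this incident follow from a simple estimate: the number of threads busy at the same time is roughly the arrival rate multiplied by how long each task holds a thread. A minimal sketch of that arithmetic, with a hypothetical helper name:

```java
// Rough estimate only: it ignores bursts, queuing, and thread start-up time.
class ThreadDemandEstimate {

    /** Threads busy at once = tasks per second x seconds each task occupies a thread. */
    static long threadsNeeded(int tasksPerSecond, long taskMillis) {
        double busy = tasksPerSecond * taskMillis / 1000.0;
        return Math.max(1, (long) Math.ceil(busy));
    }
}
```

With 100 registrations per second, a 100-millisecond call gives threadsNeeded(100, 100) = 10 threads, while a 1-minute timeout gives threadsNeeded(100, 60_000) = 6000 threads, matching the numbers above.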

Therefore, I also do not recommend using the two convenient thread pool methods provided by Executors. The reasons are as follows:

  • We need to evaluate several core parameters of the thread pool based on our own scenarios and concurrency conditions, including the core thread count, maximum thread count, thread recycling strategy, type of work queue, and rejection strategy. This ensures that the behavior of the thread pool meets the requirements, and it usually means setting a bounded work queue and a controllable number of threads.
  • We should always give a custom thread pool a meaningful name to facilitate troubleshooting. When there is a sudden increase in thread count, a thread deadlock, heavy CPU usage by threads, or an exception during thread execution, we often need to capture thread stacks. At that point, meaningful thread names help us locate the problem.
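As an illustration of both points, here is a minimal sketch of a manually declared pool that uses only JDK classes. The pool name "sms-sender", the sizes 10/20/200, and the class names are assumptions made up for this example, not values from the incident.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Names every thread "<prefix>-<n>" so that thread dumps show which pool a thread belongs to.
class NamedThreadFactory implements ThreadFactory {
    private final String prefix;
    private final AtomicInteger counter = new AtomicInteger();

    NamedThreadFactory(String prefix) {
        this.prefix = prefix;
    }

    @Override
    public Thread newThread(Runnable r) {
        return new Thread(r, prefix + "-" + counter.incrementAndGet());
    }
}

class SmsPools {

    /** Bounded threads, bounded queue, explicit rejection policy (all sizes are illustrative). */
    static ThreadPoolExecutor newSmsPool() {
        return new ThreadPoolExecutor(
                10, 20,                                 // core and maximum thread count
                60, TimeUnit.SECONDS,                   // idle time before extra threads are reclaimed
                new ArrayBlockingQueue<>(200),          // bounded work queue
                new NamedThreadFactory("sms-sender"),   // meaningful thread names
                new ThreadPoolExecutor.AbortPolicy());  // fail fast when saturated
    }

    /** Runs one task and returns the name of the worker thread that executed it. */
    static String workerName(ThreadPoolExecutor pool) {
        try {
            return pool.submit(() -> Thread.currentThread().getName()).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }
}
```

With this configuration a backlog can never grow beyond 200 queued tasks and 20 threads. Anything beyond that is rejected with an exception that the caller can log and handle.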

In addition to recommending manual declaration of thread pools, I also suggest using monitoring methods to observe the status of the thread pool. Thread pools often work diligently and silently, and they usually do not throw exceptions unless a rejection strategy is encountered. If we can observe the queue backlog of the thread pool or the rapid increase of thread count in advance, we can often detect and resolve problems early.

Explanation of Thread Pool Thread Management Strategy #

In the previous demo, we implemented a rudimentary monitoring system using the printStats method to output basic internal information of the thread pool every second, including the number of threads, the number of active threads, the number of completed tasks, and the number of tasks in the queue, etc.:

private void printStats(ThreadPoolExecutor threadPool) {

   Executors.newSingleThreadScheduledExecutor().scheduleAtFixedRate(() -> {

        log.info("=========================");
        log.info("Pool Size: {}", threadPool.getPoolSize());
        log.info("Active Threads: {}", threadPool.getActiveCount());
        log.info("Number of Tasks Completed: {}", threadPool.getCompletedTaskCount());
        log.info("Number of Tasks in Queue: {}", threadPool.getQueue().size());
        log.info("=========================");

    }, 0, 1, TimeUnit.SECONDS);

}

Next, let’s use this method to observe the basic characteristics of the thread pool.

First, let’s create a custom thread pool. This thread pool has 2 core threads, 5 maximum threads, uses an ArrayBlockingQueue with a capacity of 10 as the work queue, and uses the default AbortPolicy rejection policy, which means that if adding a task to the thread pool fails, a RejectedExecutionException will be thrown. In addition, we use the ThreadFactoryBuilder class from the Jodd library to build a thread factory that gives the pool’s threads custom names.

Then, let’s write some test code to observe the thread pool’s thread management strategy. The test submits one task to the thread pool every second, 20 times in total, and each task takes 10 seconds to complete. Here is the code:

@GetMapping("right")
public int right() throws InterruptedException {

    // Use a counter to track the number of completed tasks
    AtomicInteger atomicInteger = new AtomicInteger();

    // Create a thread pool with 2 core threads, 5 maximum threads, use an ArrayBlockingQueue with a capacity of 10 as the work queue, and use the default AbortPolicy rejection policy
    ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
            2, 5,
            5, TimeUnit.SECONDS,
            new ArrayBlockingQueue<>(10),
            new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get(),
            new ThreadPoolExecutor.AbortPolicy());

    printStats(threadPool);

    // Submit a task every second for a total of 20 times
    IntStream.rangeClosed(1, 20).forEach(i -> {

        try {
            TimeUnit.SECONDS.sleep(1);

        } catch (InterruptedException e) {
            e.printStackTrace();

        }

        int id = atomicInteger.incrementAndGet();

        try {
            threadPool.submit(() -> {
                log.info("{} started", id);

                // Each task takes 10 seconds
                try {
                    TimeUnit.SECONDS.sleep(10);
                } catch (InterruptedException e) {

                }
                log.info("{} finished", id);
            });

        } catch (Exception ex) {

            // If submission fails, print error message and decrement the counter
            log.error("error submitting task {}", id, ex);
            atomicInteger.decrementAndGet();

        }

    });

    TimeUnit.SECONDS.sleep(60);

    return atomicInteger.intValue();

}

After 60 seconds, the page outputs 17, indicating that 3 submissions failed:

[Image: page output showing 17]

And the log also shows 3 similar error messages:

[14:24:52.879] [http-nio-45678-exec-1] [ERROR] [.t.c.t.demo1.ThreadPoolOOMController:103 ] - error submitting task 18

java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@163a2dec rejected from java.util.concurrent.ThreadPoolExecutor@18061ad2[Running, pool size = 5, active threads = 5, queued tasks = 10, completed tasks = 2]

We can plot the logs printed by the printStats method into a chart and obtain the following curve:

[Image: chart plotted from the printStats log output]

From this, we can summarize the default behavior of the thread pool:

  • It does not initialize all the corePoolSize threads, but creates working threads only when tasks arrive.
  • When the core threads are full, it does not immediately expand the thread pool, but adds tasks to the work queue.
  • When the work queue is full, it expands the thread pool until the number of threads reaches maximumPoolSize.
  • If the queue is full and the maximum number of threads has been reached, and there are still tasks coming in, the rejection policy will be applied.
  • When the number of threads exceeds corePoolSize, a thread that stays idle for keepAliveTime with no task to process is reclaimed, so the pool shrinks back to corePoolSize.
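The growth order in this list can be reproduced without timers. In the sketch below every task blocks on a latch, so nothing completes while we take measurements. The class name is hypothetical; the pool parameters match the demo above.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolGrowthDemo {

    /** Returns {pool size after 2 tasks, after 12 tasks, after 15 tasks, number of rejections}. */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await(); // keep the worker busy until the measurements are done
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            for (int i = 1; i <= 16; i++) {
                try {
                    pool.execute(blocked);
                } catch (RejectedExecutionException e) {
                    seen[3]++; // queue full and maximumPoolSize reached
                }
                if (i == 2) seen[0] = pool.getPoolSize();  // core threads created on demand
                if (i == 12) seen[1] = pool.getPoolSize(); // tasks 3 to 12 only filled the queue
                if (i == 15) seen[2] = pool.getPoolSize(); // full queue forced extra threads
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }
}
```

The method returns {2, 2, 5, 1}: two core threads, no growth while the queue absorbs ten tasks, growth to five threads once the queue is full, and one rejection for the sixteenth task.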

Understanding this strategy helps us set appropriate initial parameters for the thread pool based on actual capacity planning requirements. Of course, we can also change these default behaviors through other means, such as:

  • Immediately calling the prestartAllCoreThreads method after declaring the thread pool to start all core threads.
  • Passing true to the allowCoreThreadTimeOut method to allow the thread pool to also reclaim core threads when they are idle.
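A minimal sketch of these two switches, using a hypothetical class name and the same pool parameters as the demo above:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class CoreThreadTuning {

    /** Returns {pool size before prestart, threads started, pool size after, 1 if core timeout is on}. */
    static int[] observe() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                2, 5, 5, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
        try {
            int before = pool.getPoolSize();             // no thread exists until a task arrives
            int started = pool.prestartAllCoreThreads(); // eagerly start every core thread
            int after = pool.getPoolSize();
            // Requires keepAliveTime > 0; idle core threads may now be reclaimed as well.
            pool.allowCoreThreadTimeOut(true);
            return new int[] {before, started, after, pool.allowsCoreThreadTimeOut() ? 1 : 0};
        } finally {
            pool.shutdown();
        }
    }
}
```

Prestarting removes the cold-start cost of the first requests. Allowing core threads to time out suits pools that sit idle for long periods.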

You may have wondered: the Java thread pool uses a work queue to store tasks that cannot be processed in time, and only expands the thread pool when the queue is full. When we set a large work queue, the maximumPoolSize parameter seems meaningless because the queue is difficult to fill, or it would be too late to expand the thread pool when it is full.

So, is there any way to make the thread pool more aggressive, prioritize opening more threads, and treat the queue as a backup plan? In our example, the tasks are slow and take 10 seconds to complete. If the thread pool can expand to the maximum 5 threads first, then these tasks can be completed, instead of being unable to handle slow tasks due to the thread pool expansion being too late.

Due to space limitations, I can only sketch the idea:

  • Since the thread pool only expands when the work queue is full and cannot accept a task, can we override the queue’s offer method to create the illusion that the queue is full?
  • Once we have hacked the queue this way, the rejection policy will be triggered when the maximum number of threads is reached. Can we implement a custom rejection handler that, at that point, actually inserts the task into the queue?

Next, please try to implement such a “flexible” thread pool. The Tomcat thread pool also achieves a similar effect, which you can refer to.
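For readers who want a starting point, here is one possible sketch of those two ideas. It is not Tomcat’s implementation. The names EagerQueue, EagerPools, attach, and reallyOffer are hypothetical, and the sketch has only been reasoned through for the simple case shown in observe.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Work queue that pretends to be full while the pool can still create threads.
class EagerQueue extends LinkedBlockingQueue<Runnable> {
    private transient volatile ThreadPoolExecutor pool;

    EagerQueue(int capacity) {
        super(capacity);
    }

    void attach(ThreadPoolExecutor pool) {
        this.pool = pool;
    }

    @Override
    public boolean offer(Runnable task) {
        ThreadPoolExecutor p = pool;
        if (p != null && p.getPoolSize() < p.getMaximumPoolSize()) {
            return false; // "full": makes execute() try to add a worker instead
        }
        return super.offer(task);
    }

    /** Real enqueue, used by the rejection handler. */
    boolean reallyOffer(Runnable task) {
        return super.offer(task);
    }
}

class EagerPools {

    static ThreadPoolExecutor newEagerPool(int core, int max, int queueCapacity) {
        EagerQueue queue = new EagerQueue(queueCapacity);
        // Reached when no worker could be added: fall back to the queue, reject only if it is truly full.
        RejectedExecutionHandler enqueueOrReject = (task, executor) -> {
            if (executor.isShutdown() || !queue.reallyOffer(task)) {
                throw new RejectedExecutionException("pool saturated: " + executor);
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                core, max, 5, TimeUnit.SECONDS, queue, enqueueOrReject);
        queue.attach(pool);
        return pool;
    }

    /** Returns {pool size after 5 tasks, queue size after 5, queue size after 15, rejections}. */
    static int[] observe() {
        ThreadPoolExecutor pool = newEagerPool(2, 5, 10);
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        int[] seen = new int[4];
        try {
            for (int i = 1; i <= 16; i++) {
                try {
                    pool.execute(blocked);
                } catch (RejectedExecutionException e) {
                    seen[3]++;
                }
                if (i == 5) {
                    seen[0] = pool.getPoolSize();
                    seen[1] = pool.getQueue().size();
                }
                if (i == 15) {
                    seen[2] = pool.getQueue().size();
                }
            }
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return seen;
    }
}
```

With 2 core threads, 5 maximum threads, and a queue of 10, observe returns {5, 0, 10, 1}: the pool grows to 5 threads before anything is queued, then the queue fills, and only the sixteenth task is rejected. Whether every task completes correctly under real concurrency is still worth testing yourself.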

Make sure to confirm whether the thread pool itself is reusable #

Not long ago, I encountered an incident: in a project’s production environment, there were occasional alarms indicating that the number of threads exceeded 2000. After receiving the alarm, I checked the monitoring and found that although the number of threads was high at times, it would drop down after a while. The thread count fluctuated severely, while the application’s traffic remained relatively stable.

To locate the problem, we captured the thread stack when the thread count was high. After capturing it, we found that there were over 1000 custom thread pools in memory. Normally, a thread pool should be reusable, and having between 1 and 5 thread pools can be considered normal. Having over 1000 thread pools is definitely not normal.

In the project’s code, we didn’t find any places where thread pools were declared. After searching for the keyword “execute”, we found that the business code called a library to get the thread pool. The business code was similar to the following: calling the getThreadPool method of ThreadPoolHelper to get the thread pool, and then submitting several tasks to the thread pool for processing. I couldn’t see anything unusual.

@GetMapping("wrong")
public String wrong() throws InterruptedException {

    ThreadPoolExecutor threadPool = ThreadPoolHelper.getThreadPool();

    IntStream.rangeClosed(1, 10).forEach(i -> {
        threadPool.execute(() -> {
            ...
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {

            }
        });
    });

    return "OK";
}

However, when we looked at the implementation of ThreadPoolHelper, we were shocked. The getThreadPool method actually used Executors.newCachedThreadPool() to create a thread pool every time.

class ThreadPoolHelper {

    public static ThreadPoolExecutor getThreadPool() {

        // The thread pool is not reusable
        return (ThreadPoolExecutor) Executors.newCachedThreadPool();

    }

}

Based on what we learned in the previous section, we can see that newCachedThreadPool creates the necessary number of threads when needed. Each business operation in the code submits multiple slow tasks to the thread pool, so each business operation will create multiple threads. If the concurrency of the business operations is high, it is indeed possible to create several thousand threads at once.

So, why can we see that the number of threads decreases in the monitoring and it doesn’t cause memory overflow?

Looking back at the definition of newCachedThreadPool, we can see that its core thread count is 0 and its keepAliveTime is 60 seconds, which means that every thread can be reclaimed after being idle for 60 seconds. Fortunately, thanks to this behavior, our application did not die too ugly a death.

Fixing this bug is also straightforward. We can use a static field to store the reference of the thread pool, and the code that returns the thread pool can directly return this static field. Here, we must remember our best practice of manually creating the thread pool. The revised ThreadPoolHelper class is as follows:

class ThreadPoolHelper {

  private static ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor(
    10, 50,
    2, TimeUnit.SECONDS,
    new ArrayBlockingQueue<>(1000),
    new ThreadFactoryBuilder().setNameFormat("demo-threadpool-%d").get());

  public static ThreadPoolExecutor getRightThreadPool() {

    return threadPoolExecutor;
  }

}

Careful consideration of mixed use strategies for thread pools #

The purpose of a thread pool is to reuse threads. Does this mean that a program should always use a single thread pool?

Of course not. From what we learned in the first section, we need to specify the core parameters of a thread pool based on the “urgency” of the tasks, including the number of threads, the recycling strategy, and the task queue:

  • For IO-bound tasks that are slow but few in number, we can use more threads and do not need a large queue.
  • For compute-intensive tasks with high throughput, the number of threads should not be excessive: the number of CPU cores, or twice that, is usually enough. A thread has to be scheduled onto a CPU core to run, so if the tasks are CPU-bound, extra threads only add context-switching overhead without improving throughput. A longer queue may be needed for buffering.
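The two profiles might translate into declarations like the following. This is only a sketch: the class name is hypothetical, and the thread counts and queue lengths (50, 10, 1000) are illustrative assumptions that should come from measurement in a real system.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class PoolSizing {

    /** Slow, infrequent IO tasks: many threads, short queue. */
    static ThreadPoolExecutor ioBoundPool() {
        return new ThreadPoolExecutor(
                50, 50, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(10));
    }

    /** CPU-bound tasks with high throughput: one thread per core, longer queue as a buffer. */
    static ThreadPoolExecutor cpuBoundPool() {
        int cores = Runtime.getRuntime().availableProcessors();
        return new ThreadPoolExecutor(
                cores, cores, 0, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1000));
    }
}
```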

I once encountered a problem where the business code used a thread pool to asynchronously process some data in memory. However, monitoring revealed that the processing was very slow. The entire process involved in-memory calculations and did not involve any IO operations. It took several seconds to complete, and the CPU usage of the application was not particularly high, which was a bit unbelievable.

After investigation, it was found that the thread pool used by the business code was also used by a background file batch processing task.

Perhaps on the principle that “good enough is enough”, this thread pool had only 2 core threads and a maximum of 2 threads. It used an ArrayBlockingQueue with a capacity of 100 as the work queue and the CallerRunsPolicy rejection policy:

private static ThreadPoolExecutor threadPool = new ThreadPoolExecutor(
        2, 2,
        1, TimeUnit.HOURS,
        new ArrayBlockingQueue<>(100),
        new ThreadFactoryBuilder().setNameFormat("batchfileprocess-threadpool-%d").get(),
        new ThreadPoolExecutor.CallerRunsPolicy());

Here, let’s simulate the code for batch file processing. After the program starts, it starts a thread with an infinite loop to continuously submit tasks to the thread pool. The logic of the task is to write a large amount of data to a file:

@PostConstruct
public void init() {

    printStats(threadPool);
    new Thread(() -> {

        //Simulate a large amount of data to be written
        String payload = IntStream.rangeClosed(1, 1_000_000)
                .mapToObj(__ -> "a")
                .collect(Collectors.joining(""));

        while (true) {
            threadPool.execute(() -> {
                try {

                    //Each time, the same data is created and written to the same file
                    Files.write(Paths.get("demo.txt"), Collections.singletonList(LocalTime.now().toString() + ":" + payload), UTF_8, CREATE, TRUNCATE_EXISTING);

                } catch (IOException e) {
                    e.printStackTrace();

                }
                log.info("batch file processing done");
            });
        }
    }).start();

}

As you can imagine, the 2 threads in this thread pool carry a heavy load. We can observe the burden on the thread pool through the logs printed by the printStats method:

[Image: printStats log output for the batch file processing thread pool]

It can be seen that the two threads in the thread pool are always active, and the queue is also almost full. Because the CallerRunsPolicy rejection handling policy is enabled, when the threads are fully loaded and the queue is full, the tasks will be executed in the thread that submits the task or calls the execute method. In other words, it cannot be assumed that tasks submitted to the thread pool will always be processed asynchronously. If the CallerRunsPolicy policy is used, it is possible that asynchronous tasks will become synchronous. This can also be seen from the fourth line of the log. This is the reason why this rejection policy is special.
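The switch from asynchronous to synchronous execution is easy to reproduce in isolation. The sketch below uses a hypothetical class name and a deliberately tiny pool of 1 thread and 1 queue slot so that the third task is guaranteed to overflow.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class CallerRunsDemo {

    /** Returns the name of the thread that ran the task submitted to a saturated pool. */
    static String threadOfOverflowTask() {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(1),
                new ThreadPoolExecutor.CallerRunsPolicy());
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocked = () -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        AtomicReference<String> ranOn = new AtomicReference<>();
        try {
            pool.execute(blocked); // occupies the only worker thread
            pool.execute(blocked); // fills the only queue slot
            // Saturated: CallerRunsPolicy runs this task right here, on the submitting thread.
            pool.execute(() -> ranOn.set(Thread.currentThread().getName()));
        } finally {
            release.countDown();
            pool.shutdown();
        }
        return ranOn.get();
    }
}
```

The returned name is that of the calling thread, not of a pool worker. In a web application the caller is a request-handling thread, which is why this policy can stall request processing.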

I don’t know why the developer who wrote the code chose this policy. Perhaps they saw the thread pool throw exceptions when it could not keep up, did not want tasks to be discarded, and therefore settled on this rejection policy. In any case, these logs are enough to show that the thread pool is saturated.

It can be imagined that if business code reuses such a thread pool for in-memory calculations, the fate must be miserable. Let’s write a piece of code to test it out. We submit a simple task to the thread pool. This task only sleeps for 10 milliseconds without any other logic:

private Callable<Integer> calcTask() {

    return () -> {
        TimeUnit.MILLISECONDS.sleep(10);
        return 1;
    };
}

@GetMapping("wrong")
public int wrong() throws ExecutionException, InterruptedException {

    return threadPool.submit(calcTask()).get();
}

Let’s use the wrk tool to perform a simple load test on this interface. We can see that the TPS is 75, and the performance is indeed very poor.

[Image: wrk load test result, TPS 75]

On closer inspection, the problem is more serious than slow responses. Because the thread pool used for the IO tasks uses the CallerRunsPolicy strategy, if we use this same thread pool for asynchronous calculations, then whenever the pool is saturated the calculation tasks are executed on the Tomcat thread that handles the web request. This in turn affects other synchronous request processing and may even bring down the entire application.

The solution is simple, use a separate thread pool to handle such “calculation tasks”. The term “calculation tasks” is in quotation marks because our simulation code only performs a sleep operation, which does not belong to CPU-bound operations but is more similar to IO-bound operations. If the number of thread pool threads is set too small, it will limit the throughput:

private static ThreadPoolExecutor asyncCalcThreadPool = new ThreadPoolExecutor(
  200, 200,
  1, TimeUnit.HOURS,
  new ArrayBlockingQueue<>(1000),
  new ThreadFactoryBuilder().setNameFormat("asynccalc-threadpool-%d").get());

@GetMapping("right")
public int right() throws ExecutionException, InterruptedException {
  return asyncCalcThreadPool.submit(calcTask()).get();
}

After modifying the code to use a separate thread pool and then testing the performance, the TPS has increased to 1727:

[Image: wrk load test result, TPS 1727]

As you can see, the problem with blindly reusing thread pools and mixing threads is that the thread pool attributes defined by others may not be suitable for your tasks, and mixing them will interfere with each other. This is similar to how we often use virtualization technology to achieve resource isolation instead of having all applications directly use physical machines.

Regarding the problem of mixing thread pools, I would like to mention another pitfall: the parallel stream feature in Java 8. It allows us to easily process elements in a collection in parallel, with a shared ForkJoinPool behind-the-scenes. The default parallelism is CPU cores - 1. For CPU-bound tasks, this configuration is more appropriate, but if the collection operations involve synchronous IO operations (such as database operations, external service calls, etc.), it is recommended to customize a ForkJoinPool (or a regular thread pool). You can refer to the related demo in the first lecture.
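One common way to keep such a stream off the shared pool is to start it from inside a task submitted to a dedicated ForkJoinPool. Note that this relies on how the current stream implementation picks its pool, which is not a documented guarantee. The class and method names below are hypothetical, and the doubling function stands in for the real per-element work.

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.stream.Collectors;

class IsolatedParallelStream {

    /** Doubles every element using a dedicated pool with the given parallelism. */
    static List<Integer> doubleAll(List<Integer> input, int parallelism) {
        ForkJoinPool pool = new ForkJoinPool(parallelism);
        Callable<List<Integer>> work = () -> input.parallelStream()
                .map(x -> x * 2) // placeholder for a blocking call such as a database query
                .collect(Collectors.toList());
        try {
            // The parallel stream is started from a thread of our own pool.
            return pool.submit(work).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdown();
        }
    }
}
```

For example, doubleAll(Arrays.asList(1, 2, 3), 4) returns [2, 4, 6]. In real code the pool should be created once and reused rather than per call, for the reasons given in the previous section.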

Key Review #

A thread pool manages threads, which are valuable resources. Many performance issues in applications arise from improper configuration and use of thread pools. In today’s lesson, I shared with you three production incidents related to thread pools and several best practices for using them.

First, although the methods provided by the Executors class for declaring thread pools are simple, they hide the details of thread pool parameters. Therefore, when using a thread pool, we must properly configure the number of threads, task queue, rejection policy, thread recycling strategy based on the scenario and requirements, and give the threads clear names to facilitate troubleshooting.

Second, since we are using a thread pool, we need to ensure that the thread pool is being reused. Creating a new thread pool every time may be worse than not using a thread pool at all. If you are not directly declaring a thread pool but using a library provided by others to obtain one, be sure to check the source code to confirm that the instantiation and configuration of the thread pool are as expected.

Third, reusing a thread pool does not mean that the application always uses the same thread pool. We should choose different thread pools based on the nature of the tasks. Pay special attention to the preferences of thread pool attributes for IO-bound tasks and CPU-bound tasks. If you want to reduce interference between tasks, consider using isolated thread pools on demand.

Finally, I would like to emphasize that thread pools, as core components inside an application, often lack monitoring. By contrast, if you use MQ middleware such as RabbitMQ, the operations team generally takes care of monitoring it for us. As a result, problems with thread pools are often discovered only after the program crashes, which leaves us in a very passive position. In the design section, we will revisit this issue and its solutions.

I’ve put all the code we used today on GitHub. You can click on this link to view it.

Reflection and Discussion #

In Section 1, we mentioned that a “flexible” thread pool that creates threads aggressively might better meet our needs. Can you provide an implementation? After implementing it, please test whether all tasks are completed correctly.

In Section 2, we improved ThreadPoolHelper to return a reusable thread pool. If we accidentally create this custom thread pool (10 core threads, 50 maximum threads, and a 2-second keep-alive time) on every call and invoke the test endpoint repeatedly, will the threads eventually be reclaimed? Will an OOM (Out Of Memory) error occur?

Have you encountered any other pitfalls related to thread pools? I am Zhu Ye. I welcome you to leave me a message in the comment section and share your thoughts. You are also welcome to share this article with your friends or colleagues for further discussion.