18 Principles of Thread Reuse in Thread Pool Implementation


In this lesson, we mainly study the principle of thread reuse and analyze the source code of the important execute method in thread pools.

Principle of thread reuse

We know that a thread pool uses a fixed or a variable number of threads to execute tasks. Either way, the number of threads is usually much smaller than the number of tasks, so the pool reuses threads by letting the same thread execute different tasks. So what is the principle behind thread reuse?

The thread pool decouples threads from tasks, breaking the earlier restriction that one thread created with Thread must correspond to exactly one task. In a thread pool, the same thread can keep taking new tasks from the BlockingQueue and executing them. The core idea is that the thread pool wraps Thread and does not call Thread.start() to create a new thread every time a task is executed. Instead, each thread runs a "looping task" that keeps checking whether there are tasks waiting to be executed. If there are, it executes a task by calling its run method directly, as an ordinary method call on the current thread. The run() methods of the tasks are effectively chained one after another on the same thread, so the number of threads does not grow.
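
To see this reuse from the outside, here is a small sketch using the JDK's real Executors.newFixedThreadPool: it submits many tasks to a pool with only two threads and records which thread ran each task. The class and method names (ThreadReuseDemo, runTasks) are illustrative only.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ThreadReuseDemo {
    // Runs taskCount tasks on a fixed pool of poolSize threads and returns the
    // names of the distinct threads that actually executed them.
    static Set<String> runTasks(int poolSize, int taskCount) throws InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(poolSize);
        Set<String> threadNames = ConcurrentHashMap.newKeySet();
        for (int i = 0; i < taskCount; i++) {
            // Each task only records the name of the thread that ran it.
            pool.execute(() -> threadNames.add(Thread.currentThread().getName()));
        }
        pool.shutdown();                             // accept no new tasks
        pool.awaitTermination(10, TimeUnit.SECONDS); // let queued tasks finish
        return threadNames;
    }

    public static void main(String[] args) throws InterruptedException {
        Set<String> names = runTasks(2, 100);
        System.out.println(names.size() + " thread(s) ran 100 tasks: " + names);
    }
}
```

All 100 tasks should be reported as running on no more than two distinct threads, because the same two threads keep picking up task after task.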

First, let’s review the rules and timing for creating new threads in a thread pool:

[Flowchart: how a thread pool handles a newly submitted task]

As shown in the flowchart, when a task is submitted, the thread pool first checks the current number of threads. If it is less than the core pool size (for example, initially the thread count is 0), a new thread is created to execute the task. As more tasks arrive, the thread count grows until it reaches the core pool size. Tasks submitted after that are put into the workQueue task queue and wait until a core thread finishes its current task. If there are so many tasks that the workQueue reaches its capacity limit, the thread pool falls back to its backup option, the maximum pool size: it keeps creating threads beyond the core pool size, up to the maximum pool size, to execute tasks. If tasks are still being submitted once the thread count has reached the maximum pool size, the thread pool has reached its maximum processing capacity and rejects them. In short, for each incoming task the thread pool checks the core pool size, the work queue, and the maximum pool size in that order, and rejects the task if none of them can accept it.
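
The order of these checks can be observed with a real ThreadPoolExecutor. The sketch below (class name and pool sizes chosen only for illustration) uses a core size of 1, a maximum size of 2 and a queue capacity of 1, and keeps every task blocked on a CountDownLatch so that no thread becomes free during the experiment.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SubmissionOrderDemo {
    static String demo() throws InterruptedException {
        // core = 1, max = 2, queue capacity = 1 (illustrative values)
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 2, 60, TimeUnit.SECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocking = () -> {
            try {
                release.await(); // keep the thread busy until released
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };

        pool.execute(blocking); // 1st: below core size, so a new core thread runs it
        pool.execute(blocking); // 2nd: core threads busy, so it goes into the queue
        pool.execute(blocking); // 3rd: queue full, so a non-core thread is added (up to max)
        String state = "threads=" + pool.getPoolSize() + ", queued=" + pool.getQueue().size();

        boolean rejected = false;
        try {
            pool.execute(blocking); // 4th: max threads busy and queue full
        } catch (RejectedExecutionException e) {
            rejected = true;        // default AbortPolicy throws
        }

        release.countDown();        // let all blocked tasks finish
        pool.shutdown();
        pool.awaitTermination(10, TimeUnit.SECONDS);
        return state + ", rejected=" + rejected;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

With the default AbortPolicy, the rejected fourth task surfaces as a RejectedExecutionException, so demo() should return "threads=2, queued=1, rejected=true".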

Next, let’s take a closer look at the code implementation. We start by analyzing the execute method, and the source code is as follows:

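```java
public void execute(Runnable command) {
    if (command == null)
        throw new NullPointerException();
    int c = ctl.get();
    // Step 1: fewer than corePoolSize threads, so try to start a core thread for this task
    if (workerCountOf(c) < corePoolSize) {
        if (addWorker(command, true))
            return;
        c = ctl.get();
    }
    // Step 2: the pool is running, so try to put the task into the queue
    if (isRunning(c) && workQueue.offer(command)) {
        int recheck = ctl.get();
        // The pool was shut down after queuing: take the task back out and reject it
        if (! isRunning(recheck) && remove(command))
            reject(command);
        // No thread is left to serve the queue: start one without a first task
        else if (workerCountOf(recheck) == 0)
            addWorker(null, false);
    }
    // Step 3: queue full (or pool not running): try a non-core thread, else reject
    else if (!addWorker(command, false))
        reject(command);
}
```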

Source code analysis of thread reuse

This code is brief yet informative. Let’s analyze the logic in the code. First, let’s look at the first few lines:

```java
// If the passed-in Runnable is null, throw an exception
if (command == null)
    throw new NullPointerException();
```

In the execute method, this if statement checks whether command, the Runnable task, is null. If it is, a NullPointerException is thrown.

Next, it checks whether the current number of threads is less than the core pool size. If it is, it calls the addWorker method to add a Worker. Here, Worker can be understood as a thread:

```java
if (workerCountOf(c) < corePoolSize) {
    if (addWorker(command, true))
        return;
    c = ctl.get();
}
```

So what is the addWorker method used for? Its main purpose is to create a thread in the thread pool and have it execute the task passed as the first argument. The second argument is a boolean that selects which limit applies. If it is true, a new thread is added only when the current number of threads is less than corePoolSize; if the count is greater than or equal to corePoolSize, no thread is added. If it is false, a new thread is added only when the current number of threads is less than maxPoolSize; otherwise no thread is added. In other words, the boolean decides whether the check for adding a thread is based on corePoolSize or on maxPoolSize. addWorker() returns true if the thread was added successfully and false otherwise (for example, because the limit has been reached or the pool has been shut down).
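
A minimal sketch of just the bound check that the boolean argument selects. mayAddWorker is a hypothetical helper, not a JDK method; the real addWorker also checks the pool state and updates the worker count atomically through ctl, which this sketch leaves out.

```java
public class AddWorkerBoundSketch {
    // Hypothetical helper mirroring the role of addWorker's second argument:
    // core == true compares against corePoolSize,
    // core == false compares against maxPoolSize.
    static boolean mayAddWorker(int workerCount, boolean core,
                                int corePoolSize, int maxPoolSize) {
        int bound = core ? corePoolSize : maxPoolSize;
        return workerCount < bound;
    }

    public static void main(String[] args) {
        // corePoolSize = 2, maxPoolSize = 4 (example values)
        System.out.println(mayAddWorker(1, true, 2, 4));  // true: 1 < 2
        System.out.println(mayAddWorker(2, true, 2, 4));  // false: core limit reached
        System.out.println(mayAddWorker(2, false, 2, 4)); // true: 2 < 4
        System.out.println(mayAddWorker(4, false, 2, 4)); // false: max limit reached
    }
}
```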

Let’s take a look at the next part of the code:

```java
if (isRunning(c) && workQueue.offer(command)) {
    int recheck = ctl.get();
    if (! isRunning(recheck) && remove(command))
        reject(command);
    else if (workerCountOf(recheck) == 0)
        addWorker(null, false);
}
```

If the code reaches this point, either the current number of threads is greater than or equal to corePoolSize, or the addWorker call failed. The condition if (isRunning(c) && workQueue.offer(command)) first checks the state of the thread pool: if it is in the Running state, the task is added to the task queue with workQueue.offer(command). Because the pool may be shut down concurrently, the state is read again into recheck after the task has been queued. If the pool is no longer in the Running state at that point, it has been shut down in the meantime, so the task that was just added to the queue is removed and the rejection policy is executed, as shown in the following code:

```java
if (! isRunning(recheck) && remove(command))
    reject(command);
```

Now let’s look at the else if branch that follows:

```java
else if (workerCountOf(recheck) == 0)
    addWorker(null, false);
```

This branch is reached when the recheck finds the pool still in the Running state (or when the task could not be removed from the queue). After a task has been queued, the pool must guard against the case where no thread is available to execute it, for example because the previous threads have been reclaimed or have terminated unexpectedly. Therefore, if the current number of threads is 0, that is, workerCountOf(recheck) == 0, addWorker(null, false) is called to create a new thread. The first argument is null because the task is already in the queue, so the new thread fetches it from there instead of receiving it directly.
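
This branch matters most when corePoolSize is 0. In that case step 1 never creates a thread, so without addWorker(null, false) a queued task could sit in the queue with no thread to run it. The sketch below, using a real ThreadPoolExecutor with an illustrative class name, submits one task to such a pool and waits for it to run.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class ZeroCoreDemo {
    static boolean taskRuns() throws InterruptedException {
        // corePoolSize = 0: execute() skips step 1 and queues the task; the recheck
        // then sees workerCount == 0 and calls addWorker(null, false).
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                0, 1, 1, TimeUnit.SECONDS, new LinkedBlockingQueue<>());
        CountDownLatch done = new CountDownLatch(1);
        pool.execute(done::countDown);                   // the task just signals the latch
        boolean ran = done.await(5, TimeUnit.SECONDS);   // true if the task was executed
        pool.shutdown();
        return ran;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println("task ran: " + taskRuns());
    }
}
```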

Now let’s look at the last part of the code:

```java
else if (!addWorker(command, false))
    reject(command);
```

Reaching this part means either the thread pool is not in the Running state, or the number of threads is greater than or equal to corePoolSize and the task queue is full. According to the rules, new threads should then be added until the thread count reaches the maximum pool size. So addWorker is called again with the second argument set to false, which means it checks whether the current number of threads is less than maxPoolSize: if it is, a new thread is added; otherwise, none is added. In other words, new workers are now created based on maxPoolSize. If addWorker returns true, the thread was added successfully. If it returns false, either the number of threads has reached maxPoolSize or the pool is no longer running, and the rejection policy is executed.
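
Reaching maxPoolSize is one way to end up in reject(command); the other is submitting to a pool that is no longer running. In that case addWorker fails because of the pool state, isRunning(c) is false, and the final addWorker(command, false) fails as well. A small sketch with the real Executors API (the class name is illustrative):

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;

public class ShutdownRejectDemo {
    static boolean rejectedAfterShutdown() {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        pool.shutdown(); // the pool leaves the Running state
        try {
            pool.execute(() -> { });
            return false; // not expected: the task was accepted
        } catch (RejectedExecutionException e) {
            return true;  // the default AbortPolicy throws on rejection
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + rejectedAfterShutdown());
    }
}
```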

As we can see, execute calls addWorker in several places to hand over the task. addWorker adds and starts a Worker, which can be understood as a wrapper around Thread: the Worker class holds a Thread object internally, and that is the thread that actually executes tasks. So each Worker corresponds to one thread in the thread pool, and calling addWorker means adding a thread. The thread reuse logic lives mainly in the runWorker method, which the Worker's run method calls. The simplified code of runWorker is as follows:

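```java
final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    // Run firstTask first, then keep pulling tasks from the queue via getTask()
    while (task != null || (task = getTask()) != null) {
        try {
            task.run(); // plain method call on this thread; no new thread is started
        } finally {
            task = null; // clear it so the next iteration calls getTask()
        }
    }
}
```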

As we can see, the thread reuse logic is essentially a while loop that keeps running:

  1. It either takes the firstTask of the Worker or gets the task from the workQueue using the getTask method.
  2. It directly calls the run method of the task to execute the specific task (instead of creating a new thread).

This is the final piece of the implementation. The loop takes either the Worker's firstTask or a new task fetched from the workQueue by getTask, and executes it by calling the Runnable's run method directly. As mentioned earlier, each thread therefore stays inside one big loop, repeatedly fetching tasks and executing them, which is how thread reuse is achieved.
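
To make the loop concrete, here is a self-contained sketch of a single worker thread running a runWorker-style loop. MiniWorker and its getTask are hypothetical stand-ins: this getTask simply polls the queue with a timeout and returns null when it expires, whereas the real getTask also handles pool state, core-thread timeouts and shrinking the pool. Exception handling and worker replacement are omitted.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

public class MiniWorker implements Runnable {
    private final BlockingQueue<Runnable> workQueue;
    private final long keepAliveMillis;
    private Runnable firstTask;

    MiniWorker(Runnable firstTask, BlockingQueue<Runnable> workQueue, long keepAliveMillis) {
        this.firstTask = firstTask;
        this.workQueue = workQueue;
        this.keepAliveMillis = keepAliveMillis;
    }

    // Hypothetical getTask(): wait up to keepAliveMillis for a task and
    // return null on timeout so that the loop, and the thread, can end.
    private Runnable getTask() {
        try {
            return workQueue.poll(keepAliveMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    @Override
    public void run() {
        Runnable task = firstTask;
        firstTask = null;
        while (task != null || (task = getTask()) != null) {
            try {
                task.run(); // ordinary method call on the same thread
            } finally {
                task = null;
            }
        }
    }

    static List<String> demo() throws InterruptedException {
        BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
        List<String> names = Collections.synchronizedList(new ArrayList<>());
        Runnable record = () -> names.add(Thread.currentThread().getName());
        for (int i = 0; i < 4; i++) {
            queue.add(record); // four tasks waiting in the queue
        }
        Thread t = new Thread(new MiniWorker(record, queue, 100), "worker-1");
        t.start();
        t.join(); // returns once getTask() times out on the empty queue
        return names;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(demo());
    }
}
```

After the first task and the four queued tasks have run, getTask times out on the empty queue, the loop exits and the thread ends, so demo() should return five entries, all named worker-1.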