52 Can Semaphore Be Replaced by Fixed Thread Pool

52 Can Semaphore Be Replaced by FixedThreadPool #

In this lesson, we will introduce a utility class for controlling concurrent processes, which makes it easier for threads to cooperate with each other, such as allowing thread A to wait for thread B to complete before continuing execution to meet the business logic. In this lesson, we will start with Semaphore.

Semaphore #

Introduction #

img

From the figure, we can see that the main purpose of a semaphore is to control the access to resources that need to limit concurrent access. Specifically, the semaphore maintains a count of “permits”, and before a thread can access a shared resource, it must first acquire a permit. A thread can “acquire” a permit from the semaphore, and once the thread acquires it, the permit held by the semaphore is transferred to the thread, so the remaining permits held by the semaphore must be decremented.

Similarly, a thread can “release” a permit, and if a thread releases a permit, it is like returning the permit to the semaphore, so the number of available permits in the semaphore increases by one. When the number of permits held by the semaphore is reduced to 0, if a subsequent thread wants to acquire a permit, it must wait until the thread that obtained the permit releases it before it can acquire it. Since a thread cannot access the protected shared resource until it acquires a permit, this controls the concurrent access to the resource, which is the overall idea.

Application Examples, Use Cases #

Background

Let’s take a look at a specific scenario:

img

In this scenario, our service is the middle square, with requests on the left and the slow service we depend on on the right. For various reasons (such as large computation, multiple downstream dependencies, etc.), the right slow service is slow, and it can only handle a limited number of requests. If too many requests arrive at the same time, it may cause the service to be unavailable and crash it. So we must protect it and not allow too many threads to simultaneously access it. How can we achieve this?

Before explaining how to do this, let’s first see if we can achieve this with a regular thread pool in typical scenarios.

public class SemaphoreDemo1 {

    public static void main(String[] args) {

        ExecutorService service = Executors.newFixedThreadPool(50);

        for (int i = 0; i < 1000; i++) {

            service.submit(new Task());

        }

        service.shutdown();

    }

    static class Task implements Runnable {

        @Override

        public void run() {

            System.out.println(Thread.currentThread().getName() + " called the slow service");

            try {

                // Simulate slow service

                Thread.sleep(3000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

        }

    }

}

In this code, there is a thread pool with 50 fixed threads. We submit 1000 tasks to the thread pool, and each task executes by sleeping for 3 seconds to simulate the process of calling the slow service. When we run this program, we will see the following printed results:

pool-1-thread-2 called the slow service
pool-1-thread-4 called the slow service.

pool-1-thread-3 called the slow service.

pool-1-thread-1 called the slow service.

pool-1-thread-5 called the slow service.

pool-1-thread-6 called the slow service.

...

(Including threads pool-1-thread-1 to pool-1-thread-50)

It will invoke the slow service from thread 1 to thread 50. The actual invocation order may vary each time, but all 50 threads will almost simultaneously invoke the slow service. In this case, it will cause our slow service to crash.

Therefore, it is necessary to strictly limit the number of requests that can simultaneously reach the service. For example, if we want to limit the simultaneous access to the service to no more than 3 requests, how can we implement it? And here's one thing to note, our premise is that we do have 50 threads in the thread pool, which definitely exceeds 3 requests. However, how do we further control these many threads from accessing the slow service at the same time? We can solve this problem by using a semaphore.

**Obtaining permits under normal circumstances**

![img](../images/CgpOIF5fiXKAWCrGAABHA-Ygk4E065.png)

The boxes in this figure represent a semaphore with 3 permits. Each green bar represents a permit. Now we have 3 permits, and the semaphore is very "generous". As long as it holds permits, it will distribute them to others who want to request. Suppose Thread 1 comes to request at this time, in this case, the semaphore will give a permit to the first thread Thread 1. Thus, Thread 1 obtains the permit and becomes as follows:

![img](../images/CgpOIF5fiX-AIDnXAABVZqz1SKU970.png)

After Thread 1 obtains the permit, it becomes eligible to access the slow service. It will then proceed to access our slow service. At the same time, the semaphore's permits decrease to 2. Suppose this slow service is very slow and may not return for a long time, so Thread 1 will not release the permit before it returns. During this period, a second thread comes to request:

![img](../images/CgpOIF5fiY2ABkzRAABSnfx7sLg901.png)

Similarly, since the semaphore holds two permits, it can still meet the needs of Thread 2. Therefore, the second permit is given to the second thread. This way, the second thread also obtains our permit and can access the rightmost slow service, as shown in the figure:

![img](../images/CgpOIF5fiZmAe4JhAABU11rFmfE305.png)

Similarly, before the first two threads return, the third thread also comes and obtains the permit in the same way, and accesses the slow service:

![img](../images/Cgq2xl5fiaWADu9-AABcur4lvIo387.png)

When there is no permit, the requesting threads will be blocked

At this point, there are no permits left in the semaphore because the original three were given to these three threads. In this case, the semaphore can further fulfill its function. Assuming that the fourth thread comes to request a permit from our semaphore, since threads 1, 2, and 3 are still accessing the “slow service” and have not returned the permit yet, and the semaphore itself does not have any more permits, the following situation will occur:

img

When thread 4 requests a permit from us using the acquire method, it will be blocked, meaning that thread 4 did not get the permit and is not allowed to access the “slow service”. This means that the “slow service” can still only be accessed by the previous 3 threads, thus achieving our initial goal: limit the maximum simultaneous calls to our slow service to 3.

After a thread releases a permit

Assuming that thread 1 finishes its task first, it returns. When it returns, it calls the release method, indicating “I have finished my task and want to return the permit”. Therefore, thread 1 releases the permit it previously held and returns it to our semaphore. As a result, the number of permits held by the semaphore changes from 0 back to 1, as shown in the figure: img

Since the permit has been returned to the semaphore, the thread 4, which just requested the permit from us, can successfully obtain the permit that was just released. Now, thread 4 has the right to access the “slow service” and will proceed to access it.

However, it is important to note that thread 1 first returns the permit to the semaphore, and then the semaphore transfers the permit to thread 4. Therefore, at this time, there are still only 3 threads accessing the “slow service” simultaneously, namely threads 2, 3, and 4, because thread 1 has already completed its task and left.

img

If two threads release permits

Assuming the program continues to run, as time goes on, both thread 2 and thread 3 finish execution and release the permits they have. As a result, the semaphore once again has 2 permits, and it further issues these permits to thread 5 and thread 6, which still have this need. Now, these two threads can also access the “slow service”:

img

However, the threads currently accessing the “slow service” are now thread 4, thread 5, and thread 6. It can be seen that the total number does not exceed 3.

In this example, when thread 4 initially tried to acquire the permit, it was blocked. Even if thread 5, thread 6, or even thread 100 came to execute the acquire method, the semaphore would block them all. This demonstrates the main function of a semaphore in controlling concurrency.

Summary #

The above process demonstrates how to use a semaphore to control the execution of a certain task by a maximum of 3 threads at any given time. This is mainly achieved by controlling the issuance and return of permits.

Usage #

Usage Workflow #

After discussing the scenario, let’s take a look at the specific usage, which mainly consists of the following three steps. First, initialize a semaphore and pass in the number of permits. This is its constructor with a fairness parameter: public Semaphore(int permits, boolean fair). It takes two parameters, the first being the number of permits and the second being fairness. If the second parameter is set to true, it means it is a fair strategy. It will put previously waiting threads into a queue and when a new permit arrives, it will be distributed to the waiting threads in order. If the second parameter is set to false, it means it is a non-fair strategy. It may allow “cutting in line”, meaning threads that request later may get the permits first.

The second step is to use the acquire() method after establishing the constructor and initializing the semaphore. Before calling the slow service, the thread uses the acquire() or acquireUninterruptibly() method. These two methods aim to acquire permits, which also means that only if this method can successfully continue, it can proceed to access the code that calls the slow service. If there are no remaining permits at this time, the thread will wait at the line of code that uses the acquire() method, so it will not proceed to call the slow service. We use this method to protect our slow service.

The difference between acquire() and acquireUninterruptibly() is whether they respond to interrupts. acquire() can be interrupted, which means that if the thread is interrupted during acquiring the semaphore, it will exit the acquire() method and no longer try to acquire it. However, acquireUninterruptibly() will not be interrupted.

The third step is to call the release() method to release the permits after the task is completed. For example, after executing the line of code for the slow service, we can call the release() method to return the permit to our semaphore.

Introduction to other main methods #

In addition to these main methods, there are some other methods that I will introduce.

(1) public boolean tryAcquire()

tryAcquire() is consistent with the trylock thinking introduced for locks before. It attempts to acquire permits, which is equivalent to checking if there are any available permits now. If there are, it will acquire them. If it cannot acquire them at the moment, it is not necessary to be blocked, it can do other things.

(2) public boolean tryAcquire(long timeout, TimeUnit unit)

There is also an overloaded method that takes a timeout as a parameter. For example, if 3 seconds is passed in, it means it will wait for at most 3 seconds. If it acquires the permit during the waiting period, it will continue to execute; if the timeout is reached and it still cannot acquire the permit, it will consider the acquisition as a failure and return false.

(3) availablePermits()

This method is used to query the number of available permits and returns an integer result.

Sample Code #

Now let’s look at an example code:

public class SemaphoreDemo2 {

    static Semaphore semaphore = new Semaphore(3);

    public static void main(String[] args) {

        ExecutorService service = Executors.newFixedThreadPool(50);

        for (int i = 0; i < 1000; i++) {

            service.submit(new Task());

        }

        service.shutdown();

    }

    static class Task implements Runnable {

        @Override

        public void run() {

            try {

                semaphore.acquire();

            } catch (InterruptedException e) {

                e.printStackTrace();

            }

            System.out.println(Thread.currentThread().getName() + " acquired the permit and took 2 seconds to execute the slow service");

            try {

                Thread.sleep(2000);

            } catch (InterruptedException e) {

                e.printStackTrace();

            }
System.out.println("Slow service execution complete, " + Thread.currentThread().getName() + " released the permit");

semaphore.release();

}
}
}

In this code, we created a semaphore with a count of 3, and then created a fixed thread pool with 50 threads like before, putting 1000 tasks into it. Each task, before executing the simulated slow service, first uses the acquire() method of the semaphore to obtain a permit, and then proceeds to execute the 2-second slow service. Finally, the release() method is used to release the permit.

The result of the code execution is as follows:

pool-1-thread-1 acquired the permit and took 2 seconds to execute the slow service

pool-1-thread-2 acquired the permit and took 2 seconds to execute the slow service

pool-1-thread-3 acquired the permit and took 2 seconds to execute the slow service

Slow service execution complete, pool-1-thread-1 released the permit

Slow service execution complete, pool-1-thread-2 released the permit

Slow service execution complete, pool-1-thread-3 released the permit

pool-1-thread-4 acquired the permit and took 2 seconds to execute the slow service

pool-1-thread-5 acquired the permit and took 2 seconds to execute the slow service

pool-1-thread-6 acquired the permit and took 2 seconds to execute the slow service

Slow service execution complete, pool-1-thread-4 released the permit

Slow service execution complete, pool-1-thread-5 released the permit

Slow service execution complete, pool-1-thread-6 released the permit

It first allows threads 1, 2, and 3 to acquire the permit and then they proceed to execute the 2-second slow service. Only when they are finished executing and release the permit can the following threads acquire the permit and execute the service. When the first 3 threads haven’t finished executing, which means they haven’t released the permit yet, the following threads have already requested the permit and will be blocked. From the execution result, we can see that only 3 threads at most can access our slow service at the same time.

Special Usage: Obtaining or Releasing Multiple Permits at Once #

Let’s introduce a special usage of the semaphore. It can obtain or release multiple permits at once.

For example, semaphore.acquire(2), passing 2 as the parameter, means acquiring two permits at once. The same applies to releasing, semaphore.release(3) is equivalent to releasing three permits at once.

Why would we need to do this? Let’s illustrate with a usage scenario. Suppose the first task A calls a resource-intensive method, method1(), and task B calls a less resource-intensive method, method2(). In this case, assuming we have 5 permits in total, we can only allow 1 thread to call method1 at a time, or allow at most 5 threads to call method2 at the same time. But method1 and method2 cannot be called simultaneously.

Therefore, we require Task A to acquire all 5 permits at once before execution, while Task B only needs to acquire one permit. This way, we prevent Tasks A and B from running concurrently, while also considering efficiency. Otherwise, if we only allow one thread to access method2 at a time, it may waste resources. So this allows us to allocate resources based on our needs using the permits of the semaphore.

Notes #

There are a few things to note about semaphores:

  • Try to keep the number of acquired and released permits consistent. Otherwise, for example, if we always acquire 2 permits but only release 1 or even none, the permits in the semaphore will be gradually consumed until there are no permits left, preventing other threads from accessing it.
  • Fairness can be set during initialization. If set to true, it will make it fairer, but if set to false, it will increase the overall throughput.
  • Semaphores support cross-thread and cross-thread pool scenarios. It is not required that the thread that acquires a permit must be the one to release it. For example, it is perfectly valid for Thread A to acquire the permit and then for Thread B to release it, as long as the logic is reasonable.

Can a Semaphore be Replaced by a FixedThreadPool? #

Let’s go back to the question of whether a semaphore can be replaced by a FixedThreadPool. In other words, if a semaphore can limit the number of concurrently accessing threads, why not simply use a fixed number of threads in the thread pool to accomplish the same goal? This would seem more convenient. For example, if there are 3 threads in the thread pool, naturally there can only be 3 threads accessing it.

This is a good question. In practical scenarios, we encounter situations where, for example, before calling the slow service, we need a condition to determine whether to limit the maximum number of threads to access it. For instance, we may want to limit it to 3 threads only near midnight (e.g., zero o’clock), while during most other times, we want more threads to be able to access it. In this case, we should set the number of threads in the thread pool to 50 or even more, and then add an if condition before execution. If it meets the time restriction (e.g., near midnight), then use the semaphore to further limit it. This approach is more reasonable.

Let’s consider another example. In a large-scale application, there may be different types of tasks that call the slow service using different thread pools. Since there is not only one caller, but possibly multiple callers such as Tomcat servers or gateways, we should not limit their thread pool size, or it may not be possible to limit it. However, what we can do is to limit the number of requests that can access the service before execution using the semaphore, because our semaphore supports cross-thread and cross-thread pool scenarios. If we use a FixedThreadPool to limit it, then we cannot limit it across thread pools, greatly weakening its functionality.

Based on the reasons mentioned above, if we want to limit the number of concurrent accessing threads, using a semaphore is more appropriate.