19 Which Concurrent Utilities Are Provided by the Java Concurrency Package #

Through our previous study, we have reviewed various basic elements of concurrent programming, such as threads and locks, and gradually touched upon some aspects of the Java Concurrency Package. I believe that after this warm-up, we will be able to understand the Java Concurrency Package more quickly.

Today, I want to ask you a question: What concurrent utility classes does the Java Concurrency Package provide?

Typical Answer #

What we usually refer to as the concurrent package is the java.util.concurrent package and its sub-packages, which gather various foundational classes for Java concurrency. Specifically, it mainly includes the following aspects:

  • It provides various synchronization structures that are more advanced than synchronized, including CountDownLatch, CyclicBarrier, Semaphore, etc. These structures allow for more sophisticated multi-threaded operations. For example, we can use Semaphore as a resource controller to limit the number of threads working simultaneously.
  • It offers various thread-safe containers, such as the commonly used ConcurrentHashMap, the ordered ConcurrentSkipListMap, or the thread-safe dynamic array CopyOnWriteArrayList implemented through mechanisms like snapshotting.
  • It includes various implementations of concurrent queues, such as different BlockingQueue implementations, including the typical ones like ArrayBlockingQueue, SynchronousQueue, or PriorityBlockingQueue for specific scenarios.
  • It provides a powerful Executor framework, which enables the creation of different types of thread pools, task scheduling, etc. In most cases, we no longer need to implement thread pools and task schedulers from scratch.
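As a minimal illustration of the last point, the sketch below submits a task to a fixed-size thread pool via the Executor framework and retrieves its result through a Future. The pool size and the toy computation are arbitrary choices for demonstration:

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

public class ExecutorSample {
    public static void main(String[] args) throws Exception {
        // A pool that reuses two worker threads instead of creating one per task
        ExecutorService pool = Executors.newFixedThreadPool(2);
        // Submit a Callable; the pool runs it asynchronously
        Future<Integer> result = pool.submit(() -> 21 + 21);
        // get() blocks until the task completes
        System.out.println(result.get());
        pool.shutdown();
        pool.awaitTermination(5, TimeUnit.SECONDS);
    }
}
```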

Test Point Analysis #

This question mainly tests your understanding of the concurrency package and whether you have practical experience using it. When we do multithreaded programming, we mainly have several goals:

  • Use multiple threads to improve the scalability of the program to meet the throughput requirements of the business.
  • Coordinate thread scheduling and interaction to complete the business logic.
  • Pass data and state between threads, which is also necessary for implementing business logic.

Therefore, this question can only be considered as a simple start. The interviewer will often further test how to use the concurrency package to implement a specific use case and analyze the advantages and disadvantages of the implementation.

If you are weak in this area, my advice is:

  • At a high level, grasp the main components (briefly introduced in the previous answer).
  • Understand specific design, implementation, and capabilities.
  • Deeply understand the applicable scenarios, usage, and even principles of some typical utility classes, and be able to write typical code examples proficiently.

Mastering these will usually be enough. After all, the concurrency package provides various tools, and there are few opportunities to use all of them comprehensively in an application. It is already very good to have a solid grasp of the core features. Truly deep experience can only be gained by encountering problems in practical scenarios.

Knowledge Extension #

First, let’s take a look at the rich synchronization structures provided by the concurrency package. In previous lectures, we have analyzed various explicit locks. Today, I will focus on:

  • CountDownLatch, which allows one or more threads to wait for certain operations to complete.
  • CyclicBarrier, an auxiliary synchronization structure that allows multiple threads to wait until a barrier is reached.
  • Semaphore, the Java version of a semaphore implementation.

Java provides an implementation of the classic semaphore, Semaphore, which controls access to a shared resource through a count of permits. Imagine a taxi stand at an airport: while plenty of empty taxis are available, a dispatcher lets the waiting queue board in groups of five at a time. This is essentially how a semaphore works.

You can try using a Semaphore to simulate this scheduling process:

import java.util.concurrent.Semaphore;

public class UsualSemaphoreSample {
    public static void main(String[] args) throws InterruptedException {
        System.out.println("Action...GO!");
        Semaphore semaphore = new Semaphore(5);
        for (int i = 0; i < 10; i++) {
            Thread t = new Thread(new SemaphoreWorker(semaphore));
            t.start();
        }
    }
}

class SemaphoreWorker implements Runnable {
    private String name;
    private final Semaphore semaphore;

    public SemaphoreWorker(Semaphore semaphore) {
        this.semaphore = semaphore;
    }

    @Override
    public void run() {
        try {
            log("is waiting for a permit!");
            semaphore.acquire();
            // Release only after a successful acquire; releasing in an outer
            // finally could add a permit the thread never actually held
            try {
                log("acquired a permit!");
                log("executed!");
            } finally {
                log("released a permit!");
                semaphore.release();
            }
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    private void log(String msg) {
        if (name == null) {
            name = Thread.currentThread().getName();
        }
        System.out.println(name + " " + msg);
    }
}

This code is a typical Semaphore example. The logic is that a thread attempts to obtain a work permit, and if granted, it performs the task and then releases the permit. Other threads waiting for the permit can then proceed to work until all tasks are completed. When you compile and run it, you can see how the permit mechanism of Semaphore limits the working threads.

However, the rhythm of this example does not quite match our scenario. Here, Semaphore simply ensures that at most 5 threads hold a permit at any moment: as soon as one person departs, the next person in the queue is immediately granted a permit, rather than boarding in distinct batches of five as the dispatcher scenario requires.

So let me modify it to demonstrate a non-typical usage of Semaphore:

import java.util.concurrent.Semaphore;
public class AbnormalSemaphoreSample {
  public static void main(String[] args) throws InterruptedException {
      Semaphore semaphore = new Semaphore(0);
      for (int i = 0; i < 10; i++) {
          Thread t = new Thread(new MyWorker(semaphore));
          t.start();
      }
      System.out.println("Action...GO!");
      semaphore.release(5);
      System.out.println("Wait for permits off");
       while (semaphore.availablePermits() != 0) {
          Thread.sleep(100L);
      }
      System.out.println("Action...GO again!");
      semaphore.release(5);
  }
}
class MyWorker implements Runnable {
  private Semaphore semaphore;
  public MyWorker(Semaphore semaphore) {
      this.semaphore = semaphore;
  }
  @Override
  public void run() {
      try {
          semaphore.acquire();
          System.out.println("Executed!");
      } catch (InterruptedException e) {
          e.printStackTrace();
      }
  }
}

Note that the above code focuses more on demonstrating the functionality and limitations of Semaphore. There are many anti-patterns in thread programming, such as using sleep to coordinate task execution, and using polling to check the acquisition of permits. These are inefficient and fragile, usually used only for testing or diagnostic purposes.

In general, we can see that Semaphore is just a counter, and its basic logic is based on acquire/release, without too much complex synchronization logic.

If the value of Semaphore is initialized to 1, then a thread can enter a mutex state by using acquire, which is essentially very similar to a mutex lock. However, there is also a clear difference. For example, a mutex lock has an owner, while for a counter structure like Semaphore, although it has similar functionality, there is no true owner unless we extend and wrap it.
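The "no owner" point can be shown directly. In the hypothetical sketch below, the main thread acquires the single permit, and a completely different thread releases it — something a mutex lock such as ReentrantLock would not allow (release by a non-owner throws IllegalMonitorStateException there):

```java
import java.util.concurrent.Semaphore;

public class SemaphoreNoOwnerSample {
    public static void main(String[] args) throws InterruptedException {
        Semaphore mutex = new Semaphore(1);
        mutex.acquire(); // main thread takes the single permit, like locking

        // A Semaphore has no notion of an owner, so a *different* thread
        // may legally release the permit the main thread acquired
        Thread other = new Thread(mutex::release);
        other.start();
        other.join();

        System.out.println(mutex.availablePermits()); // the permit is back
    }
}
```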

Next, let’s look at CountDownLatch and CyclicBarrier. They have some similarities in behavior and are often examined for their differences. Let me summarize them briefly:

  • CountDownLatch cannot be reset, so it cannot be reused, while CyclicBarrier does not have this limitation and can be reused.
  • The basic operation combination of CountDownLatch is countDown/await. The thread calling await blocks until countDown is called enough times. It doesn’t matter whether you count down in one or multiple threads, as long as the count is sufficient. So, as Brian Goetz said, CountDownLatch operates on events.
  • The basic operation combination of CyclicBarrier is await. It continues with the task only when all parties have called await, and it automatically resets. Note that under normal circumstances, the reset of CyclicBarrier occurs automatically. If we call the reset method while there are still threads waiting, the waiting threads will be disturbed, and a BrokenBarrierException will be thrown. CyclicBarrier focuses on threads, not calling events. Its typical use case is to wait for concurrent threads to finish.
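The first bullet — that a CountDownLatch is strictly one-shot — can be demonstrated with a few lines. Once the count reaches zero the gate stays open: await returns immediately, and further countDown calls are no-ops rather than resets:

```java
import java.util.concurrent.CountDownLatch;

public class LatchOneShotSample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(2);
        latch.countDown();
        latch.countDown();   // count reaches zero: the gate opens for good
        latch.await();       // returns immediately, no blocking
        System.out.println(latch.getCount());
        latch.countDown();   // no-op: the count never goes below zero
        System.out.println(latch.getCount()); // still zero, no way to reset
    }
}
```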

If we want to use CountDownLatch to implement the queuing scenario mentioned above, how should we do it? Let’s assume there are 10 people in the queue, and we divide them into batches of 5 people each, using CountDownLatch to coordinate the batches. You can try the following example code:

import java.util.concurrent.CountDownLatch;

public class LatchSample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(6);
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(new FirstBatchWorker(latch));
            t.start();
        }
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(new SecondBatchWorker(latch));
            t.start();
        }
        // Note that this is demonstration logic, not a recommended coordination method
        while (latch.getCount() != 1) {
            Thread.sleep(100L);
        }
        System.out.println("Wait for first batch finish");
        latch.countDown();
    }
}

class FirstBatchWorker implements Runnable {
    private CountDownLatch latch;
    public FirstBatchWorker(CountDownLatch latch) {
        this.latch = latch;
    }
    @Override
    public void run() {
        System.out.println("First batch executed!");
        latch.countDown();
    }
}

class SecondBatchWorker implements Runnable {
    private CountDownLatch latch;
    public SecondBatchWorker(CountDownLatch latch) {
        this.latch = latch;
    }
    @Override
    public void run() {
        try {
            latch.await();
            System.out.println("Second batch executed!");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}


The scheduling logic of CountDownLatch is relatively simple: the threads in the second batch call await, waiting for the first batch to count down enough times. The example also shows its limitation: it handles a queue of exactly 10 people, but because a CountDownLatch cannot be reset, it cannot be extended to support further batches of people queuing. Compile and run it, and the output looks like this:

![](../images/46c88c7d8e0507465bddb677e4eac5b9-20221031211303-0y18ajn.png)

In actual applications, the condition dependencies are usually not this awkward. Using CountDownLatch to make threads wait for a set of operations to finish is a very simple and common pattern, and the countDown/await combination is efficient; the polling loop in the example is generally not recommended.
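To make the point concrete, here is one hypothetical way to coordinate the same two batches without polling getCount: the main thread blocks on a first-batch latch and then opens a separate gate latch for the second batch. The latch names are my own:

```java
import java.util.concurrent.CountDownLatch;

public class LatchAwaitSample {
    public static void main(String[] args) throws InterruptedException {
        CountDownLatch firstBatchDone = new CountDownLatch(5);
        CountDownLatch secondBatchGate = new CountDownLatch(1);
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                System.out.println("First batch executed!");
                firstBatchDone.countDown();
            }).start();
        }
        for (int i = 0; i < 5; i++) {
            new Thread(() -> {
                try {
                    secondBatchGate.await(); // blocked until main opens the gate
                    System.out.println("Second batch executed!");
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }).start();
        }
        firstBatchDone.await(); // blocks efficiently, no sleep/poll loop
        System.out.println("Wait for first batch finish");
        secondBatchGate.countDown(); // release the second batch
    }
}
```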

What if we express this scenario using CyclicBarrier? We know that CyclicBarrier actually reflects the coordination of threads running in parallel. In the following example, from a logical point of view, the 5 worker threads actually represent 5 ready-to-use empty cars, rather than 5 passengers. Comparing with the previous example of CountDownLatch, it helps us to distinguish their abstract models. Please see the example code below:

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;

public class CyclicBarrierSample {
    public static void main(String[] args) throws InterruptedException {
        CyclicBarrier barrier = new CyclicBarrier(5, new Runnable() {
            @Override
            public void run() {
                System.out.println("Action...GO again!");
            }
        });
        for (int i = 0; i < 5; i++) {
            Thread t = new Thread(new CyclicWorker(barrier));
            t.start();
        }
    }

    static class CyclicWorker implements Runnable {
        private CyclicBarrier barrier;
        public CyclicWorker(CyclicBarrier barrier) {
            this.barrier = barrier;
        }
        @Override
        public void run() {
            try {
                for (int i = 0; i < 3; i++) {
                    System.out.println("Executed!");
                    barrier.await();
                }
            } catch (BrokenBarrierException e) {
                e.printStackTrace();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

To make the output better reflect the execution sequence, I used CyclicBarrier's barrier action: the Runnable passed to the constructor, which Java runs automatically each time the barrier is tripped. Because CyclicBarrier resets automatically, this logic naturally supports more people queuing. Compile and run it, and the output is as follows:

![](../images/eff56d3219ce5493ecacc70a168b2b9f-20221031211303-kpidh6x.png)

The Java concurrent package also provides thread-safe Map, List, and Set. First, please refer to the following class diagram.

![](../images/35390aa8a6e6f9c92fda086a1b95b457-20221031211303-hw8ex54.png)

As you can see, the overall types and structures are relatively simple. If our application cares mainly about the speed of putting to or getting from the Map and does not need ordered traversal, ConcurrentHashMap is usually the recommended choice; if ordered traversal is needed, use ConcurrentSkipListMap. ConcurrentSkipListMap may also perform better under heavy concurrent modification.
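A small sketch of the two maps side by side: ConcurrentHashMap offers atomic read-modify-write operations such as merge without external locking, while ConcurrentSkipListMap keeps its keys sorted. The key and value choices here are arbitrary:

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;

public class ConcurrentMapSample {
    public static void main(String[] args) {
        ConcurrentMap<String, Integer> hits = new ConcurrentHashMap<>();
        // merge is atomic on ConcurrentHashMap: safe without external locking
        hits.merge("page", 1, Integer::sum);
        hits.merge("page", 1, Integer::sum);
        System.out.println(hits.get("page"));

        ConcurrentSkipListMap<Integer, String> sorted = new ConcurrentSkipListMap<>();
        sorted.put(3, "c");
        sorted.put(1, "a");
        sorted.put(2, "b");
        // Keys are maintained in sorted order regardless of insertion order
        System.out.println(sorted.firstKey());
    }
}
```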

In a previous column, I mentioned choosing HashMap for scenarios where order does not matter and TreeMap-like structures where it does. But why is there no ConcurrentTreeMap among the concurrent containers?

This is because implementing efficient thread safety for TreeMap is very difficult. Its implementation is based on a complex red-black tree: to preserve access efficiency, every insertion or deletion may require rotating and moving nodes to rebalance the tree, which makes it hard to synchronize at a reasonable granularity in concurrent scenarios. A skip list, by contrast, has a simpler structure with cheaper insertions and deletions, so the overhead of making it thread-safe is much lower. To help you understand the internal structure of a skip list, I have created an illustration.

![](../images/63b94b5b1d002bb191c75d2c48af767b-20221031211303-9mz4ih6.png)

Regarding the two CopyOnWrite containers, CopyOnWriteArraySet is actually implemented by encapsulating CopyOnWriteArrayList. So when learning, we can focus on understanding one of them.

First, what does CopyOnWrite mean? Its principle is that any modification operation, such as add, set, and remove, will copy the original array, modify it, and then replace the original array. Through this defensive mechanism, an alternative thread safety is achieved. Please see the code snippet below. The commented parts explain its logic clearly.

public boolean add(E e) {
    synchronized (lock) {
        Object[] elements = getArray();
        int len = elements.length;
        // Copy
        Object[] newElements = Arrays.copyOf(elements, len + 1);
        newElements[len] = e;
        // Replace
        setArray(newElements);
        return true;
    }
}

final void setArray(Object[] a) {
    array = a;
}

So this data structure is best suited to read-mostly workloads with few writes; if modifications are frequent, the overhead of copying the array on every change becomes significant.
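A practical consequence of the copy-on-write design is that iterators work on an immutable snapshot: you can modify the list while iterating it without a ConcurrentModificationException, and the iterator simply never sees the new elements. A minimal sketch:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class CowSample {
    public static void main(String[] args) {
        List<String> list = new CopyOnWriteArrayList<>(new String[]{"a", "b", "c"});
        // The for-each iterator captures a snapshot of the backing array
        for (String s : list) {
            if (s.equals("b")) {
                list.add("d"); // no ConcurrentModificationException
            }
            System.out.print(s); // iterates the original snapshot only
        }
        System.out.println();
        System.out.println(list.size()); // the new element is in the list itself
    }
}
```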

Today, I have summarized the Java concurrent package and analyzed various synchronization structures and some thread-safe containers with examples. I hope it will be helpful to you.

Practice Question #

Have you understood the topic we discussed today? The question for your consideration is: have you ever used a synchronization construct similar to CountDownLatch to solve real-world problems? Share your use cases and insights.

Please write your thoughts on this question in the comments section. I will select carefully considered comments and give you a learning reward voucher. Join me in the discussion!
