28 How to Use Design Patterns to Optimize Concurrent Programming

Hello, I’m Liu Chao.

In multithreaded programming we often have to design a set of business functions around a particular scenario. Many mature design patterns already exist for this, and learning to apply them well is a great help. Today I will introduce several design patterns commonly used in concurrent programming.

Thread Context Design Pattern #

Thread context refers to information that is globally accessible within a thread and lives for that thread’s entire lifecycle. A familiar analogy is Spring’s ApplicationContext class, a context at application scope: it holds configuration, user information, registered beans, and other context information for the lifetime of the system.

This explanation may seem abstract, so let’s take a concrete example to see when we actually need context.

A long-running request may pass through many layers of method calls. Suppose we need to carry an intermediate result from the first method to the last one for a calculation. A simple implementation is to add this intermediate result as a parameter to every method and pass it down layer by layer. The code is as follows:

import java.util.stream.IntStream;

public class ContextTest {
 
	// Context class
	public class Context {
		private String name;
		private long id;
 
		public long getId() {
			return id;
		}
 
		public void setId(long id) {
			this.id = id;
		}
 
		public String getName() {
			return this.name;
		}
 
		public void setName(String name) {
			this.name = name;
		}
	}
 
	// Set the context name
	public class QueryNameAction {
		public void execute(Context context) {
			try {
				Thread.sleep(1000L);
				String name = Thread.currentThread().getName();
				context.setName(name);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
 
	// Set the context ID
	public class QueryIdAction {
		public void execute(Context context) {
			try {
				Thread.sleep(1000L);
				long id = Thread.currentThread().getId();
				context.setId(id);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
 
	// Execute method
	public class ExecutionTask implements Runnable {
 
		private QueryNameAction queryNameAction = new QueryNameAction();
		private QueryIdAction queryIdAction = new QueryIdAction();
 
		@Override
		public void run() {
			final Context context = new Context();
			queryNameAction.execute(context);
			System.out.println("The name query successful");
			queryIdAction.execute(context);
			System.out.println("The id query successful");
 
			System.out.println("The Name is " + context.getName() + " and id " + context.getId());
		}
	}
 
	public static void main(String[] args) {
		IntStream.range(1, 5).forEach(i -> new Thread(new ContextTest().new ExecutionTask()).start());
	}
}

Result of execution:

The name query successful
The name query successful
The name query successful
The name query successful
The id query successful
The id query successful
The id query successful
The id query successful
The Name is Thread-1 and id 11
The Name is Thread-2 and id 12
The Name is Thread-3 and id 13
The Name is Thread-0 and id 10

However, this approach is clumsy. The Context has to be passed as a parameter on every method call, which also hurts the encapsulation of shared intermediate methods.

Could we use a global variable instead? With multiple threads, a shared variable must be made thread safe, and that brings lock contention.

Besides these two approaches, we can use ThreadLocal to implement the context. ThreadLocal is a thread-local variable that isolates data between threads: each thread that uses it gets its own copy of the variable and can only access that copy.

There are three commonly used methods in ThreadLocal: set, get, and initialValue. We can see how ThreadLocal is used through a simple example:

private void testThreadLocal() {
    Thread t = new Thread() {
        ThreadLocal<String> mStringThreadLocal = new ThreadLocal<String>();
 
        @Override
        public void run() {
            mStringThreadLocal.set("test");
            // get() returns the value stored by the current thread
            System.out.println(mStringThreadLocal.get());
        }
    };
 
    t.start();
}
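
To make the isolation visible, here is a minimal sketch (not part of the original example) in which several threads write to the same ThreadLocal and each reads back only its own value. The class name `ThreadLocalIsolationDemo`, the thread names, and the `"unset"` default are assumptions chosen for illustration.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical demo class: each thread sees only its own ThreadLocal value
class ThreadLocalIsolationDemo {

    // withInitial supplies the per-thread default, like overriding initialValue()
    private static final ThreadLocal<String> LOCAL = ThreadLocal.withInitial(() -> "unset");

    // Starts `threads` threads; each stores its own name and reads it back
    static Map<String, String> run(int threads) {
        Map<String, String> seen = new ConcurrentHashMap<>();
        Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            String name = "demo-" + i;
            workers[i] = new Thread(() -> {
                try {
                    LOCAL.set(name);             // visible only to this thread
                    seen.put(name, LOCAL.get()); // reads this thread's own copy
                } finally {
                    LOCAL.remove();              // clear the slot once the work is done
                }
            }, name);
            workers[i].start();
        }
        for (Thread t : workers) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                throw new IllegalStateException("interrupted while waiting for demo threads", e);
            }
        }
        // The calling thread never called set(), so it still sees the initial value
        seen.put("caller", LOCAL.get());
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(run(3));
    }
}
```

Every worker finds its own name under its own key, while the calling thread still reads the default, because a value set in one thread is never visible to another.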

Next, let’s use ThreadLocal to redesign the initial context. Notice that the context is no longer passed as a parameter to the two action methods; each method obtains the current thread’s context through ThreadLocal instead:

import java.util.stream.IntStream;

public class ContextTest {
	// Context class
	public static class Context {
		private String name;
		private long id;
 
		public long getId() {
			return id;
		}
 
		public void setId(long id) {
			this.id = id;
		}
 
		public String getName() {
			return this.name;
		}
 
		public void setName(String name) {
			this.name = name;
		}
	}
 
	// Holds one Context per thread in a ThreadLocal
	public final static class ActionContext {
 
		// Use ThreadLocal with an overridden initialValue method to create a new Context object for each thread
		private static final ThreadLocal<Context> threadLocal = new ThreadLocal<Context>() {
			@Override
			protected Context initialValue() {
				return new Context();
			}
		};
 
		public static ActionContext getActionContext() {
			return ContextHolder.actionContext;
		}
 
		public Context getContext() {
			return threadLocal.get();
		}
 
		// Get singleton of ActionContext
		public static class ContextHolder {
			private final static ActionContext actionContext = new ActionContext();
		}
	}
 
	// Set the context name
	public class QueryNameAction {
		public void execute() {
			try {
				Thread.sleep(1000L);
				String name = Thread.currentThread().getName();
				ActionContext.getActionContext().getContext().setName(name);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
 
	// Set the context ID
	public class QueryIdAction {
		public void execute() {
			try {
				Thread.sleep(1000L);
				long id = Thread.currentThread().getId();
				ActionContext.getActionContext().getContext().setId(id);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
 
	// Execute method
	public class ExecutionTask implements Runnable {
		private QueryNameAction queryNameAction = new QueryNameAction();
		private QueryIdAction queryIdAction = new QueryIdAction();
 
		@Override
		public void run() {
			queryNameAction.execute();// Set the thread name
			System.out.println("The name query successful");
			queryIdAction.execute();// Set the thread ID
			System.out.println("The id query successful");
 
			System.out.println("The Name is " + ActionContext.getActionContext().getContext().getName() + " and id " + ActionContext.getActionContext().getContext().getId());
		}
	}
 
	public static void main(String[] args) {
		IntStream.range(1, 5).forEach(i -> new Thread(new ContextTest().new ExecutionTask()).start());
	}
}

Execution result:

The name query successful
The name query successful
The name query successful
The name query successful
The id query successful
The id query successful
The id query successful
The id query successful
The Name is Thread-2 and id 12
The Name is Thread-0 and id 10
The Name is Thread-1 and id 11
The Name is Thread-3 and id 13

Thread-Per-Message Design Pattern #

Thread-Per-Message literally means “one thread per message”. For example, in Socket communication a single thread often handles both event listening and I/O reads and writes. If the I/O operations are slow, they delay event listening and processing.

In this case, the Thread-Per-Message pattern can solve this problem well. One thread listens for I/O events, and whenever an I/O event is detected, it is handed over to another processing thread to execute the I/O operation. Let’s learn how this design pattern is implemented through an example.

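// I/O processing
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.Socket;

public class ServerHandler implements Runnable {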
    private Socket socket;

    public ServerHandler(Socket socket) {
        this.socket = socket;
    }

    public void run() {
        BufferedReader in = null;
        PrintWriter out = null;
        String msg = null;
        try {
            in = new BufferedReader(new InputStreamReader(socket.getInputStream()));
            out = new PrintWriter(socket.getOutputStream(), true);
            while ((msg = in.readLine()) != null && msg.length() != 0) {
                // Wait here to receive messages after the connection is successful (suspended, enter a blocking state)
                System.out.println("Server received: " + msg);
                out.print("received~\n");
                out.flush();
            }
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            // Guard against null: the streams may never have been opened
            try {
                if (in != null) {
                    in.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
            if (out != null) {
                out.close();
            }
            try {
                socket.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }
}

// Socket server startup
import java.io.IOException;
import java.net.ServerSocket;
import java.net.Socket;

public class Server {
    private static final int DEFAULT_PORT = 12345;
    private static ServerSocket server;

    public static void start() throws IOException {
        start(DEFAULT_PORT);
    }

    public static void start(int port) throws IOException {
        if (server != null) {
            return;
        }

        try {
            // Start the server
            server = new ServerSocket(port);
            // Continuously listen for client connections
            while (true) {
                Socket socket = server.accept();
                // When a new client is connected, the following code is executed
                long start = System.currentTimeMillis();
                new Thread(new ServerHandler(socket)).start();

                long end = System.currentTimeMillis();
                System.out.println("Spend time is " + (end - start));
            }
        } finally {
            if (server != null) {
                System.out.println("Server has been closed.");
                server.close();
            }
        }
    }

    public static void main(String[] args) {
        // Run the server
        new Thread(new Runnable() {
            public void run() {
                try {
                    Server.start();
                } catch (IOException e) {
                    e.printStackTrace();
                }
            }
        }).start();
    }
}

That completes a Socket server built with the Thread-Per-Message design pattern. But there is a problem here. Did you notice it?

Under high concurrency this design pattern has serious performance problems. If we create a thread for every I/O request, a burst of simultaneous requests creates a correspondingly large number of threads. Each thread needs its own stack memory, so the JVM may fail with an OutOfMemoryError because it cannot create that many threads.

Even when the number of threads stays modest, creating and destroying a thread for every request is a significant overhead for the system.

To address this, we can use a thread pool instead of creating and destroying a thread per request, which avoids the performance problems caused by creating a large number of threads.
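
As a rough sketch of that thread-pool idea, the helper below caps both the number of handler threads and the backlog of pending handlers. `HandlerPool`, `dispatch`, and `distinctThreadsUsed` are hypothetical names introduced here, and the rejection policy is one possible choice rather than the only reasonable one. In the server above, the accept loop would call `dispatch(new ServerHandler(socket))` instead of `new Thread(...).start()`.

```java
import java.util.Set;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper: a bounded pool that replaces "one new thread per connection"
class HandlerPool {
    private final ExecutorService pool;

    HandlerPool(int threads, int queueCapacity) {
        this.pool = new ThreadPoolExecutor(
                threads, threads,                            // fixed number of worker threads
                0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),     // bounded backlog of pending handlers
                new ThreadPoolExecutor.CallerRunsPolicy());  // when full, the submitting thread runs the task
    }

    // Hand a handler to the pool instead of starting a new thread for it
    void dispatch(Runnable handler) {
        pool.execute(handler);
    }

    void shutdown() {
        pool.shutdown();
    }

    // Demo: run `tasks` short handlers and count the distinct threads that executed them
    static int distinctThreadsUsed(int threads, int tasks) {
        HandlerPool handlerPool = new HandlerPool(threads, tasks);
        Set<String> names = ConcurrentHashMap.newKeySet();
        CountDownLatch done = new CountDownLatch(tasks);
        for (int i = 0; i < tasks; i++) {
            handlerPool.dispatch(() -> {
                names.add(Thread.currentThread().getName());
                done.countDown();
            });
        }
        try {
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            handlerPool.shutdown();
        }
        return names.size();
    }

    public static void main(String[] args) {
        System.out.println("Distinct threads used: " + distinctThreadsUsed(4, 50));
    }
}
```

However many handlers are dispatched, they run on at most `threads` pool threads. With CallerRunsPolicy a full queue slows the accepting thread down rather than dropping connections, which acts as simple back-pressure. Whether that trade-off suits a real server depends on the workload.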

Worker-Thread Design Pattern #

In the Worker-Thread design pattern, workers (threads) take turns processing incoming tasks, and when there are none they wait until new tasks arrive. Besides the workers, the pattern has two other roles: the pipeline (a channel that queues the work) and the products (the tasks themselves).

Compared to the Thread-Per-Message design pattern, the Worker-Thread design pattern reduces the performance overhead of frequent thread creation and destruction, as well as the risk of memory overflow caused by creating unlimited threads.

Let’s imagine a scenario to understand how this pattern is implemented. We will use the Worker-Thread design pattern to complete a logistics sorting task.

Suppose there are 8 robots on a logistics sorting line. They continuously take packages off the line, pack them, and load them onto the truck. Products packed in the warehouse are placed on the sorting line rather than handed directly to a robot. The robots then take packages from the line in no fixed order. The code is as follows:

// Package class
public class Package {
    private String name;
    private String address;

    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public String getAddress() {
        return address;
    }

    public void setAddress(String address) {
        this.address = address;
    }

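    public void execute() {
        System.out.println(Thread.currentThread().getName() + " executed " + this);
    }

    // Readable form of the package for the log line printed by execute()
    @Override
    public String toString() {
        return "Package{name=" + name + ", address=" + address + "}";
    }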
}

// Pipeline class
import java.util.Arrays;

public class PackageChannel {
    private final static int MAX_PACKAGE_NUM = 100;

    private final Package[] packageQueue;
    private final Worker[] workerPool;
    private int head;
    private int tail;
    private int count;

    public PackageChannel(int workers) {
        this.packageQueue = new Package[MAX_PACKAGE_NUM];
        this.head = 0;
        this.tail = 0;
        this.count = 0;
        this.workerPool = new Worker[workers];
        this.init();
    }

    private void init() {
        for (int i = 0; i < workerPool.length; i++) {
            workerPool[i] = new Worker("Worker-" + i, this);
        }
    }

    /**
     * push switch to start all workers
     */
    public void startWorker() {
        Arrays.asList(workerPool).forEach(Worker::start);
    }

    public synchronized void put(Package packageReq) {
        while (count >= packageQueue.length) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        this.packageQueue[tail] = packageReq;
        this.tail = (tail + 1) % packageQueue.length;
        this.count++;
        this.notifyAll();
    }

    public synchronized Package take() {
        while (count <= 0) {
            try {
                this.wait();
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
        Package request = this.packageQueue[head];
        this.head = (this.head + 1) % this.packageQueue.length;
        this.count--;
        this.notifyAll();
        return request;
    }
}

// Worker class
import java.util.Random;

public class Worker extends Thread {
    private static final Random random = new Random(System.currentTimeMillis());
    private final PackageChannel channel;

    public Worker(String name, PackageChannel channel) {
        super(name);
        this.channel = channel;
    }

    @Override
    public void run() {
        while (true) {
            channel.take().execute();

            try {
                Thread.sleep(random.nextInt(1000));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }
}

// Test class
public class Test {
    public static void main(String[] args) {
        // Create 8 workers
        final PackageChannel channel = new PackageChannel(8);
        // Start working
        channel.startWorker();
        // Add packages to the pipeline
        for (int i = 0; i < 100; i++) {
            Package packageReq = new Package();
            packageReq.setAddress("test");
            packageReq.setName("test");
            channel.put(packageReq);
        }
    }
}

In this code, we have 8 workers continuously sorting the packaged products in the warehouse.
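
The hand-written wait/notifyAll queue can also be expressed with the JDK’s BlockingQueue. The sketch below is an alternative under stated assumptions, not the author’s implementation: `QueueChannel`, `startWorkers`, `stopWorkers`, and `process` are hypothetical names, and tasks are plain Runnable objects instead of the Package class.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical variant of the pipeline, built on BlockingQueue
class QueueChannel {
    private final BlockingQueue<Runnable> queue = new ArrayBlockingQueue<>(100);
    private final Thread[] workers;

    QueueChannel(int workerCount) {
        workers = new Thread[workerCount];
        for (int i = 0; i < workerCount; i++) {
            workers[i] = new Thread(this::workLoop, "Worker-" + i);
        }
    }

    void startWorkers() {
        for (Thread w : workers) {
            w.start();
        }
    }

    // Blocks while the queue is full, like the hand-written put()
    void put(Runnable task) throws InterruptedException {
        queue.put(task);
    }

    // Each worker repeatedly takes the next task; take() blocks while the queue is empty
    private void workLoop() {
        try {
            while (true) {
                queue.take().run();
            }
        } catch (InterruptedException e) {
            // Interrupted by stopWorkers(): leave the loop and let the thread end
        }
    }

    void stopWorkers() {
        for (Thread w : workers) {
            w.interrupt();
        }
    }

    // Demo: push `tasks` items through `workerCount` workers and count the handled ones
    static int process(int workerCount, int tasks) {
        QueueChannel channel = new QueueChannel(workerCount);
        AtomicInteger handled = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(tasks);
        channel.startWorkers();
        try {
            for (int i = 0; i < tasks; i++) {
                channel.put(() -> {
                    handled.incrementAndGet();
                    done.countDown();
                });
            }
            done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            channel.stopWorkers();
        }
        return handled.get();
    }

    public static void main(String[] args) {
        System.out.println("Handled " + process(8, 100) + " tasks");
    }
}
```

Unlike the original workers, which loop forever, this version also shows one way to stop them: interrupting a worker makes its blocked `take()` throw InterruptedException, which ends the loop.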

Summary #

In general, when we need to pass variables along within a thread or isolate them between threads, we can consider the Context design pattern. In business scenarios with database read-write separation, ThreadLocal is often used to switch data sources dynamically. However, ThreadLocal must be used carefully to avoid memory leaks, an issue we already discussed in the [25th lecture].
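
For the data source case, a minimal sketch might look like the following. Everything in it is an assumption for illustration: the holder class `DataSourceKeyHolder`, the keys `"primary"` and `"replica"`, and the `withKey` helper. A routing layer (for example, a subclass of Spring’s AbstractRoutingDataSource) would read the key through something like `current()`. The point of the sketch is the `finally` block, which always clears the thread-local value.

```java
import java.util.function.Supplier;

// Hypothetical holder for the data source key of the current thread
final class DataSourceKeyHolder {
    private static final String DEFAULT_KEY = "primary"; // assumed key name
    private static final ThreadLocal<String> KEY = ThreadLocal.withInitial(() -> DEFAULT_KEY);

    private DataSourceKeyHolder() {
    }

    // The key a routing layer would consult for the current thread
    static String current() {
        return KEY.get();
    }

    // Runs `action` with the given key, then resets this thread to the default
    static <T> T withKey(String key, Supplier<T> action) {
        KEY.set(key);
        try {
            return action.get();
        } finally {
            // Pooled threads are reused, so never leave a stale value behind
            KEY.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(withKey("replica", DataSourceKeyHolder::current));
        System.out.println(current());
    }
}
```

Calling `remove()` in `finally` matters most with thread pools: without it, the next task that runs on the same pooled thread would inherit the previous task’s key, and the stored value would stay reachable for as long as the thread lives.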

When the main thread takes a long time to process each request, it can block the requests behind it. In this case, we can delegate the main thread’s work to worker threads to improve the system’s parallel processing capability. The Thread-Per-Message and Worker-Thread design patterns both improve parallel processing capability through multithreading.

Thought Question #

Besides the multithreading design patterns covered above, have you used any other design patterns to optimize multithreaded business logic in your day-to-day work?