11 Concurrency Patterns - Effective Concurrency Patterns to Use Right Away in Go #

In the previous lesson, I explained to you how to better control multiple goroutines through Context. The final question of the lesson was: How can we implement log tracing with Context?

To trace a user’s request, we need a unique ID that identifies which functions were called and which code ran while the request was handled. That ID can then link the related log entries together into a log trace, which is how user tracking is achieved. This leads to the following plan:

  1. Generate a TraceID at the entry point of the user’s request.
  2. Save the TraceID using context.WithValue.
  3. Then, this Context containing the TraceID can be passed as a parameter between goroutines or functions.
  4. In places where logging is needed, retrieve the saved TraceID using the Context’s Value method, and log it along with other log information.
  5. This way, logs with the same TraceID can be linked together, achieving the goal of log tracing.

The core implementation of the above plan is the value passing feature of Context.

We have already become proficient in goroutines, channels, and synchronization primitives in the sync package. These are all fundamental elements of concurrent programming. In this lesson, we will introduce how to use these basic elements to form concurrency patterns that help us write concurrent programs better.

for-select Loop Pattern #

The for-select loop pattern is very common and has been used in previous lessons. It is usually used in combination with channels to complete tasks. The code format is as follows:

for { // an infinite for loop, or a for range loop
  select {
    // cases controlled by channels
  }
}

This is a concurrency pattern that combines a for loop with a select statement containing multiple cases. On each iteration, a case whose channel operation is ready is executed, and the loop keeps running until some condition ends it (for example, an exit signal is received).

From a concrete implementation perspective, the for-select loop has two patterns. One is the infinite loop pattern used in the watchdog example from the previous lesson, which only exits when it receives a termination command, as shown below:

for {
   select {
   case <-done:
      return
   default:
      // execute specific tasks
   }
}

In this pattern, the default statement will continue to execute the tasks until the done channel is closed.

The second pattern is the finite loop pattern using for range select, which is generally used to send iterable content to a channel, as shown below:

for _, s := range []int{} {
   select {
   case <-done:
      return
   case resultCh <- s:
   }
}

In this pattern, there is also a done channel, used to exit the current for loop, and another resultCh channel is used to receive the values of the for range loop. These values can be transmitted to other callers through the resultCh channel.

select timeout Pattern #

If we need to access a server to retrieve data, since different networks have different response times, it is not feasible to wait indefinitely for the network to return. Therefore, we need to set a timeout period, which can be achieved using the select timeout pattern, as shown below:

ch11/main.go

func main() {
   result := make(chan string)
   go func() {
      // simulate network access
      time.Sleep(8 * time.Second)
      result <- "Server Result"
   }()

   select {
   case v := <-result:
      fmt.Println(v)
   case <-time.After(5 * time.Second):
      fmt.Println("Network access timed out")
   }
}

The core of the select timeout pattern is to set a timeout period using the time.After function, to prevent the select statement from waiting indefinitely due to exceptions.

Pro Tip: If the timeout can be implemented with the WithTimeout function of Context, prefer that approach.

Pipeline Pattern #

The Pipeline pattern, also known as the assembly line pattern, simulates the production process of an assembly line in the real world. Taking the assembly of mobile phones as an example, the entire production line may have hundreds or thousands of steps, with each step being responsible for its own task. Finally, through one step at a time, the final assembly of a mobile phone is completed. From a technical standpoint, the output of each process serves as the input for the next process, and the thing that is passed between processes is data. This pattern is called the pipeline pattern, and the data being passed is called a data stream.

Image1.png

(Pipeline pattern)

From the above illustration of the pipeline pattern, we can see that from the beginning of production, through process 1, 2, 3, 4, to the final product, it forms a visual pipeline, or a Pipeline.

Now, let me explain the usage of the pipeline pattern using the example of assembling a mobile phone. Let’s assume there are 3 processes in the assembly line: component procurement, component assembly, and packaging of the finished product, as shown in the diagram:

Image2.png

(Mobile phone assembly line)

From the above diagram, it can be seen that the purchased components are passed to process 2 for assembly through a channel, and then passed to process 3 for packaging via another channel. In relation to process 2, process 1 is the producer, and process 3 is the consumer. In relation to process 1, process 2 is the consumer. In relation to process 3, process 2 is the producer.

I will demonstrate using the following code snippets:

ch11/main.go

// Process 1: Component procurement
func buy(n int) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for i := 1; i <= n; i++ {
            out <- fmt.Sprint("Component", i)
        }
    }()
    return out
}

First, we define a procurement function called buy, which takes an argument n to specify the number of sets of components to purchase. The implementation logic of the procurement code is to generate components through a for loop, then place them into the channel variable out, and finally return out so that the caller can get the components from it.

Once the components are purchased, assembly can begin as shown in the following code:

ch11/main.go

// Process 2: Component assembly
func build(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for c := range in {
            out <- "Assembly(" + c + ")"
        }
    }()
    return out
}

The assembly function build takes a channel parameter in, which is used to receive components for assembly. The assembled phones are then returned through another channel variable out.

Once the phones are assembled, they can be packaged in attractive boxes for sale. Packaging is done by process 3, and the corresponding function is pack, as shown below:

ch11/main.go

// Process 3: Packaging
func pack(in <-chan string) <-chan string {
    out := make(chan string)
    go func() {
        defer close(out)
        for c := range in {
            out <- "Packaging(" + c + ")"
        }
    }()
    return out
}

The implementation of the pack function is similar to the build function, so it is not repeated here.

Once all three processes on the pipeline are completed, they can be organized together by a coordinator to form a complete mobile phone assembly line. This coordinator can be the main function commonly used in our code, as shown in the following code:

ch11/main.go

func main() {
    coms := buy(10)       // Purchase 10 sets of components
    phones := build(coms) // Assemble 10 phones
    packs := pack(phones) // Package them for sale

    // Output test to see the result
    for p := range packs {
        fmt.Println(p)
    }
}
Fan-out and Fan-in Pattern #

The assembly line can be extended with the fan-out and fan-in pattern. The main function starts several goroutines to run the processes. Process 1 procures the components and passes them through a channel to processes 2-1, 2-2, and 2-3, which assemble phones at the same time. Each of them sends its output through a channel to a merge component, which combines the three outputs of process 2 into a single channel. Finally, process 3 packages the phones, and a for loop prints each packaged phone, which shows that the whole pipeline has finished.

Because processes 2-1, 2-2, and 2-3 run concurrently while process 1 is still producing components, several processes make progress at once, which raises the capacity of the assembly line.

The following example shows how fan-out and fan-in can be used to expand the capacity of the pipeline:

ch11/main.go

func main() {

   coms := buy(100)    // Purchase 100 sets of components

   // Three shifts assemble 100 mobile phones simultaneously
   phones1 := build(coms)
   phones2 := build(coms)
   phones3 := build(coms)

   // Merge three channels into one
   phones := merge(phones1,phones2,phones3)

   packs := pack(phones) // Pack them for sale

   // Output test, see the effect
   for p := range packs {
      fmt.Println(p)
   }
}

In this example, 100 sets of components are purchased, so the workload grows. To add manpower to process 2, build is called three times on the same input channel, so three shifts assemble phones at the same time (fan-out). The reusable merge component then combines their three output channels into one (fan-in), and the result is passed to the pack function for packaging.

By using the fan-out and fan-in pattern, the entire pipeline is expanded and production efficiency improves. The merge component is reusable: it can be applied, without modification, to any process in the pipeline that needs fan-out and fan-in to improve performance.
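The merge function used above is not shown in this section. One common way to write such a fan-in component is sketched below, assuming a sync.WaitGroup to detect when every input has been drained; the source helper is hypothetical and exists only to feed the demo.

```go
package main

import (
	"fmt"
	"sync"
)

// merge fans in: it copies every value from the input channels onto one
// output channel and closes that channel once all inputs are drained.
func merge(ins ...<-chan string) <-chan string {
	var wg sync.WaitGroup
	out := make(chan string)

	// One goroutine per input channel forwards its values to out.
	forward := func(in <-chan string) {
		defer wg.Done()
		for c := range in {
			out <- c
		}
	}

	wg.Add(len(ins))
	for _, in := range ins {
		go forward(in)
	}

	// Close out only after every forwarding goroutine has finished.
	go func() {
		wg.Wait()
		close(out)
	}()

	return out
}

// source is a hypothetical helper that emits the given values and then closes its channel.
func source(values ...string) <-chan string {
	out := make(chan string)
	go func() {
		defer close(out)
		for _, v := range values {
			out <- v
		}
	}()
	return out
}

func main() {
	merged := merge(source("a", "b"), source("c"), source("d", "e"))
	count := 0
	for range merged {
		count++
	}
	fmt.Println("received", count, "values")
}
```

The order in which merged values arrive is not deterministic, so this sketch only suits processes whose results do not depend on ordering.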

Futures Pattern #

The steps in the pipeline pattern are interdependent: each step consumes the output of the previous one, so it cannot proceed until that output is available. However, in practice there are also many tasks that are independent and have no dependencies on each other. These independent tasks can be executed concurrently to improve performance.

For example, if I plan to cook a hotpot by myself, I need to wash the vegetables and boil water. Washing vegetables and boiling water are two steps that have no dependency on each other and are independent. Therefore, they can be done simultaneously. However, the final step of cooking the hotpot can only be done after the vegetables are washed and the water is boiled. This scenario is suitable for the Futures pattern.

The Futures pattern can be understood as a pattern for results that arrive in the future: the main goroutine does not have to wait for the child goroutines right away. It can go on to do other things and collect their results later. If a result is not ready at the moment the main goroutine asks for it, the main goroutine blocks until it is. I demonstrate this using the following code:

ch11/main.go

// Wash vegetables
func washVegetables() <-chan string {
   vegetables := make(chan string)
   go func() {
      time.Sleep(5 * time.Second)
      vegetables <- "Washed vegetables"
   }()
   return vegetables
}

// Boil water
func boilWater() <-chan string {
   water := make(chan string)
   go func() {
      time.Sleep(5 * time.Second)
      water <- "Boiled water"
   }()
   return water
}

The two independent tasks of washing vegetables and boiling water can be done together. Therefore, in the example, they are executed simultaneously by starting goroutines. When the tasks are completed, the results are returned through a channel.

Tip: The 5-second sleep in the example simulates the time taken to wash the vegetables and boil the water.

When starting two child goroutines to wash vegetables and boil water at the same time, the main goroutine can do something else (in the example, take a nap) while waiting for the results. When it wakes up and is ready to cook the hotpot, it needs the results of washed vegetables and boiled water. I demonstrate this using the following code:

ch11/main.go

func main() {
   vegetablesCh := washVegetables() // Wash vegetables
   waterCh := boilWater()           // Boil water
   fmt.Println("Washing vegetables and boiling water have been scheduled, I'll take a nap first")
   time.Sleep(2 * time.Second)
   fmt.Println("I'm ready to cook the hotpot, let's see if the vegetables and water are ready")
   vegetables := <-vegetablesCh
   water := <-waterCh
   fmt.Println("Ready to cook the hotpot:",vegetables,water)
}

In the Futures pattern, goroutines differ from regular goroutines in that they can return results, and these results will be used at some point in the future. Therefore, the operation to retrieve these results in the future must be a blocking operation that waits until the result is obtained.

If your large task can be broken down into smaller tasks that can be executed concurrently and the final result of the large task can be obtained from the results of these smaller tasks, you can use the Futures pattern.
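As a small illustration of that idea, the sketch below splits one sum into two halves that are computed concurrently and then combined. The sumAsync helper and the sample numbers are assumptions chosen for the demo.

```go
package main

import "fmt"

// sumAsync starts summing nums in a goroutine and immediately returns
// a channel (the "future") that will deliver the result exactly once.
func sumAsync(nums []int) <-chan int {
	out := make(chan int, 1) // buffered: the goroutine can finish even if nobody reads
	go func() {
		total := 0
		for _, n := range nums {
			total += n
		}
		out <- total
	}()
	return out
}

func main() {
	nums := []int{1, 2, 3, 4, 5, 6, 7, 8, 9, 10}
	mid := len(nums) / 2

	// Start both subtasks before waiting for either of them.
	left := sumAsync(nums[:mid])
	right := sumAsync(nums[mid:])

	// Each receive blocks until that subtask has delivered its result.
	total := <-left + <-right
	fmt.Println("total:", total) // prints "total: 55"
}
```

Starting both subtasks before the first receive is what makes them run concurrently; receiving from left before even creating right would serialize the work.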

Summary #

Concurrency patterns and design patterns are similar in that they both provide a unified solution by abstracting and encapsulating real-world scenarios. However, unlike design patterns, concurrency patterns focus more on asynchronous and concurrent execution.