12 Scheduling How to Coordinate the Execution of Multiple Tasks

12 Scheduling How to coordinate the execution of multiple tasks #

Hello, this is Jingyuan.

The topic I want to share with you today is related to “orchestration”, and the key purpose of introducing this technology is to handle more complex scenarios.

In the previous lessons, we talked about synchronization and asynchronous strategy mechanisms between functions, as well as stateful business processing. Although these scenarios are already more complex than individual functions, the strategies of the functions themselves can still solve these problems.

But imagine if you encounter a situation where each business requires the coordination of dozens of functions. Can the function calls be easily handled?

  • Order creation and management in e-commerce;
  • Consolidating files and videos after fragmentation to generate new reports or video media;
  • Auditing advertising materials (including political issues, pornography, redirecting, cheating, etc.);

Isn’t it complex just to think about it? I believe that it is relatively difficult to govern in a microservices architecture, let alone at a more granular function level. We know that as services are broken down into smaller units, although the modularity will improve, the difficulty of service governance will increase.

So, is there an ability in the Serverless domain that can coordinate the execution of various services and functions in a certain way, so that we can enjoy high elasticity and low costs while reducing the complexity of business processing? This ability does indeed exist, and it is commonly known as “workflow (Serverless Workflow)”. It can be applied to the scenarios mentioned above.

What is a Workflow? #

What is a workflow? CNCF provides some explanation in its protocol specification. In general, it can be summarized as follows: It defines a set of structures and models through DSL, which exists in the form of JSON or YAML. The service engine will schedule and execute tasks based on the defined blueprint.

The workflow implemented by cloud providers allows you to coordinate one or more distributed tasks through sequential, branching, and parallel methods. These tasks can include functions, as well as services and applications. With the help of the platform’s state tracking, logging, and exception retry logic, you can free yourself from tedious work and enjoy the capabilities of a fully managed service.

Now that we have this abstract understanding, what does it look like specifically, how is it composed, and what is its operating mechanism? Let’s unveil it layer by layer.

Basic Composition #

First, let’s take a look at the form of the workflow through a simple diagram.

Image

As you can see, the state transition of the workflow is similar to the transitions of the state machine we encounter. In addition to state transitions, it also has the following characteristics.

  • Each state is a working node

It defines a special logic that determines the control logic that the current workflow process should execute. Usually, it includes: operations, communications, selections, parallelism, waiting, looping, and so on. Through different combinations of these states, the construction of the business model becomes as simple as normal programming, greatly enriching the practical application scenarios of the workflow.

  • Working nodes can be associated with services

Working nodes can invoke other services through APIs. The most common scenario is to combine serverless functions, where each atomic service is implemented using a cloud function, and the association between services is implemented through the workflow. In order to enrich the application scenarios of serverless workflows, some vendors usually associate with other cloud services within the same cloud ecosystem. For example, Alibaba Cloud’s Serverless Workflow and Tencent Cloud’s ASW support the integration of cloud service APIs.

  • Each node has clear inputs and outputs

Each node can set inputs and outputs as a means of data transmission, but in a workflow, there are two special rules:

First, when the current node is the initial node, its input is the initial input of the workflow;

Second, when the current node is not the initial node, its input is the output of the previous parent node.

Image

Therefore, you can regard the workflow as a collection of state nodes and their transitions, and each state node can be associated with events or functionalities, and has clear inputs and outputs. From the perspective of the completeness of the solution, we also need to add logging and auditing to monitor the execution of the workflow. We also need to have secure verification mechanisms and error handling capabilities.

What is DSL? #

After understanding the basic composition of workflows, let’s not rush into detailed architectural design. Do you remember when I mentioned “What is a workflow?” before, and mentioned “defining a set of structures and models through DSL”? What is this DSL, and what role does it play in the design of workflow architecture?

DSL (Domain Specific Language) is used to describe and define business logic. When a workflow is executed, it will follow the logic you defined. In the diagram of the basic composition, you can see that each execution consists of multiple states. These states can be simple one-time operations, which are represented in the DSL as the following state types:

  • Event: After receiving the specified trigger type of event (such as the S3 object storage event mentioned in the previous course), it will perform an action (referred to as an action in the DSL), such as calling a certain cloud function. It is important to note that event triggering needs to strictly adhere to the Cloud Events format protocol.

  • Operation: It also performs an action, but unlike the Event state, the Operation state does not require event triggering. It can actively execute actions.

  • Sleep: It can suspend the current execution and set a wait time, similar to the Sleep operation in threads. After the wait time is over, the execution will resume.

  • Inject: It can inject some static data into the input data of the workflow, and then pass it to the next state.

These states can also be used to control the execution logic of complex processes, represented as the following state types in the DSL:

  • Switch: Similar to the switch keyword in programming languages, it defines multiple option branches based on event conditions and determines the next state based on some filtering conditions. When none of the conditions are met, the default condition can be used as the default next state.

  • Parallel: It contains a series of branches, defined by the branch keyword. However, unlike the Switch state, the sub-states of these branches are executed in parallel.

  • ForEach: Similar to an iterator in programming languages, this state’s input is generally in the form of an array or collection, and it iterates through the logic.

Here, we need to note that although different vendors generally follow the standard protocols set by CNCF in their implementations, there are still slight differences in language definitions and constraints. Different vendors also use different names, such as Alibaba Cloud FDL, Tencent Cloud TCSL, Amazon ASL, etc. You can refer to the comparison table below:

Image

You will find that some vendors have added some states based on their own requirements, such as Success and Fail. Other states, besides the differences in names, have basically the same functionality. Let’s take Alibaba Cloud Serverless Workflow as an example to further deepen our understanding:

version: v1
type: flow
steps:
  - type: parallel
    name: parallelDemo
    branches:
      - steps:
        - type: task
          name: function1
          resourceArn: acs:fc:{region}:{accountID}:services/{serviceName}/functions/{functionName}
      - steps:
        - type: task
          name: function2
          resourceArn: acs:fc:{region}:{accountID}:services/{serviceName}/functions/{functionName}

You can also use a combination of resourceArn and pattern to integrate the capabilities of multiple cloud services. Here, resourceArn is used to define the target service to be integrated, and pattern is used to define the integration pattern. I won’t go into further detail about the specific fields of each state. If you need to develop and deploy, you can read through the documentation of the specific cloud vendor you choose.

Architecture Design #

After understanding the basic components of the workflow and the definition language of the model, if we only use it, then it’s almost done. We just need to be proficient in using the language to define the corresponding JSON or YAML files.

But if we want to develop such a feature ourselves, how can we implement it? Let’s take a look at the overall architecture diagram:

Image

As shown in the diagram, a serverless workflow typically requires three core service modules: metadata management, scheduling, and execution.

  • APIServer: Responsible for the lifecycle management of metadata, including state management, templates, execution records, etc.
  • Scheduling service: Routes the scheduling to the corresponding execution engine based on the data flow request. Generally, it also needs capabilities such as load balancing, rate limiting, and fault migration.
  • Execution engine: Responsible for parsing the workflow language, executing the workflow, reporting execution history, and calling other cloud services.

In terms of usage, users control the basic information of the workflow through APIServer, and then execute the workflow by requesting the scheduling module, following the same steps as shown in the diagram. Here, let’s focus on the request flow of the data interface:

  1. After the request reaches the scheduling module, the scheduling module first obtains the current workflow’s definition content and execution task’s metadata from APIServer.
  2. The scheduling module distributes the metadata and request content to the designated execution engine based on the request.
  3. The execution engine generates some metadata for this execution and records it in the database. Apart from the definition of the workflow itself, each execution is stateless, so a separate task ID needs to be set to facilitate subsequent request re-entry operations.
  4. The execution engine parses the incoming workflow definition based on the syntax of the workflow language.
  5. The execution engine executes in an ordered manner based on the parsed state and calls other services for processing based on the state semantics.

Finally, when all states have finished execution, the workflow execution engine will return the result based on the final output definition.

Of course, you can also break it down further. For example, you can separate the part that interacts with the front-end into a Console service, and split the model configuration and template examples into a separate template service. The granularity of the services does not have a hard rule.

Considerations for Production Grade #

With the above architecture, you should be able to implement a demo-level workflow. However, to complete a production-grade workflow, you also need to consider factors such as high availability, high performance, and security.

Image

First, the three major services should not be presented in a single point manner, they need to be deployed in a distributed manner. Each service is forwarded via a load balancing layer to provide an externally exposed HaVIP or domain name.

Second, for the execution engine, because the core content of the workflow is the semantic parsing and execution of the workflow process, and each workflow task will start a separate coroutine or thread to work, the load on this engine is high, so it needs to have the ability to horizontally scale in and out. The usual approach is to deploy a registry center, and then register the execution engines as services. The scheduling service will discover the current instances of the execution engine through service discovery.

Third, as the application and the number of calls increase, you also need to pay special attention to the selection of message middleware, database middleware, and storage middleware in the production environment. For example, for the data of the execution records, you can’t just use MySQL as during the demo period. You need to choose storage systems such as Elasticsearch, Apache Doris, and others.

Fourth, considering the system’s performance and the situation of facing traffic peaks, message middleware and caching middleware can be used between the scheduling service and execution engine. For example, in the case of frequent data reading, you can preload metadata and template information, etc., into a cache system like Redis.

In addition, two considerations have not been reflected in our diagram:

  1. Security: You need to integrate business services into a user authentication and authorization system to ensure the security of the services.
  2. Observability: Workflow targets complex business orchestration scenarios, so troubleshooting becomes more difficult when problems occur. Therefore, full-link monitoring of the system becomes even more important. You need to provide metrics such as link state transition records, execution results, and execution time, and set up corresponding alert mechanisms based on these metrics.

Request Re-entry #

Let’s take a look at the re-entry of requests. As mentioned earlier, the scheduling service needs to have capabilities such as request routing, service discovery, fault migration, rate limiting, and load balancing. These are governance measures to be considered when implementing a microservice feature.

In addition to these, you also need to pay attention to the ability of request re-entry, ensuring that the execution of the same request can be performed on the same instance of the execution engine. This avoids the problem of duplicate execution of the same request.

To ensure request re-entry, first, we should try to let tasks of the same workflow fall on the same execution engine. Here, you can associate each user ID with an instance of the execution engine through hash mapping and write it to Redis.

Then, to distinguish duplicate execution tasks, each execution task needs a unique identifier. Since different workflow execution tasks may have the same name, to ensure uniqueness and readability, I will provide you with a way to generate a task ID:

TaskID = UserName|workFlowName|TaskName

Record the association between TaskID and the instance of the execution engine in Redis. Before each execution, you can check whether the current execution request has already been executed, thus avoiding duplicate execution tasks.

Image It should be noted that when a certain execution engine fails, the scheduling module will schedule all the execution tasks of the failed execution engine in Redis to other instances, thus achieving task migration and ensuring the stability of the entire system. Of course, you can also choose other key-value middleware to implement this idea.

Syntax Parsing Process #

After understanding the basic architecture, let’s take a look at how the syntax parsing of the execution engine is implemented.

As mentioned earlier, the workflow definition format exists in the form of JSON and YAML, and we need to first parse the information defined in the file, and then execute each defined state in order. Within the workflow, the relationship between states is a data structure with the characteristics of an “n-ary tree”, so we can use the depth-first traversal idea to execute each state in the workflow.

Here is a piece of code for the core processing process based on the state classification of the DSL. In order to facilitate your understanding, I have used the switch…case method, but you can choose a better design pattern to implement it:

func (flow *Flow) ProcessState(input *Input, flowDefinition *FlowDefinition) *Output {
    
    ...
    
    var output *Output
    state := flow.getStartState(flowDefinition)
    
    for {
        switch state.GetType() {
        case api.Event:
            output = flow.processEvent(state, input)
        case api.Operation:
            output = flow.processOperation(state, input)
        case api.Inject:
            output = flow.processSleep(state, input)
        case api.Switch:
            output = flow.processSwitch(state, input)
        case api.Parallel:
            output = flow.processParallel(state, input)
        case api.Foreach:
            output = flow.processForeach(state, input)
        default:
            flow.logger.Errorf("invaid state type: %v", state)
            ...
            break
        }

        if output.Error != nil {
            break
        }

        // Exit when reaching the last state
        state = state.NextState(flowDefinition)
        if state == nil {
            break
        }

        // The output of the current state is the input of the next state
        input.Input = output.Output
    }
   
    return output
}

Next, let’s understand the syntax parsing process based on the above code.

According to the depth-first traversal principle, we first need to get the start state based on the defined content, and treat the start state as the current state, while using the input data from the call request as the input of the start state.

Then, based on the state type of the current state, select the corresponding processing flow. For example, for the Event type, the implementation of your processEvent method can call the API interface of the relevant service based on the Action in the State to execute the task. If it is a Parallel state, you can start a coroutine or thread for each subtask of the state, and then wait for all the substates to finish before returning the results.

After the current state is processed, get the next state as the current state, and set the output of the current state as the input for the next state, as shown in the state.NextState(flowDefinition) in the above code.

Finally, until there is no next state for the current state, the execution of the entire workflow is completed, and the output of the last state is returned as the output of the entire workflow.

So far, we have covered most of the key points about workflows. In fact, in addition to the above mentioned points, to build a more comprehensive workflow solution, issues such as measurement and billing, support for callbacks, and how to set a common invocation protocol should also be taken into account.

Are you still not satisfied? Here’s a sneak peek: in Lesson 16, we will get hands-on experience together and explore the core concepts of template-based architecture and the methods for quickly deploying services. It includes specific operations such as single function templates, multi-function orchestration, and template construction, allowing you to not only understand the principles, but also get practical experience.

Summary #

In this lesson, I introduced you to the workflow, a coordination technique for executing multiple tasks.

First of all, we should know that workflows can be used for coordinating and orchestrating the execution of multiple functions and sub-processes. Workflows are used in scenarios such as transactional businesses like orders, processing large multimedia files, and automating data pipeline operations.

Furthermore, although different vendors have different descriptions and definitions of implementing business logic, they generally follow CNCF specifications. You can use sequencing, branching, parallelism, and other methods to coordinate one or more distributed tasks, which can include not only functions but also services and applications.

The core components of a workflow include the definition of the process, the parsing of the process, and the input and output of each node. We can use DSL language to orchestrate the states of various nodes, including eight commonly used states such as operation, parallelism, selection, and delay.

In terms of architectural design, we need to focus on the three core services: APIServer, scheduling service, and execution engine. However, if you are a platform designer/designer, you also need to consider multiple auxiliary functional items such as security, observability, metering and billing, and log storage in order to design a complete solution.

I hope that through this lesson, you can discover another powerful feature of Serverless. In addition to the form of function computing, workflows play a huge role in parallel invocation, batch processing, and transactional scenarios. Server Less is More is not just an empty slogan.

Thought Question #

Okay, this lesson is coming to an end, and I have one thought question for you.

Do we need to always save the execution records of a workflow execution engine? If not, how would you design a cleanup program? What key points should be considered?

Please feel free to write down your thoughts and answers in the comments section. Let’s exchange and discuss together. Thank you for reading, and please feel free to share this lesson with more friends for further discussion.

Further Reading #

The CNCF Serverless WG workflow team has developed a comprehensive protocol called Serverless Workflow Specification. This document not only provides the specifications but also includes relevant use cases.