Special Delivery Design and Implementation of Distributed Task Systems

Hello, I’m Kong Lingfei, and here we meet again. The end of the course doesn’t mean the end of our journey. I’m very pleased to continue sharing good content with you, and I hope you can keep in touch with me in the comments section, sharing your learning experiences and practical insights.

In today’s lecture, let’s talk about how to design a distributed job system. In actual Go project development, we often encounter the following two functional requirements:

  • Need to execute a certain task at a scheduled time, such as cleaning up useless data in the database every day at 10:00 am.
  • Poll a field in a database table and perform some asynchronous business logic processing based on the status of the field. For example, when table_xxx.status = 'pending' is detected, perform asynchronous initialization process, and after completion, set table_xxx.status='normal'.

These two common and fundamental functional requirements in Go project development can usually be implemented through a job system. In order to solve these common functional requirements, IAM has also developed its own job system. In today’s lecture, let’s take a look at how IAM implements the job system.

Task Classification #

Before introducing the job system, let’s take a look at the classification of tasks. Understanding the classification of tasks helps us understand the types of tasks performed by the job system, which in turn helps us design the job system.

In my opinion, tasks can be divided into the following three categories.

  • Scheduled tasks: Scheduled tasks are executed at specified time points. As long as the execution time of the task is reached, the task will be executed, regardless of whether the previous task has been completed.
  • Interval tasks: After the previous task is completed, a certain amount of time (e.g. 5 seconds, 5 minutes) is waited before the next task is executed.
  • Interval-scheduled tasks: A variation of interval tasks, where the time is counted from when the previous task starts executing. As soon as the interval time is reached, the next task is executed, regardless of whether the previous task has been completed.

Scheduled tasks are easy to understand, but interval tasks and interval-scheduled tasks are easy to confuse. An interval task waits for the previous run to finish, then waits for the interval, and only then starts the next run. An interval-scheduled task does not wait for the previous run to finish: it starts the next run as soon as the interval has elapsed since the previous run started.

The differences between the three are shown in the following figure:

[Figure: timelines comparing scheduled tasks, interval tasks, and interval-scheduled tasks]

In actual project development, we often encounter requirements for these three types of tasks.

Common Implementations of Job Systems #

Before introducing the implementation of the IAM job system, it is necessary to first discuss how to execute interval/timed tasks. Only by understanding these can we better design the IAM job system. Generally speaking, there are four ways to execute an interval/timed task:

  1. Develop a service to execute interval/timed tasks by using methods provided by the time package (such as time.Sleep, time.Ticker, etc.).
  2. Some Go packages support executing interval/timed tasks, and these packages can be directly used to execute interval/timed tasks, eliminating the need to develop the job scheduling part of the code ourselves. For example, github.com/robfig/cron.
  3. Use Linux’s crontab to execute timed tasks.
  4. Use an open-source job system to execute interval/timed tasks, such as distribworks/dkron.

Each of the above four methods has its own advantages and disadvantages. With the first method, everything has to be implemented from scratch, so the development workload is large and the development efficiency is low. Since many excellent cron packages are already available, there is no need to start from scratch: we can use one of these packages directly to execute interval/timed tasks. This is the second method, and it is the one the IAM project adopts.

Next, let’s first introduce the third and fourth methods: using the Linux crontab and using an open-source Go job system. Then, we will focus on the second method adopted by the IAM project.

Linux crontab #

Crontab is a built-in timed execution tool in the Linux system, which can run jobs without manual intervention. Crontab provides its service through the crond process. Once a minute, the crond process checks whether there are tasks to be executed and, if there are, executes them automatically. It reads the crontab configuration to determine which tasks to execute and when to execute them.

The crond process will look for the crontab configuration files in the following three locations:

  • /var/spool/cron/: This directory stores crontab tasks for users (including root). Each task is named after the login name. For example, the crontab task created by the user “colin” corresponds to the file /var/spool/cron/colin.
  • /etc/crontab: This file stores crontab tasks created and maintained by the system administrator.
  • /etc/cron.d/: This directory stores any crontab tasks to be executed. While the crond process is running, it automatically scans all files in this directory and executes the commands specified in those files based on the time settings in the files.

As can be seen, to execute a crontab task, it is necessary to ensure that the crond process is running and configure the crontab task. This can be done in the following two steps:

Step 1: Ensure that the crond process is running.

Execute the following command to check the running status of the crond process:

$ systemctl status crond
● crond.service - Command Scheduler
   Loaded: loaded (/usr/lib/systemd/system/crond.service; enabled; vendor preset: enabled)
   Active: active (running) since Wed 2021-11-17 07:11:27 CST; 2 days ago
 Main PID: 9182 (crond)
    Tasks: 1
   Memory: 728.0K
   CGroup: /system.slice/crond.service
           └─9182 /usr/sbin/crond -n

If Active: active (running) is displayed, it means that the crond process is running. Otherwise, you can execute systemctl start crond to start the crond process.

Step 2: Configure the crontab task.

You can use the crontab -e command to edit the configuration file. For example, executing crontab -e will enter the vi interactive interface, and you can configure the following crontab tasks:

# Output the time to the file /tmp/test.txt every minute
*  *  *  *  * echo `date` >> /tmp/test.txt

# Synchronize the internet time every 2 minutes
*/2 * * * * /usr/bin/ntpstat time.windows.com > /dev/null 2>&1

The edited configuration file is saved in the /var/spool/cron/$USER file. You can use crontab -l or sudo cat /var/spool/cron/$USER to view it. For example:

$ crontab -l
# Output the time to the file /tmp/test.txt every minute
*  *  *  *  * echo `date` >> /tmp/test.txt

# Synchronize the internet time every 2 minutes
*/2 * * * * /usr/bin/ntpstat time.windows.com > /dev/null 2>&1

If you want to delete all crontab tasks, you can execute the crontab -r command.

The configured crontab tasks need to follow the crontab time format, which is as follows:

.---------------- minute (0 - 59)     
|  .-------------- hour (0 - 23)    
|  |  .----------- day of month (1 - 31)    
|  |  |  .-------- month (1 - 12) OR jan,feb,mar,apr ...    
|  |  |  |  .----- day of week (0 - 6) (Sunday=0 or 7) OR sun,mon,tue,wed,thu,fri,sat    
|  |  |  |  |    
*  *  *  *  * <command to be executed>

As you can see, crontab can only be accurate to the minute and cannot be accurate to the second.

Below are some commonly used crontab time formats for your reference to deepen your understanding:

# Execute <command> every minute
* * * * * <command> # * stands for all possible values

# Execute <command> once every hour, on the hour
0 */1 * * * <command> # / indicates a step (frequency)

# Execute <command> at 15 and 45 minutes past each hour
15,45 * * * * <command> # , separates a list of values

# Execute <command> at 15 and 45 minutes past each hour during the hours 8 to 11
15,45 8-11 * * * <command> # - indicates a range

# Execute <command> at 3 and 15 minutes past each hour during the hours 8 to 11 every Monday
3,15 8-11 * * 1 <command>

# Execute <command> at 3 and 15 minutes past each hour during the hours 8 to 11 on every second day of the month
3,15 8-11 */2 * * <command>

The advantages of using crontab for periodic/scheduled tasks are that there is no need for development, only configuration of the crontab task is required. The disadvantages are also obvious and mainly include the following:

  • It cannot be accurate to the second.
  • Manual coding of executable commands. These executable commands are separate from the project and cannot reuse the capabilities provided by the project’s packages and functions. It is inconvenient to develop jobs closely related to the project.
  • Single point of failure. If the crond process is abnormal, periodic/scheduled tasks cannot continue to execute. You might say: You can configure and execute the same periodic/scheduled tasks on two machines. However, doing so can cause conflicts or inconsistent states between the two machines when both machines execute the same task simultaneously.
  • It cannot implement interval tasks or interval-scheduled tasks.

Use open-source job systems #

In addition to using the crontab provided by the Linux system, we can also use some excellent open-source job systems in the industry. Here, I list some popular job systems developed in the Go language. The reason for choosing projects developed in the Go language is to enrich your Go language ecosystem, and using the same language also helps you learn and modify these projects.

  • distribworks/dkron. dkron is a distributed, fast-starting, fault-tolerant cron-like job system that supports crontab expressions. It has the following core features.
    • Easy to use: Jobs can be managed through an easy-to-use, beautiful web interface.
    • Reliable: It has fault tolerance mechanisms, so if one node is unavailable, other nodes can continue to execute jobs.
    • High Scalability: It can handle a large number of scheduled jobs and thousands of nodes.
  • ouqiang/gocron. gocron is a lightweight centralized scheduling and management system for cron jobs developed by a Chinese developer. It is used to replace Linux-crontab. It has the following core features.
    • It has a web interface to manage scheduled tasks.
    • It supports crontab time format and is accurate to the second.
    • It supports two types of tasks: shell commands and HTTP requests.
    • It has task timeout, task dependency, and task retry mechanisms.
    • It supports viewing task execution logs and notifying task execution results through email, Slack, Webhook, etc.
  • shunfei/cronsun. cronsun is a distributed job system, and a single node is similar to crontab. It has the following core features.
    • It has a web interface, making it easy to centrally manage scheduled tasks on multiple servers.
    • The task scheduling time granularity supports seconds.
    • If a task execution fails, it can be retried.
    • It ensures task reliability by selecting an available node from N nodes to execute the task.
    • It supports viewing task execution logs.
    • It sends email alerts for task failures (custom HTTP alert interfaces are also supported).

So, how do you choose from so many open-source projects? Here, I recommend choosing distribworks/dkron. The reason is that distribworks/dkron has a high number of stars, is full-featured, easy to use, and has rich documentation. Of course, in actual development, it is best to research other open-source projects as well, and choose the most suitable one based on your needs.

The advantages of using these job systems are that no development is required, they have more powerful features than crontab, and some are distributed job systems with fault tolerance capabilities. However, the disadvantages are also obvious:

  • These job systems have limited task types. For example, they usually support executing tasks through shell scripts or sending HTTP requests. Regardless of the method used, the implementation is separate from the project, making it not very simple or efficient when developing task plugins closely integrated with the project.
  • Usually, we only use a small part of the capabilities provided by these systems, or only one or two projects will use this type of system. However, we still need to deploy and maintain these job systems, which requires a lot of work with little payoff.
  • They cannot implement interval tasks.

Both approaches, Linux’s crontab and open-source Go job systems, have obvious disadvantages. For this reason, the IAM system wraps an existing cron library in its own task framework and develops tasks on top of that framework. The IAM project has chosen the robfig/cron library because it has the most stars and is feature-rich and easy to use. In addition, IAM uses github.com/go-redsync/redsync to implement a Redis-based distributed mutex lock. Therefore, before introducing the IAM job system implementation, I will briefly explain how to use these two packages.

Introduction to github.com/robfig/cron #

github.com/robfig/cron is a cron package that allows you to schedule tasks similar to Linux crontab, with the added support for seconds.

Supported Time Formats #

The cron package supports two types of time formats: crontab format and fixed interval format. Let’s discuss each format in detail.

The crontab format follows the same matching symbols as a crontab. The time format is as follows:

     ┌───────────── second (0 - 59)
     │ ┌───────────── min (0 - 59)
     │ │ ┌────────────── hour (0 - 23)
     │ │ │ ┌─────────────── day of month (1 - 31)
     │ │ │ │ ┌──────────────── month (1 - 12)
     │ │ │ │ │ ┌───────────────── day of week (0 - 6) (0 to 6 are Sunday to
     │ │ │ │ │ │                  Saturday)
     │ │ │ │ │ │
     │ │ │ │ │ │
     * * * * * *   

The second type is the fixed interval format, specified as @every <duration>. The duration is a string that can be parsed by time.ParseDuration. For example, @every 1h30m10s means the task will be executed every 1 hour, 30 minutes, and 10 seconds. Note that the interval does not take into account the runtime of the task. For example, if the task takes 3 minutes to run and is scheduled to run every 5 minutes, there will only be 2 minutes of idle time between each run.

Example of Using the cron Package #

Using the cron package is simple. Here’s a basic example:

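package main

import (
    "fmt"
    "os"
    "os/signal"
    "syscall"

    "github.com/robfig/cron/v3"
)

func helloCron() {
    fmt.Println("hello cron")
}

func main() {
    fmt.Println("starting go cron...")

    // Create a new cron instance with second-level precision. The wrappers call
    // the logger they are given, so pass a real one (cron.DefaultLogger), not nil.
    c := cron.New(
        cron.WithSeconds(),
        cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger), cron.Recover(cron.DefaultLogger)),
    )

    // Add a scheduled task and check the error returned for an invalid spec
    if _, err := c.AddFunc("* * * * * *", helloCron); err != nil {
        panic(err)
    }

    // Start the cron scheduler in its own goroutine
    c.Start()

    // Stop the scheduler on exit, but tasks already running will not be stopped
    defer c.Stop()

    // Block until SIGINT or SIGTERM arrives so that the deferred Stop can run
    quit := make(chan os.Signal, 1)
    signal.Notify(quit, syscall.SIGINT, syscall.SIGTERM)
    <-quit
}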

In the above code, a cron instance is created using the cron.New function. Because the cron.WithSeconds() option is set, the spec has six fields and the first field is the second, so the task added with the AddFunc method executes the helloCron function every second. The scheduler is then started with the Start() method, and the call to Stop() is deferred so that the scheduler is closed when the program exits.

Interceptors #

The cron package also supports installing interceptors that can perform the following functions:

  • Recover from panics in tasks (cron.Recover()).
  • Delay the execution of the next task if the previous task has not completed (cron.DelayIfStillRunning()).
  • Skip the execution of the next task if the previous task has not completed (cron.SkipIfStillRunning()).
  • Log each task invocation (cron.WithLogger()).
  • Notify when a task completes.

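To install these interceptors, pass the corresponding options when creating the cron instance. The wrappers Recover, DelayIfStillRunning, and SkipIfStillRunning each take a logger and are combined with cron.WithChain(), for example: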

c := cron.New(cron.WithSeconds(), cron.WithChain(cron.SkipIfStillRunning(cron.DefaultLogger), cron.Recover(cron.DefaultLogger)))
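To show what an interceptor does, here is a standard-library sketch of the wrapper pattern. It is my own re-creation of the idea, not the cron package's source: Job, Wrapper, chain, recoverPanics, and skipIfStillRunning are all local stand-ins.

```go
package main

import "fmt"

// Job and Wrapper are local stand-ins for a job and a job wrapper.
type Job func()
type Wrapper func(Job) Job

// chain applies the wrappers so that the first one listed is the outermost.
func chain(j Job, ws ...Wrapper) Job {
	for i := len(ws) - 1; i >= 0; i-- {
		j = ws[i](j)
	}
	return j
}

// recoverPanics turns a panic inside the job into a call to onPanic.
func recoverPanics(onPanic func(v interface{})) Wrapper {
	return func(j Job) Job {
		return func() {
			defer func() {
				if r := recover(); r != nil {
					onPanic(r)
				}
			}()
			j()
		}
	}
}

// skipIfStillRunning drops an invocation when the previous one has not returned.
func skipIfStillRunning(onSkip func()) Wrapper {
	return func(j Job) Job {
		token := make(chan struct{}, 1)
		token <- struct{}{} // one token: whoever holds it may run
		return func() {
			select {
			case <-token:
				defer func() { token <- struct{}{} }()
				j()
			default:
				onSkip()
			}
		}
	}
}

func main() {
	job := chain(
		func() { panic("boom") },
		skipIfStillRunning(func() { fmt.Println("skipped") }),
		recoverPanics(func(v interface{}) { fmt.Println("recovered:", v) }),
	)
	job() // the panic is recovered instead of crashing the process
	job() // the token was handed back, so the job is allowed to run again
}
```

Because a wrapper is just a function that takes a job and returns a job, any number of them can be stacked, and each one stays small and easy to test.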

Introduction to github.com/go-redsync/redsync #

redsync is a library that allows you to implement distributed locks based on Redis. It is easy to use, and here is an example:

package main

import (
	goredislib "github.com/go-redis/redis/v8"
	"github.com/go-redsync/redsync/v4"
	"github.com/go-redsync/redsync/v4/redis/goredis/v8"
)

func main() {
	// Create a pool with go-redis (or redigo) which is the pool redsync will
	// use while communicating with Redis. This can also be any pool that
	// implements the `redis.Pool` interface.
	client := goredislib.NewClient(&goredislib.Options{
		Addr: "localhost:6379",
	})
	pool := goredis.NewPool(client) // or, pool := redigo.NewPool(...)

	// Create an instance of redsync to be used to obtain a mutual exclusion
	// lock.
	rs := redsync.New(pool)

	// Obtain a new mutex by using the same name for all instances wanting the
	// same lock.
	mutexname := "my-global-mutex"
	mutex := rs.NewMutex(mutexname)

	// Obtain a lock for our given mutex. After this is successful, no one else
	// can obtain the same lock (the same mutex name) until we unlock it.
	if err := mutex.Lock(); err != nil {
		panic(err)
	}

	// Do your work that requires the lock.

	// Release the lock so other processes or threads can obtain a lock.
	if ok, err := mutex.Unlock(); !ok || err != nil {
		panic("unlock failed")
	}
}

The above code creates an instance of redsync.Redsync and uses the NewMutex method provided by redsync.Redsync to create a distributed lock instance, mutex. The lock is acquired using mutex.Lock(), and it is released using mutex.Unlock().

Features of IAM Job System #

Before developing the job system for IAM, we need to map out the tasks that IAM needs to accomplish. IAM needs to perform the following periodic tasks:

  • Clean up authorization policies in the policy_audit table that have exceeded a specified number of days.
  • Disable users who have not logged in for a specified number of days.

Taking into account the shortcomings of the job system mentioned earlier, the following features need to be considered when designing the job system:

  • Distributed job system that ensures only one instance is working at any given time when there are multiple instances.
  • Close integration with the project, making it easy to reuse packages, functions, and other capabilities provided by the project to improve development efficiency.
  • Ability to execute tasks of the following types: scheduled tasks, interval tasks, and interval-scheduled tasks.
  • Ability to add new periodic/scheduled tasks in a modular way.

Implementation of IAM Job System #

After introducing the two Go packages used in the IAM job system and the features of the IAM job system, I will now explain the implementation of the IAM job system.

The IAM job system service is called iam-watcher. As the name implies, a watcher is responsible for observing certain states and executing corresponding tasks, hence the name watcher. The main function of iam-watcher is located in the cmd/iam-watcher/watcher.go file. The application framework is highly consistent with iam-apiserver, iam-authz-server, and iam-pump, so I will not introduce it again here.

The core implementation of the entire iam-watcher service is located in the internal/watcher/server.go file. In the server.go file, newWatchJob is called to create an instance of type github.com/robfig/cron.Cron, a cron instance. The code for newWatchJob is as follows:

func newWatchJob(redisOptions *genericoptions.RedisOptions, watcherOptions *options.WatcherOptions) *watchJob {
    logger := cronlog.NewLogger(log.SugaredLogger())

    client := goredislib.NewClient(&goredislib.Options{
        Addr:     fmt.Sprintf("%s:%d", redisOptions.Host, redisOptions.Port),
        Username: redisOptions.Username,
        Password: redisOptions.Password,    
    })

    pool := goredis.NewPool(client)
    rs := redsync.New(pool)

    cron := cron.New(
        cron.WithSeconds(),
        cron.WithChain(cron.SkipIfStillRunning(logger), cron.Recover(logger)),
    )

    return &watchJob{
        Cron:   cron,
        config: watcherOptions,
        rs:     rs,
    }
}

The above code creates instances of the following two types:

  • github.com/robfig/cron.Cron: A job system implemented based on the github.com/robfig/cron package, which supports three types of tasks: scheduled tasks, interval tasks, and interval-scheduled tasks.
  • github.com/go-redsync/redsync.Redsync: A distributed mutual exclusion lock based on Redis.

It’s worth noting that when creating the cron instance, the cron.SkipIfStillRunning() wrapper needs to be added. It makes cron skip a scheduled run while the previous run of the same task is still executing, so runs of a task never overlap, which approximates the effect of interval tasks.

After creating the instances, the cron tasks are registered through addWatchers(). The code for addWatchers function is as follows:

func (w *watchJob) addWatchers() *watchJob {
    for name, watcher := range watcher.ListWatchers() {
        // log with `{"watcher": "counter"}` key-value to distinguish which watcher the log comes from.
        ctx := context.WithValue(context.Background(), log.KeyWatcherName, name)

        if err := watcher.Init(ctx, w.rs.NewMutex(name, redsync.WithExpiry(2*time.Hour)), w.config); err != nil {
            log.Panicf("construct watcher %s failed: %s", name, err.Error())
        }

        _, _ = w.AddJob(watcher.Spec(), watcher)
    }

    return w
}

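The above function calls watcher.ListWatchers() to list all the watchers and adds these watchers to the cron scheduling engine in a for loop. A watcher is defined by the IWatcher interface below, which embeds the Job interface of the cron package (shown after it for reference):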

type IWatcher interface {
    Init(ctx context.Context, rs *redsync.Mutex, config interface{}) error
    Spec() string
    cron.Job
}

type Job interface {
    Run()
}

In other words, a watcher is a struct that implements the following three methods:

  • Init(), used to initialize the watcher.
  • Spec(), used to return the schedule of the task as a string, supporting both the Linux crontab time format and the fixed interval format, such as @every 1h.
  • Run(), used to run the task.

IAM implements two watchers:

  • task: Disables users who have not logged in for more than X days, where X can be configured by the watcher.task.max-inactive-days configuration item in the iam-watcher.yaml configuration file.
  • clean: Deletes authorization policies from the policy_audit table that have exceeded X days, where X can be configured by the watcher.clean.max-reserve-days configuration item in the iam-watcher.yaml configuration file.

After creating the cron instance, we can start the cron tasks in the Run function. The code for the Run function is as follows:

func (s preparedWatcherServer) Run() error {
    stopCh := make(chan struct{})
    s.gs.AddShutdownCallback(shutdown.ShutdownFunc(func(string) error {
        // wait for running jobs to complete.
        ctx := s.cron.Stop()
        select {
        case <-ctx.Done():
            log.Info("cron jobs stopped.")
        case <-time.After(3 * time.Minute):
            log.Error("context was not done after 3 minutes.")
        }
        stopCh <- struct{}{}

        return nil
    }))

    // start shutdown managers
    if err := s.gs.Start(); err != nil {
        log.Fatalf("start shutdown manager failed: %s", err.Error())
    }

    log.Info("start to run cron jobs.")
    s.cron.Start()

    // block here on the channel to prevent the process from exiting.
    <-stopCh

    return nil
}

The above code starts the cron instance by calling s.cron.Start(), which executes the cron tasks.

It’s important to note that we also need to implement graceful shutdown functionality, which means waiting for running jobs to complete before terminating the process when the program ends. s.cron.Stop() returns a variable of type context.Context to inform the caller when the cron tasks will end so that the caller can terminate the process. After all cron tasks are completed or after a timeout of 3 minutes, a message is written to the stopCh channel, and <-stopCh ends the blocking state, thereby exiting the iam-watcher process.

Implementation of taskWatcher #

The implementation of taskWatcher is located in the internal/watcher/watcher/task/watcher.go file. This file defines a taskWatcher struct:

type taskWatcher struct {    
    ctx             context.Context    
    mutex           *redsync.Mutex    
    maxInactiveDays int          
}

taskWatcher implements the IWatcher interface. During program startup, the init function registers taskWatcher to the global variable registry defined in internal/watcher/watcher/registry.go. The func ListWatchers() map[string]IWatcher function then returns all registered watchers.

It is important to note that all watchers are imported as anonymous packages in the internal/watcher/watcher/all/all.go file, which triggers the execution of the init function in the package where the watcher is located. The init function registers the watcher to the registry variable by calling watcher.Register("clean", &cleanWatcher{}). The code for importing anonymous packages in all.go is as follows:

import (                                                           
    _ "github.com/marmotedu/iam/internal/watcher/watcher/clean"    
    _ "github.com/marmotedu/iam/internal/watcher/watcher/task"    
)

This approach allows us to add a new watcher as a plugin with minimal changes and without altering the core code of iam-watcher. For example, if we need to add a new cleansecret watcher, we can follow these two steps:

  1. Create a new cleansecret directory under internal/watcher/watcher and implement the cleanSecretWatcher.
  2. Import the github.com/marmotedu/iam/internal/watcher/watcher/cleansecret package as an anonymous package in the internal/watcher/watcher/all/all.go file.
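The registration mechanism behind these two steps can be sketched in a single file. Everything here is simplified for illustration: the IWatcher interface is trimmed down to Spec() and Run(), the registry lives in the same package instead of internal/watcher/watcher, the panic on duplicate names is a choice made for this sketch, and the body of cleanSecretWatcher is a placeholder.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// IWatcher is a trimmed-down stand-in for the interface shown earlier: the
// real one also has an Init method that receives the lock and configuration.
type IWatcher interface {
	Spec() string
	Run()
}

var (
	registryLock sync.RWMutex
	registry     = make(map[string]IWatcher)
)

// Register adds a watcher under name. Panicking on a duplicate name is a
// choice made for this sketch.
func Register(name string, w IWatcher) {
	registryLock.Lock()
	defer registryLock.Unlock()
	if _, dup := registry[name]; dup {
		panic("watcher registered twice: " + name)
	}
	registry[name] = w
}

// ListWatchers returns a copy of all registered watchers.
func ListWatchers() map[string]IWatcher {
	registryLock.RLock()
	defer registryLock.RUnlock()
	out := make(map[string]IWatcher, len(registry))
	for name, w := range registry {
		out[name] = w
	}
	return out
}

// cleanSecretWatcher is the hypothetical new watcher.
type cleanSecretWatcher struct{}

func (w *cleanSecretWatcher) Spec() string { return "@every 1h" }
func (w *cleanSecretWatcher) Run()         { fmt.Println("deleting expired secrets...") }

// In the real layout this init function lives in the watcher's own package
// and runs when all.go imports that package anonymously.
func init() {
	Register("cleansecret", &cleanSecretWatcher{})
}

func main() {
	watchers := ListWatchers()
	names := make([]string, 0, len(watchers))
	for name := range watchers {
		names = append(names, name)
	}
	sort.Strings(names)
	for _, name := range names {
		fmt.Printf("%s runs on schedule %q\n", name, watchers[name].Spec())
	}
}
```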

In the Run() method of taskWatcher, we use the following code to ensure that only one task watcher is running even if there are multiple iam-watcher instances:

if err := tw.mutex.Lock(); err != nil {               
    log.L(tw.ctx).Info("taskWatcher already run.")    
    
    return    
}                 
defer func() {                                      
    if _, err := tw.mutex.Unlock(); err != nil {    
        log.L(tw.ctx).Errorf("could not release taskWatcher lock. err: %v", err)    
    
        return    
    }    
}()

In the Run() method of taskWatcher, we query all users and compare the loginedAt field with the current time to determine whether to disable the user. The loginedAt field records the last login time of the user.

From the implementation of the task watcher, we can see that we use the mysql.GetMySQLFactoryOr function provided by the IAM project, the log package, and the Options configuration. This allows us to conveniently develop a task that is closely related to the project.

Summary #

In Go project development, we often need to execute interval/scheduled tasks, and for this we need a job system. We can use the crontab provided by Linux to execute scheduled tasks, or we can deploy an open-source job system and execute interval/scheduled tasks on it. However, these methods have some disadvantages, such as being separate from the project and unable to execute interval tasks. Therefore, a better approach is to build a job system on an excellent open-source cron package and develop task plugins based on this job system.

IAM, based on the github.com/robfig/cron package and the github.com/go-redsync/redsync package, has implemented its own distributed job system called iam-watcher. iam-watcher allows scheduled tasks, interval tasks, and interval-scheduled tasks to be added as plugins. For its specific implementation, you can refer to the code of the iam-watcher service. Its main function is located in the cmd/iam-watcher/watcher.go file.

Exercise #

  1. Take a moment to think: besides scheduled tasks, interval tasks, and interval scheduled tasks, are there any other types of task requirements in daily work? Feel free to share in the comments.
  2. Try implementing a new watcher to delete expired secrets from the secret table.

Welcome to discuss and communicate with me in the comments. If this lesson is helpful to you, feel free to share it with your friends around you.