11 How to Avoid Duplicate Execution in Distributed Scheduled Tasks Under Multiple Instances


In the previous chapters, users registered as members by binding their mobile phone numbers and could then fill in personal information such as their name and birthday. Once we have a member's birthday, we can use it for marketing, which requires a scheduled task that pushes marketing messages. This article walks you through building scheduled tasks in a microservices environment.

Selection of Scheduled Task Frameworks #

There are several common solutions for scheduled tasks:

[Figure: common scheduled task solutions]

The options on the right side of the diagram are built into Java or the Spring framework and support developing and running scheduled tasks out of the box. The options on the left require introducing a third-party framework. Here is a brief introduction to each:

  • XXL-JOB is a distributed task scheduling platform. Its core design goals are fast development, a gentle learning curve, light weight, and easy extensibility. It separates task scheduling from task execution, offers rich functionality, and is used in commercial products at many companies. Official website: https://www.xuxueli.com/xxl-job/
  • Elastic-Job is a distributed scheduling solution made up of two independent sub-projects, Elastic-Job-Lite and Elastic-Job-Cloud. Elastic-Job-Lite is positioned as a lightweight, decentralized solution that relies on ZooKeeper and provides distributed task coordination as a jar. It was originally part of ddframe, the Java application framework of Dangdang.com, and was later split out for independent development.
  • Quartz is a well-known scheduling framework written entirely in Java, originally developed by the OpenSymphony open-source organization. It offers two ways to store jobs: in memory and in a database. For distributed scheduling, database job storage keeps task information from being lost when a server shuts down or restarts, and it provides good availability in a cluster.
  • TBSchedule, from Taobao, is a simple distributed task scheduling engine implemented in pure Java on top of ZooKeeper. Scheduling is separated from execution, and the scheduling side can control and monitor task execution. Tasks can be dynamically assigned to thread groups in JVMs on multiple hosts, ensuring that they are executed without duplication or omission.
  • Timer and TimerTask are two classes in the JDK's java.util package. They suit simple tasks; for complex tasks, another solution is recommended.
  • ScheduledExecutorService extends ExecutorService with delayed and periodic task execution. Although it can run scheduled tasks, it is rarely chosen as a scheduling solution on its own, partly because it supports only fixed-rate and fixed-delay schedules and has no cron expressions.
  • @EnableScheduling enables annotation-driven scheduled tasks. It relies on the Spring framework, is easy to use, and needs no XML configuration, which makes it especially convenient with Spring Boot.
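For comparison with the annotation-based approach, here is a minimal sketch of periodic execution with the JDK's ScheduledExecutorService. The PeriodicTaskDemo class, the 10 ms period, and the run count are illustrative choices, not part of the project; the sketch uses scheduleAtFixedRate and a CountDownLatch to stop after a given number of runs.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class PeriodicTaskDemo {

    /** Runs a task at a fixed rate until it has executed `runs` times; returns the execution count. */
    public static int runPeriodically(int runs, long periodMillis) {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        AtomicInteger executions = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(runs);
        // Initial delay 0: run immediately, then once every periodMillis milliseconds
        ScheduledFuture<?> handle = scheduler.scheduleAtFixedRate(() -> {
            executions.incrementAndGet();
            done.countDown();
        }, 0, periodMillis, TimeUnit.MILLISECONDS);
        try {
            done.await(); // block until the task has run `runs` times
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt status for the caller
        } finally {
            handle.cancel(false); // stop scheduling further runs
            scheduler.shutdown(); // let the scheduler thread exit
        }
        return executions.get();
    }

    public static void main(String[] args) {
        System.out.println("Executions: " + runPeriodically(3, 10));
    }
}
```

Everything here runs inside one JVM, so, like @EnableScheduling, it does nothing to stop other instances from running the same task.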

Introducing a third-party distributed framework increases project complexity, while Timer and TimerTask are too simple to meet the needs of complex distributed scheduled tasks. So this time we use the annotation-based @EnableScheduling to start our journey with scheduled tasks.

Creating a Scheduled Task Project #

In the parking-project parent project, create a new Spring Boot-based scheduled task project named parking-schedule-job. Set up the basic project configuration, such as the port number and project name.

Add the project’s startup class:

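import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling // turn on processing of @Scheduled methods
public class ParkingScheduleJobApplication {

    public static void main(String[] args) {
        SpringApplication.run(ParkingScheduleJobApplication.class, args);
    }

}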

Add the task execution class:

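import java.time.LocalDateTime;

import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class UserBirthdayBasedPushTask {

    // Outputs a log every 5 seconds ("0/5" in the seconds field of a six-field Spring cron expression)
    @Scheduled(cron = "0/5 * * * * ?")
    public void scheduledTask() {
        log.info("Task running at = " + LocalDateTime.now());
    }
}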

A simple scheduled task project is now complete. Start it, and a log line is printed every 5 seconds. This works for a single instance, but it does not match what we need in a microservices architecture: when the service is scaled horizontally into multiple instances, every instance runs the task every 5 seconds. Duplicate execution can corrupt data or hurt the user experience. In our case of sending marketing messages on members' birthdays, users would be bombarded with duplicate messages, which is not what we want. No matter how many instances are deployed, each scheduled run should execute only once; the number of executions must not grow with the number of instances.

Distributed Scheduled Tasks #

To ensure that each scheduled run executes only once, every instance must obtain a token before executing the task. Only the instance holding the token runs the task; the others skip it. The token can be implemented as a record in a shared database.

[Figure: two database-based token solutions, A and B]

Solution A, which some readers have proposed, has a loophole: it first selects the record and then updates it, and in the gap between the two statements several instances can see the token as free and run the task simultaneously. Solution B is more reliable: it checks and claims the token in a single UPDATE statement, which returns 1 (one row affected) if the update succeeds and 0 otherwise, so only the instance that gets 1 runs the task.
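Solution B relies on the database checking and claiming the token in one atomic statement, for example `UPDATE shedlock SET lock_until = ?, locked_by = ? WHERE name = ? AND lock_until <= ?`, and on the affected-row count telling each instance whether it won. The sketch below models that in memory so it runs without a database: a synchronized method stands in for the database's atomic check-and-write, and the class and method names are hypothetical. Several threads, playing the part of instances, race for the same lock at the same instant.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;

/** In-memory model of a lock table updated with a conditional UPDATE (hypothetical names). */
public class ConditionalUpdateLock {

    // name -> lock_until (epoch millis) and name -> locked_by, standing in for table columns
    private final Map<String, Long> lockUntil = new HashMap<>();
    private final Map<String, String> lockedBy = new HashMap<>();

    /**
     * Models "UPDATE ... SET lock_until = now + hold, locked_by = owner WHERE name = ? AND lock_until <= now".
     * Returns the affected-row count: 1 if the caller took the lock, 0 if another holder's lock is still valid.
     * synchronized stands in for the database executing the check and the write atomically.
     */
    public synchronized int tryUpdate(String name, String owner, long now, long holdMillis) {
        // Simplification: a missing row is treated as an expired lock instead of being inserted first
        long until = lockUntil.getOrDefault(name, 0L);
        if (until > now) {
            return 0;
        }
        lockUntil.put(name, now + holdMillis);
        lockedBy.put(name, owner);
        return 1;
    }

    public synchronized String owner(String name) {
        return lockedBy.get(name);
    }

    /** Starts `instances` threads that try to take the same lock at the same instant; returns the number of winners. */
    public static int race(int instances) {
        ConditionalUpdateLock table = new ConditionalUpdateLock();
        AtomicInteger winners = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);
        List<Thread> threads = new ArrayList<>();
        long now = System.currentTimeMillis();
        for (int i = 1; i <= instances; i++) {
            String owner = "instance-" + i;
            Thread t = new Thread(() -> {
                try {
                    start.await(); // release all threads together
                    if (table.tryUpdate("scheduledTaskName", owner, now, 30_000) == 1) {
                        winners.incrementAndGet(); // this "instance" would run the task
                    }
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            threads.add(t);
            t.start();
        }
        start.countDown();
        for (Thread t : threads) {
            try {
                t.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                break;
            }
        }
        return winners.get();
    }

    public static void main(String[] args) {
        System.out.println("Instances that acquired the lock: " + race(5));
    }
}
```

Exactly one thread gets 1. Splitting tryUpdate into an unsynchronized read followed by a separate write would reintroduce solution A's gap.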

Implementing this solution yourself means writing the lock acquisition and release SQL. If you would rather not write all that code, is there a better option? Of course: there are always "lazy" programmers who have already done it for you. Let me introduce ShedLock, a component that makes sure your scheduled tasks are executed at most once at the same time.

  1. Import the ShedLock dependencies. We continue to use MySQL as the database, so the project also needs the MySQL JDBC driver and a DataSource (for example via spring-boot-starter-jdbc):
<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-core</artifactId>
    <version>4.5.0</version>
</dependency>
<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-spring</artifactId>
    <version>4.5.0</version>
</dependency>
<dependency>
    <groupId>net.javacrumbs.shedlock</groupId>
    <artifactId>shedlock-provider-jdbc-template</artifactId>
    <version>4.5.0</version>
</dependency>
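  2. Modify the project's startup class: add the @EnableSchedulerLock annotation to enable ShedLock, and register a LockProvider bean that tells ShedLock where to store locks. defaultLockAtMostFor = "30s" caps how long a lock is held if the instance running the task dies, so a crashed instance cannot block the task forever.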
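import javax.sql.DataSource;

import net.javacrumbs.shedlock.core.LockProvider;
import net.javacrumbs.shedlock.provider.jdbctemplate.JdbcTemplateLockProvider;
import net.javacrumbs.shedlock.spring.annotation.EnableSchedulerLock;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
@EnableSchedulerLock(defaultLockAtMostFor = "30s")
public class ParkingScheduleJobApplication {

    public static void main(String[] args) {
        SpringApplication.run(ParkingScheduleJobApplication.class, args);
    }

    // Provide a JDBC-based lock mechanism backed by the shedlock table
    @Bean
    public LockProvider lockProvider(DataSource dataSource) {
        return new JdbcTemplateLockProvider(dataSource);
    }

}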
  3. Add the @SchedulerLock annotation to the task method and give the lock a name, for example @SchedulerLock(name = "scheduledTaskName"). If there are multiple scheduled tasks, each lock name must be unique, because the name is the primary key of the lock record.

  4. Create a database named shedlock containing a table, also named shedlock, with the following structure:

CREATE TABLE shedlock (
    `name`       varchar(64)  NOT NULL DEFAULT '' COMMENT 'Task name',
    `lock_until` timestamp(3) NULL DEFAULT NULL COMMENT 'Lock release time',
    `locked_at`  timestamp(3) NULL DEFAULT NULL COMMENT 'Lock time',
    `locked_by`  varchar(255) DEFAULT NULL COMMENT 'Instance holding the lock',
    PRIMARY KEY (`name`)
);
  5. Configure the database connection in application.properties:
spring.datasource.driverClassName = com.mysql.cj.jdbc.Driver
spring.datasource.url = jdbc:mysql://localhost:3306/shedlock?useUnicode=true&characterEncoding=utf-8
spring.datasource.username = root
spring.datasource.password = root
  6. The basic configuration is now complete. Run two instances and check whether only one of them executes the task at any given time:
// Instance 1 log output
2020-03-07 21:20:45.007 INFO 67479 --- [scheduling-1] c.m.p.s.j.t.UserBirthdayBasedPushTask : Task running at = 2020-03-07T21:20:45.007
2020-03-07 21:20:50.011 INFO 67479 --- [scheduling-1] c.m.p.s.j.t.UserBirthdayBasedPushTask : Task running at = 2020-03-07T21:20:50.011
2020-03-07 21:21:15.009 INFO 67479 --- [scheduling-1] c.m.p.s.j.t.UserBirthdayBasedPushTask : Task running at = 2020-03-07T21:21:15.009
2020-03-07 21:21:30.014 INFO 67479 --- [scheduling-1] c.m.p.s.j.t.UserBirthdayBasedPushTask : Task running at = 2020-03-07T21:21:30.014
2020-03-07 21:21:40.008 INFO 67479 --- [scheduling-1] c.m.p.s.j.t.UserBirthdayBasedPushTask : Task running at = 2020-03-07T21:21:40.008

// Instance 2 log output
2020-03-07 21:21:20.011 INFO 67476 --- [scheduling-1] c.m.p.s.j.t.UserBirthdayBasedPushTask : Task running at = 2020-03-07T21:21:20.011
2020-03-07 21:21:25.008 INFO 67476 --- [scheduling-1] c.m.p.s.j.t.UserBirthdayBasedPushTask : Task running at = 2020-03-07T21:21:25.008
2020-03-07 21:21:30.006 INFO 67476 --- [scheduling-1] c.m.p.s.j.t.UserBirthdayBasedPushTask : Task running at = 2020-03-07T21:21:30.006
2020-03-07 21:21:35.006 INFO 67476 --- [scheduling-1] c.m.p.s.j.t.UserBirthdayBasedPushTask : Task running at = 2020-03-07T21:21:35.006
2020-03-07 21:21:45.008 INFO 67476 --- [scheduling-1] c.m.p.s.j.t.UserBirthdayBasedPushTask : Task running at = 2020-03-07T21:21:45.008

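As the logs show, the task still fires every 5 seconds, and the executions are distributed between the two instances, with only one of them running the task in most ticks. Look closely at 21:21:30, though: instance 2 ran at .006 and instance 1 at .014. Because the demo task finishes in milliseconds, instance 2 had most likely already released the lock when instance 1 tried to acquire it. Setting lockAtLeastFor on @SchedulerLock closes this gap. The database table records (for two scheduled tasks) are shown in the image below: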

[Figure: shedlock table records for two scheduled tasks]
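The double run at 21:21:30 in the logs above can be reproduced with a small simulated-clock sketch. It is a simplified model of the lock lifetime, assuming the behavior ShedLock documents for these settings (a lock is held at most lockAtMostFor and at least lockAtLeastFor); it is not ShedLock's actual code, and the class and method names are hypothetical. The demo task is assumed to take about 1 ms.

```java
/**
 * Simulated-clock model of a lock with lockAtMostFor / lockAtLeastFor (hypothetical names,
 * not ShedLock's implementation). Times are milliseconds within one 5-second tick.
 */
public class LockAtLeastForDemo {

    private long lockUntil = 0; // 0 = never locked

    /** Takes the lock if it has expired; the lock is then held until now + atMostMillis. */
    public boolean tryLock(long now, long atMostMillis) {
        if (lockUntil > now) {
            return false;
        }
        lockUntil = now + atMostMillis;
        return true;
    }

    /** Releases the lock when the task ends, but never earlier than lockedAt + atLeastMillis. */
    public void unlock(long lockedAt, long finishedAt, long atLeastMillis) {
        lockUntil = Math.max(finishedAt, lockedAt + atLeastMillis);
    }

    /** Two instances fire the same tick 8 ms apart; returns how many of them ran the task. */
    public static int runsForOneTick(long atLeastMillis) {
        LockAtLeastForDemo lock = new LockAtLeastForDemo();
        long atMostMillis = 30_000;  // like defaultLockAtMostFor = "30s"
        long taskMillis = 1;         // the demo task only logs, so it ends almost at once
        long[] fireTimes = {6, 14};  // instance 2 at .006, instance 1 at .014, as in the log
        int runs = 0;
        for (long fireAt : fireTimes) {
            if (lock.tryLock(fireAt, atMostMillis)) {
                runs++;
                lock.unlock(fireAt, fireAt + taskMillis, atLeastMillis);
            }
        }
        return runs;
    }

    public static void main(String[] args) {
        System.out.println("lockAtLeastFor = 0 ms:    " + runsForOneTick(0) + " run(s)");
        System.out.println("lockAtLeastFor = 4000 ms: " + runsForOneTick(4_000) + " run(s)");
    }
}
```

Running main prints 2 runs with lockAtLeastFor = 0 and 1 run with 4000 ms. In the real task, the equivalent setting would be something like @SchedulerLock(name = "scheduledTaskName", lockAtLeastFor = "4s"); keeping it below the 5-second interval leaves the lock free again when the next tick fires.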

Scheduled Marketing SMS #

Now that the framework is in place, let's implement sending marketing SMS messages based on members' birthdays.

It is tempting to run the task during off-peak hours, such as the early morning. But is sending marketing SMS messages to users in the early morning really a good idea? Choose the schedule with the user experience in mind, not server load alone.

We learned how to call other services in the earlier chapter on service invocation. This time, the scheduled task project needs to call a method in the member service, and we will do so with Feign. Create the MemberServiceClient interface; its methods must mirror the member service's endpoints, with the same paths, HTTP methods, parameters, and response structure.

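import java.util.List;

import com.mall.parking.common.bean.CommonResult;
import org.springframework.cloud.openfeign.FeignClient;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RequestParam;
// Also import the project's Member and BusinessException classes

// "member-service" is the member service's registered name; MemberServiceFallback handles failed calls
@FeignClient(value = "member-service", fallback = MemberServiceFallback.class)
public interface MemberServiceClient {

    @RequestMapping(value = "/member/list", method = RequestMethod.POST)
    CommonResult<List<Member>> list() throws BusinessException;

    @RequestMapping(value = "/member/getMember", method = RequestMethod.POST)
    CommonResult<Member> getMemberInfo(@RequestParam(value = "memberId") String memberId);

}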

Next, implement the business logic in the task execution class. This class needs the Member entity, which is maintained in the member service and not exposed to other projects. *Common classes like this can be extracted into a shared project that other projects reference, instead of maintaining multiple copies.*

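import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.MonthDay;
import java.util.List;

import com.alibaba.fastjson.JSONArray;
import lombok.extern.slf4j.Slf4j;
import net.javacrumbs.shedlock.spring.annotation.SchedulerLock;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.scheduling.annotation.Scheduled;
import org.springframework.stereotype.Component;
// Also import the project's Member and BusinessException classes

@Component
@Slf4j
public class UserBirthdayBasedPushTask {

    @Autowired
    MemberServiceClient memberService;

    // Every 5 seconds for testing; a production job would run once a day, e.g. "0 0 10 * * ?" (10:00)
    @Scheduled(cron = "0/5 * * * * ?")
    @SchedulerLock(name = "scheduledTaskName")
    public void scheduledTask() {
        try {
            String members = memberService.list();
            List<Member> array = JSONArray.parseArray(members, Member.class);

            MonthDay today = MonthDay.now();
            for (Member one : array) {
                // Send marketing SMS to members whose birthday is today: compare month and day only
                // (assumes getBirth() returns a "yyyy-MM-dd" string)
                String birth = one.getBirth();
                if (birth != null && MonthDay.from(LocalDate.parse(birth)).equals(today)) {
                    log.info("Send SMS to " + one.getPhone());
                }
            }
        } catch (BusinessException e) {
            log.error("Caught exception: " + e.getMessage(), e);
        }

        log.info("Task running at " + LocalDateTime.now());
    }
}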
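The task should message members whose birthday is today, which depends only on the month and day of the birth date, not its year or a time of day. Below is a standalone sketch of such a check with java.time.MonthDay, assuming birth dates are stored as ISO "yyyy-MM-dd" strings. The BirthdayFilter class and the phone-to-birth-date map are hypothetical stand-ins for the Member entity, and the phone numbers in main are made-up sample data.

```java
import java.time.LocalDate;
import java.time.MonthDay;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class BirthdayFilter {

    /** True if a "yyyy-MM-dd" birth date falls on the same month and day as `today`. */
    public static boolean isBirthday(String birth, LocalDate today) {
        if (birth == null || birth.isEmpty()) {
            return false; // member has not filled in a birthday
        }
        // LocalDate.parse uses ISO-8601 (yyyy-MM-dd) by default; the birth year is then ignored
        return MonthDay.from(LocalDate.parse(birth)).equals(MonthDay.from(today));
    }

    /** Returns the phone numbers of members whose birthday is `today`. */
    public static List<String> phonesToNotify(Map<String, String> birthByPhone, LocalDate today) {
        List<String> phones = new ArrayList<>();
        for (Map.Entry<String, String> entry : birthByPhone.entrySet()) {
            if (isBirthday(entry.getValue(), today)) {
                phones.add(entry.getKey());
            }
        }
        return phones;
    }

    public static void main(String[] args) {
        Map<String, String> members = new LinkedHashMap<>(); // made-up sample data
        members.put("13800000001", "1990-03-07");
        members.put("13800000002", "1985-12-25");
        members.put("13800000003", null);
        System.out.println(phonesToNotify(members, LocalDate.of(2020, 3, 7)));
    }
}
```

Running main prints [13800000001]. One design choice to make explicitly: a member born on February 29 never matches in a non-leap year with this check, so a production job may want to treat February 28 as that member's birthday in those years.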

Start the member service and the scheduled task project and check that the business logic works. With list() in MemberServiceClient still declared to return CommonResult<List<Member>>, an exception occurs when the scheduled task runs:

Caused by: org.springframework.http.converter.HttpMessageNotReadableException: JSON parse error: Cannot deserialize instance of `com.mall.parking.common.bean.CommonResult` out of START_ARRAY token; nested exception is com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot deserialize instance of `com.mall.parking.common.bean.CommonResult` out of START_ARRAY token at [Source: (PushbackInputStream); line: 1, column: 1]

The Feign client declares list() as returning CommonResult<List<Member>>, so Jackson expects a JSON object ({...}), but /member/list actually returns a JSON array ([...]), which cannot be mapped to CommonResult. The fix used here is to change the Feign interface to return the raw JSON string. (Declaring the return type as List<Member> would also let Jackson decode the array directly.)

// Change the method in the MemberServiceClient interface to this
@RequestMapping(value = "/member/list", method = RequestMethod.POST)
public String list() throws BusinessException;

Then handle the response in the task execution class by parsing the raw JSON string with fastjson's JSONArray.parseArray:

@Scheduled(cron = "0/5 * * * * ?")
@SchedulerLock(name = "scheduledTaskName")
public void scheduledTask() {
    try {
        // The Feign client now returns the raw JSON array; parse it into Member objects
        String members = memberService.list();
        List<Member> array = JSONArray.parseArray(members, Member.class);

        MonthDay today = MonthDay.now();
        for (Member one : array) {
            // Send marketing SMS to members whose birthday is today: compare month and day only
            // (assumes getBirth() returns a "yyyy-MM-dd" string)
            String birth = one.getBirth();
            if (birth != null && MonthDay.from(LocalDate.parse(birth)).equals(today)) {
                log.info("Send SMS to " + one.getPhone());
            }
        }
    } catch (BusinessException e) {
        log.error("Caught exception: " + e.getMessage(), e);
    }

    log.info("Task running at " + LocalDateTime.now());
}

Restart both projects and check that the task now runs correctly. If your project needs more scheduled tasks, follow the same approach to write them.

In this chapter, we compared several scheduled task solutions and then used distributed scheduled tasks to implement the marketing SMS task. Here is a challenge for you: how would you implement distributed scheduled tasks with Elastic-Job?