25 Asynchronous Processing Useful but Very Easy to Misuse

Today, I will talk to you about asynchronous processing, which is powerful but also easy to misuse.

Asynchronous processing is an essential architectural pattern for internet applications. Most business projects are implemented using a combination of synchronous processing, asynchronous processing, and scheduled task processing.

Unlike synchronous processing, asynchronous processing does not require waiting for the process to finish. Therefore, it is suitable for the following scenarios:

  1. Branch processes that serve the main process. For example, in the registration process, writing data to the database is the main process, while sending coupons or welcome messages to users after registration is a branch process with less time sensitivity, which can be done asynchronously.

  2. Processes where users don’t need to see the results in real-time. For example, the order fulfillment and delivery processes after placing an order can be done asynchronously. After each stage is completed, notifications can be sent to users through push notifications or text messages.

In addition, asynchronous processing has functional advantages over synchronous processing in handling traffic spikes, achieving module decoupling, and message broadcasting, thanks to the introduction of message queue (MQ) middleware for task buffering and distribution.

However, powerful as it is, asynchronous processing invites three common mistakes: reliability gaps in the asynchronous workflow, confusion between message sending modes, and large numbers of dead letters clogging the queue. Today, I will walk through these mistakes using three code examples and the popular MQ system RabbitMQ.

For the demonstrations in today’s lecture, I will be using Spring AMQP to operate RabbitMQ. Therefore, you need to first import the amqp dependency:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

Asynchronous Processing Requires Message Compensatory Loop #

Using MQ systems like RabbitMQ and RocketMQ to implement message queues for asynchronous processing is a common practice. Messages can be stored on disk, ensuring that even if the MQ system encounters problems, the message data will not be lost. However, in the process of asynchronous flow, such as message sending, transmission, and processing, message loss can still occur. Additionally, no MQ middleware can guarantee 100% availability, so consideration must be given to how the asynchronous process will continue when the MQ system is unavailable.

Therefore, an asynchronous process needs a compensation mechanism, or a primary line backed by a secondary line that can take over when the primary line fails.

Let’s take a look at a scenario where a welcome message is asynchronously sent after a user registers. The process of registering a user to the database is a synchronous process, while the process of the member service sending the welcome message upon receiving the message is an asynchronous process.

img

Let’s analyze this:

The blue line represents the asynchronous processing done using MQ, which we call the primary line. There may be a possibility of message loss (the dotted line represents asynchronous invocation).

The green line represents the compensation job that is periodically triggered to compensate for lost messages. We call this the secondary line, which is used to compensate for lost messages from the primary line.

Considering the extreme case of MQ middleware failure, we require that the secondary line has the same processing throughput as the primary line.

Now let’s take a look at the relevant implementation code.

Firstly, define the UserController to handle registration and send asynchronous messages. For the registration method, we register 10 users at once, and there is a 50% chance that the user registration message fails to be sent.

@RestController
@Slf4j
@RequestMapping("user")
public class UserController {

    @Autowired
    private UserService userService;

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping("register")
    public void register() {

        // Simulate registration of 10 users
        IntStream.rangeClosed(1, 10).forEach(i -> {

            // Register user to database
            User user = userService.register();

            // Simulate a 50% chance of message sending failure
            if (ThreadLocalRandom.current().nextInt(10) % 2 == 0) {

                // Send message via RabbitMQ
                rabbitTemplate.convertAndSend(RabbitConfiguration.EXCHANGE, RabbitConfiguration.ROUTING_KEY, user);

                log.info("Sent MQ user {}", user.getId());
            }
        });
    }
}

Next, define the MemberService class to simulate the member service. The member service listens to messages indicating successful user registration and sends welcome SMS messages. We use a ConcurrentHashMap to store the user IDs of the users who have already received the welcome message, ensuring idempotence and avoiding sending duplicate messages when compensating for the same user.

@Component
@Slf4j
public class MemberService {

    // Status of sending welcome messages
    private Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();

    // Listen for messages indicating successful user registration and send welcome messages
    @RabbitListener(queues = RabbitConfiguration.QUEUE)
    public void listen(User user) {
        log.info("Received MQ user {}", user.getId());
        welcome(user);
    }

    // Send welcome message
    public void welcome(User user) {

        // Deduplication
        if (welcomeStatus.putIfAbsent(user.getId(), true) == null) {
            try {
                TimeUnit.SECONDS.sleep(2);
            } catch (InterruptedException e) {
                // handle exception
            }
            log.info("MemberService: Welcome new user {}", user.getId());
        }
    }
}

When implementing an MQ consumer program, it is important to include deduplication logic (supporting idempotence) for several reasons:

  1. MQ messages may be duplicated due to middleware misconfiguration or instability.
  2. Automated compensation may result in duplicates. In this example, the same message may be delivered both via MQ and through compensation. Duplicates are expected, and considering high cohesion, the compensation job itself does not handle deduplication.
  3. Manual compensation may result in duplicates. When messages back up, the asynchronous flow is inevitably delayed. If the back office offers a manual compensation function, operators are likely to compensate by hand first, and some time later the consumer receives the original message and processes it again. I have personally encountered an incident in which an MQ failure left a backlog of hundreds of thousands of fund disbursement messages and delayed the business. The operations team assumed a program error and processed the disbursements manually through the back office. When the MQ system recovered, the backlogged messages were consumed and processed again, resulting in a large number of duplicate disbursements.

Next, let’s define the compensation job, which is the secondary line.

In the CompensationJob class, we define a @Scheduled task that executes the compensation operation every 5 seconds. Since the job doesn’t know which user registration messages may have been lost, we need to perform full compensation. The compensation logic is as follows: every 5 seconds, compensate 5 users in sequence, starting from the ID of the last compensated user in the previous round. To improve processing capabilities, we submit the compensation task to a thread pool for asynchronous processing.

@Component
@Slf4j
public class CompensationJob {

    // Asynchronous processing thread pool for compensation job
    private static ThreadPoolExecutor compensationThreadPool = new ThreadPoolExecutor(
            10, 10,
            1, TimeUnit.HOURS,
            new ArrayBlockingQueue<>(1000),
            new ThreadFactoryBuilder().setNameFormat("compensation-threadpool-%d").get()
    );

    @Autowired
    private UserService userService;

    @Autowired
    private MemberService memberService;

    // The offset that indicates the current compensation progress
    private long offset = 0;

    // Compensation starts 10 seconds later, and then runs every 5 seconds
    @Scheduled(initialDelay = 10_000, fixedRate = 5_000)
    public void compensationJob() {
        log.info("Start compensating from user ID {}", offset);

        // Get users starting from the offset
        userService.getUsersAfterIdWithLimit(offset, 5).forEach(user -> {
            compensationThreadPool.execute(() -> memberService.welcome(user));
            offset = user.getId();
        });
    }
}

For high cohesion, the primary and secondary lines should process messages through the same method. In this example, MemberService handles both the MQ message and the compensation, and both paths call the welcome method.
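One detail the demo glosses over is that welcome marks a user as handled before the message is actually sent. The following is a minimal sketch, not the lecture's implementation, of a shared idempotent entry point that releases the mark when sending fails, so that a later MQ redelivery or compensation round can try again. The class name IdempotentWelcome and the sender callback are hypothetical, and the sender is assumed to throw a RuntimeException on failure.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.LongConsumer;

// Hypothetical helper: one idempotent entry point shared by the MQ listener
// (primary line) and the compensation job (secondary line).
class IdempotentWelcome {

    private final Map<Long, Boolean> welcomeStatus = new ConcurrentHashMap<>();
    private final LongConsumer sender; // assumed to throw when sending fails

    IdempotentWelcome(LongConsumer sender) {
        this.sender = sender;
    }

    // Returns true only if this call actually sent the welcome message.
    boolean welcome(long userId) {
        // Claim the user first so a concurrent call for the same ID is skipped.
        if (welcomeStatus.putIfAbsent(userId, true) != null) {
            return false;
        }
        try {
            sender.accept(userId);
            return true;
        } catch (RuntimeException e) {
            // Release the claim so a later redelivery or compensation round can retry.
            welcomeStatus.remove(userId);
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger sent = new AtomicInteger();
        IdempotentWelcome service = new IdempotentWelcome(id -> sent.incrementAndGet());
        service.welcome(1L); // delivered via MQ
        service.welcome(1L); // the same user arrives again via compensation
        System.out.println("messages sent: " + sent.get()); // prints "messages sent: 1"
    }
}
```

In a multi-instance deployment the in-memory map would have to be replaced by shared storage, such as a unique key in a database; that part is outside this sketch.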

In addition, it is worth mentioning that the compensation logic in this demo is relatively simple. In a production-level code, the following aspects should be strengthened:

  • Consider configuring appropriate values for the compensation frequency, the number of users processed in each round, and the size of the compensation thread pool to meet the throughput requirements.
  • Consider delaying compensation on the secondary line. For example, only compensate users who registered more than 30 seconds ago, so that compensation does not collide with real-time MQ processing on the primary line.
  • The offset data that indicates the current compensation progress needs to be persisted in a database.
  • The compensation job itself needs to be highly available, and a task system like XXLJob or ElasticJob can be used.
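The delay and offset points above can be sketched as follows. This is an illustration under stated assumptions rather than production code: the RegisteredUser type, its fields, and the nextBatch method are hypothetical stand-ins, the user list is assumed to be sorted by an auto-increment ID, and registration time is assumed to grow with the ID.

```java
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: pick the next compensation batch while skipping users that
// registered too recently, so the primary MQ line can handle them first.
class CompensationWindow {

    // Minimal stand-in for the lecture's User entity (field names are assumptions).
    static final class RegisteredUser {
        final long id;
        final Instant registeredAt;

        RegisteredUser(long id, Instant registeredAt) {
            this.id = id;
            this.registeredAt = registeredAt;
        }
    }

    // Returns up to `limit` users with id > offset that registered at least `minAge` before `now`.
    static List<RegisteredUser> nextBatch(List<RegisteredUser> users, long offset, int limit,
                                          Duration minAge, Instant now) {
        Instant cutoff = now.minus(minAge);
        List<RegisteredUser> batch = new ArrayList<>();
        for (RegisteredUser user : users) {
            if (user.id <= offset) {
                continue; // already covered by an earlier round
            }
            // Stop at the first user that is still too fresh, so the offset never
            // jumps past a user that has not been compensated yet.
            if (user.registeredAt.isAfter(cutoff)) {
                break;
            }
            batch.add(user);
            if (batch.size() == limit) {
                break;
            }
        }
        return batch;
    }

    public static void main(String[] args) {
        Instant now = Instant.now();
        List<RegisteredUser> users = new ArrayList<>();
        users.add(new RegisteredUser(1, now.minusSeconds(60)));
        users.add(new RegisteredUser(2, now.minusSeconds(40)));
        users.add(new RegisteredUser(3, now.minusSeconds(10))); // too fresh, left to the MQ line
        List<RegisteredUser> batch = nextBatch(users, 0, 5, Duration.ofSeconds(30), now);
        System.out.println("users compensated this round: " + batch.size());
    }
}
```

After each round the caller would store the ID of the last user in the batch as the new offset, in a database rather than in a field, so that a restart resumes where the previous run stopped.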

When running the program and registering 10 users using the registration method, the following output is shown:

[17:01:16.570] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 1
[17:01:16.571] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 5
[17:01:16.572] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 7
[17:01:16.573] [http-nio-45678-exec-1] [INFO ] [o.g.t.c.a.compensation.UserController:28  ] - sent mq user 8
[17:01:16.594] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 1
[17:01:18.597] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 1
[17:01:18.601] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 5
[17:01:20.603] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 5
[17:01:20.604] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 7
[17:01:22.605] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 7
[17:01:22.606] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:18  ] - receive mq user 8
[17:01:24.611] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 8
[17:01:25.498] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - Start compensating from user ID 0
[17:01:27.510] [compensation-threadpool-1] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 2
[17:01:27.510] [compensation-threadpool-3] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 4
[17:01:27.511] [compensation-threadpool-2] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 3
[17:01:30.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - Start compensating from user ID 5
[17:01:32.500] [compensation-threadpool-6] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 6
[17:01:32.500] [compensation-threadpool-9] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 9
[17:01:35.496] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - Start compensating from user ID 9
[17:01:37.501] [compensation-threadpool-0] [INFO ] [o.g.t.c.a.compensation.MemberService:28  ] - memberService: welcome new user 10
[17:01:40.495] [scheduling-1] [INFO ] [o.g.t.c.a.compensation.CompensationJob:29  ] - Start compensating from user ID 10

From the output, we can observe the following:

  • Out of the 10 registered users, only four users were sent successfully via MQ: users 1, 5, 7, and 8.
  • The compensation task ran three times. In the first run, users 2, 3, and 4 were compensated. In the second run, users 6 and 9 were compensated. In the third run, user 10 was compensated.
  • It is worth noting that the ultimate goal of a complete compensation mechanism is enough throughput to compensate for all the data. Even if the MQ is shut down outright, processing becomes less timely, but the entire flow still runs.
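Whether the secondary line can carry the full load is a matter of simple arithmetic. The sketch below uses the demo's own numbers (5 users every 5 seconds, 10 threads, 2 seconds per welcome message); the class and method names are hypothetical, and the model assumes every task takes the same time.

```java
// Hypothetical back-of-the-envelope calculator for the secondary line's throughput.
class CompensationThroughput {

    // Users per second the scheduler hands over: one batch per period.
    static double schedulingRate(int batchSize, double periodSeconds) {
        return batchSize / periodSeconds;
    }

    // Users per second the thread pool can finish when every task takes taskSeconds.
    static double poolRate(int threads, double taskSeconds) {
        return threads / taskSeconds;
    }

    // The slower of the two stages bounds the whole secondary line.
    static double effectiveRate(int batchSize, double periodSeconds, int threads, double taskSeconds) {
        return Math.min(schedulingRate(batchSize, periodSeconds), poolRate(threads, taskSeconds));
    }

    public static void main(String[] args) {
        // Demo values: 5 users every 5 s, 10 threads, 2 s per welcome message.
        System.out.println("scheduling rate (users/s): " + schedulingRate(5, 5.0));
        System.out.println("pool rate (users/s): " + poolRate(10, 2.0));
        System.out.println("effective rate (users/s): " + effectiveRate(5, 5.0, 10, 2.0));
    }
}
```

With the demo's values the scheduler delivers 1 user per second while the pool could handle 5, so the batch size and frequency are the bottleneck. If registrations arrive faster than 1 per second while the MQ is down, the backlog grows until those two settings are raised.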

Pay Attention to Whether the Message Mode Is Broadcast or Work Queue #

At the beginning of today’s lecture, we mentioned an important advantage of asynchronous processing, which is the implementation of message broadcasting.

Message broadcasting, similar to what we usually call “broadcasting,” means that the same message can be consumed by different consumers separately. On the other hand, the queue mode means that different consumers share the consumption of the same queue data, and the same message can only be consumed once by a consumer.

For example, for the registration message of the same user, the membership service needs to listen and send a welcome message, and the marketing service also needs to listen and send a small gift to new users. However, both the membership service and the marketing service may have multiple instances, and we expect the same user message to be broadcast to different services simultaneously (broadcast mode), but for different instances of the same service (e.g., Membership Service 1 and Membership Service 2), it only needs to be processed once (work queue mode):

img

When implementing the code, we must confirm the mechanism of the MQ system to ensure that the message routing meets our expectations.

For MQ systems like RocketMQ, implementing similar functionality is relatively straightforward: if consumers belong to the same group, then messages will only be consumed by one consumer in the same group; if consumers belong to different groups, then each group can consume the messages once.

For RabbitMQ, on the other hand, the message routing mode uses queues + exchanges. Queues are used as message carriers, and exchanges determine how messages are routed to queues. The configuration is more complex and prone to errors. Therefore, next, I will focus on explaining the related code implementation of RabbitMQ.

Let’s take the above architecture diagram as an example to demonstrate the pitfalls of using RabbitMQ to implement broadcasting mode and work queue mode.

The first step is to implement the logic for the membership service to listen to the new user registration message sent by the user service.

If we start two membership services, then the registration message of the same user should only be consumed by one of the instances.

We declare the RabbitMQ queue, exchange, and binding. The queue is an anonymous queue, and the exchange is a direct exchange (DirectExchange). The routing key that binds the exchange to the anonymous queue is an empty string. After receiving a message, the listener prints the port used by its instance:

//For code simplicity and clarity, we put the message publisher, consumer, and MQ configuration code together

@Slf4j
@Configuration
@RestController
@RequestMapping("workqueuewrong")
public class WorkQueueWrong {

    private static final String EXCHANGE = "newuserExchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping
    public void sendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());
    }

    //Use an anonymous queue as the message queue
    @Bean
    public Queue queue() {
        return new AnonymousQueue();
    }

    //Declare DirectExchange exchange and bind queue to the exchange
    @Bean
    public Declarables declarables() {
        DirectExchange exchange = new DirectExchange(EXCHANGE);
        return new Declarables(queue(), exchange,
                BindingBuilder.bind(queue()).to(exchange).with(""));
    }

    //Listen to the queue, and the queue name is referenced through SpEL expression with Bean
    @RabbitListener(queues = "#{queue.name}")
    public void memberService(String userName) {
        log.info("memberService: welcome message sent to new user {} from {}", userName, System.getProperty("server.port"));
    }
}

After starting two program instances on ports 12345 and 45678, calling the sendMessage interface to send a message, the output log shows that both instances of the membership service received the message:

img

img

The cause of this problem is that we did not think through how RabbitMQ’s direct exchange is bound to the queues.

As shown in the figure below, RabbitMQ’s direct exchange routes messages according to the routing key. Since our program creates an anonymous (randomly named) queue each time it starts, each instance of the membership service corresponds to an independent queue, which is bound to the direct exchange with an empty routing key. When the user service sends a message and sets the routing key to empty, the direct exchange receives the message and finds two matching queues, so it forwards the message to both:

img
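The routing behavior can be reproduced without a broker. The following toy model is only an illustration of the binding logic described above, not the RabbitMQ client API; the class DirectExchangeModel and the queue names for the two instances are made up for the example.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of a direct exchange; it is not the RabbitMQ client API.
class DirectExchangeModel {

    // routing key -> names of the queues bound with that key
    private final Map<String, List<String>> bindings = new HashMap<>();
    // queue name -> messages currently stored in that queue
    private final Map<String, List<String>> queues = new HashMap<>();

    void bind(String queueName, String routingKey) {
        queues.putIfAbsent(queueName, new ArrayList<>());
        List<String> bound = bindings.computeIfAbsent(routingKey, key -> new ArrayList<>());
        if (!bound.contains(queueName)) {
            bound.add(queueName); // declaring the same binding twice changes nothing
        }
    }

    // Copies the message into every queue whose binding key equals the routing key
    // and returns the number of copies stored.
    int publish(String routingKey, String message) {
        List<String> targets = bindings.getOrDefault(routingKey, Collections.emptyList());
        for (String queueName : targets) {
            queues.get(queueName).add(message);
        }
        return targets.size();
    }

    public static void main(String[] args) {
        // Each instance declares its own randomly named queue with an empty routing key.
        DirectExchangeModel anonymous = new DirectExchangeModel();
        anonymous.bind("anonymous-queue-of-instance-1", "");
        anonymous.bind("anonymous-queue-of-instance-2", "");
        System.out.println("anonymous queues, copies stored: " + anonymous.publish("", "user-1")); // 2

        // Both instances declare the same named queue, so only one binding exists.
        DirectExchangeModel shared = new DirectExchangeModel();
        shared.bind("newuserQueue", "");
        shared.bind("newuserQueue", "");
        System.out.println("shared queue, copies stored: " + shared.publish("", "user-1")); // 1
    }
}
```

Two queues bound with the same key yield two copies of the message, one per instance. A single shared queue yields one copy, which the instances then compete for.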

The fix is simple. Instead of an anonymous queue, all instances of the membership service should use the same named queue. Replace the anonymous queue in the code above with a regular queue:

private static final String QUEUE = "newuserQueue";

@Bean
public Queue queue() {
    return new Queue(QUEUE);
}

Testing shows that for the same message, only one of the two instances can receive it, and different messages are distributed to different instances in round-robin. Now the relationship between the exchange and the queue is as follows:

img

The second step is to further implement the logic where the user service needs to broadcast messages to the membership service and marketing service.

We want both the membership service and the marketing service to receive the broadcast message, but within each service only one instance should receive a given message.

The code is as follows. We declare a queue and a fanout exchange, and then simulate two membership services and two marketing services:

@Slf4j
@Configuration
@RestController
@RequestMapping("fanoutwrong")
public class FanoutQueueWrong {

    private static final String QUEUE = "newuser";
    private static final String EXCHANGE = "newuser";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @GetMapping
    public void sendMessage() {
        rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());
    }

    //Declare FanoutExchange and bind to the queue. FanoutExchange does not need routingKey when binding to a queue
    @Bean
    public Declarables declarables() {
        Queue queue = new Queue(QUEUE);
        FanoutExchange exchange = new FanoutExchange(EXCHANGE);
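        return new Declarables(queue, exchange,
                BindingBuilder.bind(queue).to(exchange));
    }

    //All four listeners below consume from the same queue
    @RabbitListener(queues = QUEUE)
    public void memberService1(String userName) {
        log.info("memberService1: welcome message sent to new user {}", userName);
    }

    @RabbitListener(queues = QUEUE)
    public void memberService2(String userName) {
        log.info("memberService2: welcome message sent to new user {}", userName);
    }

    @RabbitListener(queues = QUEUE)
    public void promotionService1(String userName) {
        log.info("promotionService1: gift sent to new user {}", userName);
    }

    @RabbitListener(queues = QUEUE)
    public void promotionService2(String userName) {
        log.info("promotionService2: gift sent to new user {}", userName);
    }
}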

FanoutQueueWrong uses a FanoutExchange, but it does not behave as expected. A fanout exchange ignores the routing key and copies each message to every queue bound to it. Here only one queue is bound, and the member and promotion services all consume from that queue, so each message goes to exactly one of them: either a member service or a promotion service handles it, never both.

To resolve this, we need to split the queues. Each group of services (member and promotion) should have their own individual queue bound to the FanoutExchange:

@Slf4j
@Configuration
@RestController
@RequestMapping("fanoutright")
public class FanoutQueueRight {

	private static final String MEMBER_QUEUE = "newusermember";
	private static final String PROMOTION_QUEUE = "newuserpromotion";
	private static final String EXCHANGE = "newuser";

	@Autowired
	private RabbitTemplate rabbitTemplate;

	@GetMapping
	public void sendMessage() {
		rabbitTemplate.convertAndSend(EXCHANGE, "", UUID.randomUUID().toString());
	}

	@Bean
	public Declarables declarables() {
		Queue memberQueue = new Queue(MEMBER_QUEUE);
		Queue promotionQueue = new Queue(PROMOTION_QUEUE);
		FanoutExchange exchange = new FanoutExchange(EXCHANGE);
		return new Declarables(memberQueue, promotionQueue, exchange,
				BindingBuilder.bind(memberQueue).to(exchange),
				BindingBuilder.bind(promotionQueue).to(exchange));
	}

	@RabbitListener(queues = MEMBER_QUEUE)
	public void memberService1(String userName) {
		log.info("memberService1: welcome message sent to new user {}", userName);
	}

	@RabbitListener(queues = MEMBER_QUEUE)
	public void memberService2(String userName) {
		log.info("memberService2: welcome message sent to new user {}", userName);
	}

	@RabbitListener(queues = PROMOTION_QUEUE)
	public void promotionService1(String userName) {
		log.info("promotionService1: gift sent to new user {}", userName);
	}

	@RabbitListener(queues = PROMOTION_QUEUE)
	public void promotionService2(String userName) {
		log.info("promotionService2: gift sent to new user {}", userName);
	}
}

Now, the structure of the exchange and queues is as follows:

Exchange: newuser
  Queue: newusermember
  Queue: newuserpromotion

From the log output, each message is delivered once to the member service and once to the promotion service. The message is broadcast to both services at the same time, but within each service only one instance receives it, and the instances take turns in round-robin order:

memberService1: welcome message sent to new user {userName}
memberService2: welcome message sent to new user {userName}
promotionService1: gift sent to new user {userName}
promotionService2: gift sent to new user {userName}
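The difference between the single-queue and the split-queue layout can also be checked with a toy model. This is a simplified sketch, not RabbitMQ itself: the FanoutModel and ModelQueue names are hypothetical, and dispatching within a queue is modeled as strict round-robin.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Toy model of a fanout exchange with competing consumers on each queue.
class FanoutModel {

    // One queue with its listeners; messages are handed out round-robin.
    static final class ModelQueue {
        final List<String> listeners;
        final Map<String, Integer> received = new LinkedHashMap<>();
        int next = 0;

        ModelQueue(String... listeners) {
            this.listeners = Arrays.asList(listeners);
            for (String listener : listeners) {
                received.put(listener, 0);
            }
        }

        // Hands one message to the next listener in turn.
        void deliver() {
            String listener = listeners.get(next);
            next = (next + 1) % listeners.size();
            received.merge(listener, 1, Integer::sum);
        }
    }

    // A fanout exchange copies every message into each bound queue.
    static Map<String, Integer> run(int messages, ModelQueue... queues) {
        for (int i = 0; i < messages; i++) {
            for (ModelQueue queue : queues) {
                queue.deliver();
            }
        }
        Map<String, Integer> totals = new LinkedHashMap<>();
        for (ModelQueue queue : queues) {
            totals.putAll(queue.received);
        }
        return totals;
    }

    public static void main(String[] args) {
        // Single shared queue: the four listeners split the four messages.
        System.out.println(run(4, new ModelQueue("member1", "member2", "promotion1", "promotion2")));
        // One queue per service: each service sees all four messages.
        System.out.println(run(4, new ModelQueue("member1", "member2"),
                new ModelQueue("promotion1", "promotion2")));
    }
}
```

With four messages, the single-queue layout gives every listener one message, so each service sees only half of the registrations. With one queue per service, every listener receives two messages and each service sees all four.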

Therefore, with a clear understanding of how RabbitMQ direct exchanges and fanout exchanges work, we can write our message routing logic correctly.

Misconfiguring the message routing for asynchronous processes can lead to duplicate message processing or important services not receiving messages, resulting in logical errors in business operations.

Each message queue middleware has its own configuration for message routing, and it is crucial to understand the principles first before writing the code.

Don’t Let Dead Letters Clog the Message Queue #

In the lecture on thread pools, we mentioned that an unbounded task queue may eventually lead to OOM. When using a message queue for asynchronous processing, we likewise need to watch for tasks piling up in the queue. A backlog caused by a traffic burst is a minor problem: scaling up consumer capacity appropriately should clear it. In many cases, however, the queue backs up and clogs because a large number of messages can never be processed successfully.

For example, the user service sends a message after a user registers, and the member service listens for the message and sends coupons to the user. However, because the user record was never saved successfully, the member service fails every time it processes the message. The message re-enters the queue and then fails again. A message that haunts the MQ like a ghost in this way is a dead letter.

As more and more dead letters fill up the MQ, consumers need to spend a lot of time repeatedly processing dead letters, causing the consumption of normal messages to be blocked, and eventually the MQ may crash due to excessive data volume.
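To get a feel for how much consumer capacity a single dead letter can absorb, consider the toy model below. It is a deliberately simplified sketch: the RequeueModel class is hypothetical, a failed message is assumed to go back to the tail of the queue, and the consumer is given a fixed budget of deliveries. Real brokers and prefetch settings behave differently in detail.

```java
import java.util.ArrayDeque;
import java.util.Arrays;
import java.util.Deque;

// Toy model: how many deliveries a consumer wastes on a message that always fails.
class RequeueModel {

    // Consumes at most `budget` deliveries and returns how many of them were spent
    // on poison messages. Simplification: a failed message is requeued at the tail.
    static int wastedDeliveries(Deque<String> queue, int budget, boolean requeueOnFailure) {
        int wasted = 0;
        for (int i = 0; i < budget && !queue.isEmpty(); i++) {
            String message = queue.pollFirst();
            if (message.startsWith("poison")) {
                wasted++;
                if (requeueOnFailure) {
                    queue.addLast(message); // the message comes straight back
                }
            }
        }
        return wasted;
    }

    // One message that can never be processed, followed by three normal ones.
    static Deque<String> sample() {
        return new ArrayDeque<>(Arrays.asList("poison-1", "msg-1", "msg-2", "msg-3"));
    }

    public static void main(String[] args) {
        System.out.println("wasted with requeue: " + wastedDeliveries(sample(), 1000, true));     // 997
        System.out.println("wasted without requeue: " + wastedDeliveries(sample(), 1000, false)); // 1
    }
}
```

In this model a single unprocessable message consumes 997 of 1,000 deliveries when it is requeued, and exactly one when it is rejected without requeueing.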

Let’s test this scenario. First, define a queue, a direct exchange, and bind the queue to the exchange:

@Bean
public Declarables declarables() {
    // Queue
    Queue queue = new Queue(Consts.QUEUE);
    // Exchange
    DirectExchange directExchange = new DirectExchange(Consts.EXCHANGE);
    // Quickly declare a group of objects, including the queue, the exchange, and the binding between the queue and the exchange
    return new Declarables(queue, directExchange, BindingBuilder.bind(queue).to(directExchange).with(Consts.ROUTING_KEY));
}

Next, implement a sendMessage method to send messages to the MQ. Each time you visit, send a message with an incrementing identifier as the content:

// Incrementing message identifier
AtomicLong atomicLong = new AtomicLong();

@Autowired
private RabbitTemplate rabbitTemplate;

@GetMapping("sendMessage")
public void sendMessage() {
    String msg = "msg" + atomicLong.incrementAndGet();
    log.info("send message {}", msg);
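    // Send the message to the exchange with the routing key the queue is bound with.
    // The two-argument convertAndSend(routingKey, message) overload would treat
    // Consts.EXCHANGE as a routing key on the default exchange.
    rabbitTemplate.convertAndSend(Consts.EXCHANGE, Consts.ROUTING_KEY, msg);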
}

After receiving the message, the handler throws a NullPointerException directly to simulate a processing failure:

@RabbitListener(queues = Consts.QUEUE)
public void handler(String data) {
    log.info("got message {}", data);
    throw new NullPointerException("error");
}

Call the sendMessage interface to send two messages, and then go to the RabbitMQ management console. You can see that these two messages are always in the queue and are continuously redelivered, resulting in the redelivery QPS reaching 1063.

img

At the same time, a large number of exception messages can be seen in the log:

[20:02:31.533] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [WARN ] [o.s.a.r.l.ConditionalRejectingErrorHandler:129 ] - Execution of Rabbit message listener failed.
org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Listener method 'public void org.geekbang.time.commonmistakes.asyncprocess.deadletter.MQListener.handler(java.lang.String)' threw exception
  at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:219)
  at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandlerAndProcessResult(MessagingMessageListenerAdapter.java:143)
  at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.onMessage(MessagingMessageListenerAdapter.java:132)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1569)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1488)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1476)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1467)
  at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1411)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:958)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:908)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1279)
  at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1185)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.NullPointerException: error
  at org.geekbang.time.commonmistakes.asyncprocess.deadletter.MQListener.handler(MQListener.java:14)
  at sun.reflect.GeneratedMethodAccessor46.invoke(Unknown Source)
  at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
  at java.lang.reflect.Method.invoke(Method.java:498)
  at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.doInvoke(InvocableHandlerMethod.java:171)
  at org.springframework.messaging.handler.invocation.InvocableHandlerMethod.invoke(InvocableHandlerMethod.java:120)
  at org.springframework.amqp.rabbit.listener.adapter.HandlerAdapter.invoke(HandlerAdapter.java:50)
  at org.springframework.amqp.rabbit.listener.adapter.MessagingMessageListenerAdapter.invokeHandler(MessagingMessageListenerAdapter.java:211)
  ... 13 common frames omitted

The simplest way to solve the problem of dead letters repeatedly entering the queue is to throw an AmqpRejectAndDontRequeueException exception directly when the program encounters an error, to prevent the message from re-entering the queue:

throw new AmqpRejectAndDontRequeueException("error");

However, the logic we prefer is to retry the same message a few times, in order to ride out occasional processing failures caused by network issues and the like. If the message still fails after those retries, it is delivered to a dedicated dead letter queue. For messages in the dead letter queue, we may only log them and send an alert; even if that handling throws an exception, we do not attempt redelivery. The entire logic is shown in the following diagram:

img

To address this issue, Spring AMQP provides a very convenient solution:

First, define a dead letter exchange and dead letter queue. Actually, they are just normal exchanges and queues, but we use them specifically for handling dead letter messages.

Then, build a RetryOperationsInterceptor using RetryInterceptorBuilder to handle retries on failure. Here the strategy is a maximum of 5 attempts (the initial attempt plus 4 retries) with exponential backoff: the first retry is delayed by 1 second, the second by 2 seconds, and each subsequent delay doubles, up to a maximum of 10 seconds. If the 4th retry still fails, RepublishMessageRecoverer republishes the message to the dead letter exchange.

Finally, define a handler for the dead letter queue. In this example, we simply log the message.

The corresponding implementation code is as follows:

// Define the dead letter exchange and queue, and bind them

@Bean
public Declarables declarablesForDead() {
    Queue queue = new Queue(Consts.DEAD_QUEUE);
    DirectExchange directExchange = new DirectExchange(Consts.DEAD_EXCHANGE);
    return new Declarables(queue, directExchange,
            BindingBuilder.bind(queue).to(directExchange).with(Consts.DEAD_ROUTING_KEY));
}

// Define the retry operations interceptor

@Bean
public RetryOperationsInterceptor interceptor() {
    return RetryInterceptorBuilder.stateless()
            .maxAttempts(5) // try (not retry) a maximum of 5 times
            .backOffOptions(1000, 2.0, 10000) // exponential backoff retry
            .recoverer(new RepublishMessageRecoverer(rabbitTemplate, Consts.DEAD_EXCHANGE, Consts.DEAD_ROUTING_KEY)) // republish messages that reach the retry limit
            .build();
}

// Enable the retry interceptor by setting the adviceChain property of SimpleRabbitListenerContainerFactory to the previously defined RetryOperationsInterceptor

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAdviceChain(interceptor());
    return factory;
}

// Dead letter queue handler

@RabbitListener(queues = Consts.DEAD_QUEUE)
public void deadHandler(String data) {
    log.error("got dead message {}", data);
}

When running the program and sending two messages, the output log is as follows:

[11:22:02.193] [http-nio-45688-exec-1] [INFO ] [o.g.t.c.a.d.DeadLetterController:24  ] - send message msg1
[11:22:02.219] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.deadletter.MQListener:13  ] - got message msg1
[11:22:02.614] [http-nio-45688-exec-2] [INFO ] [o.g.t.c.a.d.DeadLetterController:24  ] - send message msg2
[11:22:03.220] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.deadletter.MQListener:13  ] - got message msg1
[11:22:05.221] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.deadletter.MQListener:13  ] - got message msg1
[11:22:09.223] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.deadletter.MQListener:13  ] - got message msg1
[11:22:17.224] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.deadletter.MQListener:13  ] - got message msg1
[11:22:17.226] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [WARN ] [o.s.a.r.retry.RepublishMessageRecoverer:172 ] - Republishing failed message to exchange 'deadtest' with routing key deadtest
[11:22:17.227] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.deadletter.MQListener:13  ] - got message msg2
[11:22:17.229] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] [ERROR] [o.g.t.c.a.deadletter.MQListener:20  ] - got dead message msg1
[11:22:18.232] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.deadletter.MQListener:13  ] - got message msg2
[11:22:20.237] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.deadletter.MQListener:13  ] - got message msg2
[11:22:24.241] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.deadletter.MQListener:13  ] - got message msg2
[11:22:32.245] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [INFO ] [o.g.t.c.a.deadletter.MQListener:13  ] - got message msg2
[11:22:32.246] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#0-1] [WARN ] [o.s.a.r.retry.RepublishMessageRecoverer:172 ] - Republishing failed message to exchange 'deadtest' with routing key deadtest
[11:22:32.250] [org.springframework.amqp.rabbit.RabbitListenerEndpointContainer#1-1] [ERROR] [o.g.t.c.a.deadletter.MQListener:20  ] - got dead message msg2

From the log, we can see:

  • The 4 retry intervals for msg1 are 1 second, 2 seconds, 4 seconds, and 8 seconds. Together with the initial attempt, that makes 5 attempts in total, matching maxAttempts(5).

  • After the 4 retries, RepublishMessageRecoverer republishes the message to the dead letter exchange.

  • The dead letter handler outputs the “got dead message” log.
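The behaviour seen in the log can be reproduced with a small JDK-only model. This is a minimal sketch of "try at most N times with exponential backoff, then hand the message to a recoverer"; it is not Spring Retry's implementation. The class name RetryThenDeadLetterSketch and its method names are hypothetical, and the delays are recorded instead of slept so that the example runs instantly:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical JDK-only model of "try N times with backoff, then recover". */
final class RetryThenDeadLetterSketch {

    /** Outcome of handling one message. */
    static final class Outcome {
        final int attempts;          // how many times the handler was invoked
        final List<Long> delaysMs;   // backoff waits between consecutive attempts
        final boolean deadLettered;  // true if the recoverer was called

        Outcome(int attempts, List<Long> delaysMs, boolean deadLettered) {
            this.attempts = attempts;
            this.delaysMs = delaysMs;
            this.deadLettered = deadLettered;
        }
    }

    static Outcome handle(String message, Consumer<String> handler, Consumer<String> recoverer,
                          int maxAttempts, long initialMs, double multiplier, long maxMs) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        List<Long> delays = new ArrayList<>();
        long nextDelay = initialMs;
        for (int attempt = 1; ; attempt++) {
            try {
                handler.accept(message);
                return new Outcome(attempt, delays, false);
            } catch (RuntimeException e) {
                if (attempt == maxAttempts) {
                    // Attempts exhausted: hand over to the recoverer (the dead letter path).
                    recoverer.accept(message);
                    return new Outcome(attempt, delays, true);
                }
                delays.add(nextDelay); // a real retry would sleep for this long here
                nextDelay = Math.min((long) (nextDelay * multiplier), maxMs);
            }
        }
    }

    public static void main(String[] args) {
        List<String> deadLetters = new ArrayList<>();
        Outcome outcome = handle("msg1",
                m -> { throw new NullPointerException("error"); }, // handler that always fails
                deadLetters::add,
                5, 1000, 2.0, 10000);
        // prints: 5 attempts, delays [1000, 2000, 4000, 8000], dead letters [msg1]
        System.out.println(outcome.attempts + " attempts, delays " + outcome.delaysMs
                + ", dead letters " + deadLetters);
    }
}
```

The four delays add up to 15 seconds, which matches the gap in the log between the first "got message msg1" (11:22:02) and the republish to the dead letter exchange (11:22:17). The 10-second cap would only take effect from a fifth retry onward.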

One thing to note is that although we sent the two messages almost simultaneously, msg2 was processed only after all four retries of msg1 had completed. This is because stateless retries run (and wait out their backoff) on the consumer thread, and by default SimpleMessageListenerContainer has only one consumer thread. You can increase the number of consumer threads to avoid this bottleneck, for example by setting the concurrentConsumers parameter to 10:

@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setAdviceChain(interceptor());
    factory.setConcurrentConsumers(10);
    return factory;
}

Alternatively, you can set the maxConcurrentConsumers parameter to let SimpleMessageListenerContainer dynamically adjust the number of consumer threads. However, you need to pay special attention to its strategy for dynamically starting new threads. You can refer to the official documentation to learn more about this strategy.
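To see why a single consumer thread serializes the two messages, here is a JDK-only sketch that stands in for the listener container with a fixed thread pool. It only models the threading effect; the class name SingleConsumerSketch, the "#n" attempt labels, and the short sleep that stands in for the backoff are all assumptions made for illustration:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Hypothetical model: each pool thread plays the role of one consumer thread. */
final class SingleConsumerSketch {

    /** Every message fails `attempts` times; retries run on the thread that took the message. */
    static List<String> deliver(List<String> messages, int consumers, int attempts, long backoffMs) {
        List<String> log = new CopyOnWriteArrayList<>();
        ExecutorService pool = Executors.newFixedThreadPool(consumers);
        for (String message : messages) {
            pool.execute(() -> {
                for (int i = 1; i <= attempts; i++) {
                    log.add(message + "#" + i); // one failed attempt
                    try {
                        Thread.sleep(backoffMs); // the backoff blocks this consumer thread
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                        return;
                    }
                }
            });
        }
        pool.shutdown();
        try {
            if (!pool.awaitTermination(10, TimeUnit.SECONDS)) {
                throw new IllegalStateException("consumers did not finish in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return log;
    }

    public static void main(String[] args) {
        // One consumer: every attempt for msg1 is logged before msg2 is touched.
        System.out.println(deliver(List.of("msg1", "msg2"), 1, 3, 20));
        // Two consumers: msg2 no longer has to wait for msg1; the exact interleaving may vary.
        System.out.println(deliver(List.of("msg1", "msg2"), 2, 3, 20));
    }
}
```

With one consumer the order is always msg1#1, msg1#2, msg1#3, msg2#1, msg2#2, msg2#3, which mirrors the log above. With more consumers, a failing message only delays the messages that happen to queue behind it on the same thread.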

Key Points Review

When using the asynchronous processing architecture, we generally use MQ middleware to implement asynchronous workflows. There are four key considerations:

First, we need to consider the possibility of message loss or interruptions in the asynchronous process. The asynchronous process needs a fallback path for compensation. For example, the full compensation method we introduced today allows the business to continue even if the asynchronous process fails completely.

Second, when dealing with asynchronous processing, we need to consider the possibility of message duplication. The processing logic should implement idempotence to prevent duplicate processing.
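As an illustration of the idempotence point, here is a minimal in-memory sketch of a consumer that ignores duplicate deliveries. The class name IdempotentConsumerSketch, the message ID format, and the coupon counter are hypothetical; a real service would typically keep the "already processed" marker in a shared store such as a database table with a unique key, and update it in the same transaction as the business change:

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical consumer that runs its side effect at most once per message ID. */
final class IdempotentConsumerSketch {

    // IDs of messages that have already been handled (in-memory only in this sketch).
    private final Set<String> processedIds = ConcurrentHashMap.newKeySet();
    // Stand-in for the business side effect, e.g. sending a welcome coupon.
    private final AtomicInteger couponsSent = new AtomicInteger();

    /** Returns true if the side effect ran, false if the message was a duplicate. */
    boolean onMessage(String messageId) {
        // Set.add is atomic here: only the first delivery of an ID gets true.
        if (!processedIds.add(messageId)) {
            return false;
        }
        couponsSent.incrementAndGet();
        return true;
    }

    int couponsSent() {
        return couponsSent.get();
    }

    public static void main(String[] args) {
        IdempotentConsumerSketch consumer = new IdempotentConsumerSketch();
        consumer.onMessage("user-1");
        consumer.onMessage("user-1"); // duplicate delivery, e.g. from a compensation job
        consumer.onMessage("user-2");
        System.out.println(consumer.couponsSent()); // prints 2
    }
}
```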

Third, in a microservices scenario where several services each run multiple instances that listen for messages, different services generally each need to receive every message, while the instances of one service should share the messages among themselves in a round-robin manner. We need to confirm that the MQ's message routing configuration meets these requirements, to avoid issues such as duplicated or missing messages.
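The intended routing can be pictured with a small in-memory model: one queue per service, every message copied to each queue (as a fanout exchange would do), and the consumers attached to a queue taking messages in turn. This is only a sketch of the target behaviour, not RabbitMQ code; the class name RoutingSketch and the queue names "member" and "promotion" are hypothetical:

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical model of fanout between services and round-robin within a service. */
final class RoutingSketch {

    // Queue name -> inboxes of the instances consuming from that queue.
    private final Map<String, List<List<String>>> queues = new LinkedHashMap<>();
    // Queue name -> index of the instance that receives the next message.
    private final Map<String, Integer> nextConsumer = new LinkedHashMap<>();

    /** Declares one queue for a service and attaches `instances` consumers to it. */
    List<List<String>> bindService(String queueName, int instances) {
        List<List<String>> inboxes = new ArrayList<>();
        for (int i = 0; i < instances; i++) {
            inboxes.add(new ArrayList<>());
        }
        queues.put(queueName, inboxes);
        nextConsumer.put(queueName, 0);
        return inboxes;
    }

    /** Copies the message to every queue; within a queue, consumers take turns. */
    void publish(String message) {
        for (Map.Entry<String, List<List<String>>> entry : queues.entrySet()) {
            List<List<String>> inboxes = entry.getValue();
            int index = nextConsumer.get(entry.getKey());
            inboxes.get(index).add(message);
            nextConsumer.put(entry.getKey(), (index + 1) % inboxes.size());
        }
    }

    public static void main(String[] args) {
        RoutingSketch exchange = new RoutingSketch();
        List<List<String>> member = exchange.bindService("member", 2);
        List<List<String>> promotion = exchange.bindService("promotion", 2);
        for (String user : List.of("u1", "u2", "u3", "u4")) {
            exchange.publish(user);
        }
        System.out.println(member);    // prints [[u1, u3], [u2, u4]]
        System.out.println(promotion); // prints [[u1, u3], [u2, u4]]
    }
}
```

If both services shared a single queue instead, each message would reach only one of them; if every instance declared its own queue, each instance would receive every message and process it again. Those are the two misconfigurations this point warns about.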

Fourth, we need to pay attention to dead letter messages that can never be processed, as they may cause MQ blocking issues. Generally, when a message processing failure occurs, we can set a certain retry strategy. If retries still don’t work, we can throw the message into a dedicated dead letter queue for special handling, so as not to let dead letters affect the processing of normal messages.

I have put the code used today on GitHub; you can click this link to view it.

Reflection and Discussion

In the scenario where a message is sent to the MQ after user registration, and the member service listens to the message for asynchronous processing, sometimes we may find that even though the user service saves the data before sending the MQ, the member service queries the database after receiving the message and finds that there is no information about the new user yet. What do you think could be the problem, and how would you solve it?

Apart from implementing dead letter redelivery with Spring AMQP, the dead letter exchange (DLX) feature that RabbitMQ has supported since version 2.8.0 can achieve similar functionality. Can you try to implement it using DLX and compare the two mechanisms?

Have you encountered any other issues regarding the use of MQ for asynchronous processing? I’m Zhu Ye. Feel free to leave a comment in the comment section to share your thoughts with me. You are also welcome to share today’s content with your friends or colleagues for further discussion.