23 Kafka Replication Mechanism Explained

Hello, I’m Hu Xi. Today I will share with you the topic of Apache Kafka’s replication mechanism.

Replication, also called backup, usually means that a distributed system stores identical copies of data on multiple interconnected machines. What benefits does a replication mechanism bring?

  1. Provides data redundancy. Even if some components of the system fail, the system can still continue to operate, thereby increasing overall availability and data durability.
  2. Provides high scalability. Supports horizontal scaling: adding machines increases read performance and therefore the overall throughput of read operations.
  3. Improves data locality. Allows data to be placed in locations close to the user’s geographic location, thereby reducing system latency.

Distributed systems textbooks often cite these advantages. Unfortunately, Apache Kafka currently enjoys only the first one: data redundancy, which yields high availability and durability. Later in this article I will explain in detail why Kafka cannot provide the second and third benefits.

However, even so, the replication mechanism is still at the core of Kafka’s design architecture, and it is an important cornerstone for Kafka to ensure system high availability and message durability.

Definition of Replicas #

Before discussing specific replica mechanisms, let’s take a moment to clarify the meaning of a replica.

As we mentioned before, Kafka has the concept of topics, and each topic is further divided into several partitions. The concept of replicas is actually defined at the partition level, with each partition being configured with several replicas.

A replica is essentially an append-only commit log. By the definition of Kafka’s replica mechanism, all replicas of the same partition hold the same sequence of messages. These replicas are spread across different brokers, so the data remains available even when some brokers fail.

In a real production environment, each broker may store replicas of different partitions in different topics. Therefore, it is normal for a single broker to have hundreds or even thousands of replicas.

Next, let’s take a look at a diagram showing the replica distribution on a Kafka cluster with 3 brokers. From this diagram, we can see that the 3 replicas of partition 0 in topic 1 are distributed across the 3 brokers, and the replicas of other partitions in different topics are also scattered across different brokers, thereby achieving data redundancy.

Figure: Replica distribution in a 3-broker Kafka cluster
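To make the layout described above concrete, here is a minimal sketch of spreading each partition's replicas across distinct brokers. It assumes a plain round-robin placement; the function name `assign_replicas` is hypothetical, and Kafka's real assignment logic is more involved (for example, it also takes racks into account), so treat this only as an illustration of the idea.

```python
def assign_replicas(num_brokers, num_partitions, replication_factor):
    """Place each partition's replicas on distinct brokers, round-robin.

    Simplified illustration only; not Kafka's actual assignment algorithm.
    """
    if replication_factor > num_brokers:
        raise ValueError("replication factor cannot exceed the number of brokers")
    assignment = {}
    for partition in range(num_partitions):
        # Start at a different broker for each partition so that replicas
        # (and the first replica in each list) are spread over the cluster.
        assignment[partition] = [
            (partition + i) % num_brokers for i in range(replication_factor)
        ]
    return assignment


layout = assign_replicas(num_brokers=3, num_partitions=3, replication_factor=3)
for partition, brokers in layout.items():
    print(f"partition {partition}: replicas on brokers {brokers}")
```

With 3 brokers and a replication factor of 3, every partition ends up with one replica on each broker, which matches the situation of partition 0 of topic 1 in the diagram.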

Replica Roles #

Since multiple replicas can be configured within a partition, and these replicas need to have consistent data, a natural question arises: how do we ensure that all data in the replicas is consistent? Specifically for Kafka, how are messages synchronized to all the corresponding replicas after the producer sends a message to a topic? The most common solution to this problem is to use a leader-based replica mechanism. Apache Kafka is designed this way.

The working principle of the leader-based replica mechanism is illustrated in the following diagram. Let me explain the contents of the diagram.

  1. In Kafka, replicas are divided into two types: leader replicas and follower replicas. When a partition is created, one replica is elected as the leader replica, and the remaining replicas become follower replicas automatically.

  2. Kafka’s replica mechanism is stricter than that of many other distributed systems. In Kafka, follower replicas do not provide services externally: no follower replica responds to read or write requests from consumers and producers. All read and write requests must be sent to the broker that hosts the leader replica. A follower’s only task is to asynchronously pull messages from the leader replica and write them into its own commit log, thereby staying in sync with the leader.

  3. When the leader replica fails or the broker hosting it crashes, Kafka detects this promptly through the monitoring capabilities provided by ZooKeeper and immediately starts a new round of leader election to select a new leader from the follower replicas. When the old leader replica restarts, it can only rejoin the cluster as a follower replica.
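The three points above can be sketched as a toy in-memory model. The `Replica` and `Partition` classes and all of their methods are hypothetical names invented for this illustration. As a simplifying assumption, the failover step just picks the first surviving replica; in real Kafka, the election is coordinated by the cluster and normally restricted to in-sync replicas.

```python
class Replica:
    def __init__(self, broker_id):
        self.broker_id = broker_id
        self.log = []  # append-only commit log


class Partition:
    def __init__(self, broker_ids):
        self.replicas = {b: Replica(b) for b in broker_ids}
        self.leader_id = broker_ids[0]  # assumption: first replica starts as leader

    def produce(self, message):
        # All writes go to the leader replica.
        self.replicas[self.leader_id].log.append(message)

    def consume(self, offset):
        # All reads are served by the leader replica as well.
        log = self.replicas[self.leader_id].log
        return log[offset] if offset < len(log) else None

    def follower_fetch(self, broker_id, max_messages):
        # A follower pulls the messages it is missing from the leader.
        if broker_id == self.leader_id:
            raise ValueError("the leader does not fetch from itself")
        follower = self.replicas[broker_id]
        leader_log = self.replicas[self.leader_id].log
        start = len(follower.log)
        follower.log.extend(leader_log[start:start + max_messages])

    def fail_over(self):
        # Simplified election: the first replica other than the old leader wins.
        old_leader = self.leader_id
        self.leader_id = next(b for b in self.replicas if b != old_leader)
        return old_leader


p = Partition([0, 1, 2])
p.produce("m0")
p.produce("m1")
p.follower_fetch(1, max_messages=10)  # follower 1 catches up fully
p.follower_fetch(2, max_messages=1)   # follower 2 lags by one message
old = p.fail_over()                   # broker 0 fails; broker 1 takes over
print("new leader:", p.leader_id, "reads offset 1 ->", p.consume(1))
```

After the failover, broker 0 is no longer the leader; if it comes back, it can only call `follower_fetch` against the new leader, which mirrors the rule that a restarted old leader rejoins as a follower.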

Pay special attention to the second point mentioned above, which states that follower replicas do not provide services externally. Remember when we talked about the benefits of the replica mechanism earlier, we mentioned that Kafka cannot provide read operation scaling or improve data locality. The specific reason lies in this point.

For clients, Kafka’s follower replicas serve no direct purpose. Unlike replicas in MySQL, they cannot take read traffic off the leader, nor can they be placed closer to clients to improve data locality.

Why did Kafka design it this way? In fact, this replica mechanism has two advantages.

  1. Convenient implementation of “Read-your-writes”.

Read-your-writes means that after you successfully write a message to Kafka with the producer API, you can immediately read that same message with the consumer API.

For example, when you post a Weibo, you surely want to see it immediately. This is a typical Read-your-writes scenario. If follower replicas are allowed to provide services externally, since replica synchronization is asynchronous, it is possible that a follower replica has not yet pulled the latest message from the leader replica, causing the client to not see the latest written message.

  2. Convenient implementation of monotonic reads.

What are monotonic reads? They mean that a consumer reading messages multiple times never sees a message appear in one read and disappear in a later one.

If follower replicas are allowed to provide read services, consider a situation where there are two follower replicas, F1 and F2, which asynchronously pull data from the leader replica. If F1 has pulled the latest message from the leader, but F2 has not pulled it in time, then if a consumer reads messages from F1 first and then reads messages from F2, it may observe this phenomenon: the latest message seen during the first consumption is no longer there during the second consumption. This is not monotonic read consistency. However, if all read requests are processed by the leader, Kafka can easily achieve monotonic read consistency.
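Both anomalies can be reproduced with a few lines of Python. This is only a sketch under invented assumptions: the three lists stand in for the logs of the leader and of followers F1 and F2 at one moment in time, and `read` is a hypothetical helper that returns whatever a consumer would see on a given replica.

```python
# Hypothetical snapshot: the leader holds three messages,
# F1 has replicated all of them, F2 is still one message behind.
leader = ["m0", "m1", "m2"]
f1 = list(leader)
f2 = leader[:2]


def read(replica_log):
    """Return every message a consumer would see on this replica."""
    return list(replica_log)


# Read-your-writes: the producer has just written "m2" successfully.
sees_own_write_on_leader = "m2" in read(leader)
sees_own_write_on_f2 = "m2" in read(f2)  # F2 has not pulled "m2" yet

# Monotonic reads: the same consumer reads from F1 first, then from F2.
first_read = read(f1)
second_read = read(f2)
vanished = [m for m in first_read if m not in second_read]

print("visible on leader:", sees_own_write_on_leader)
print("visible on lagging follower:", sees_own_write_on_f2)
print("messages that vanished between reads:", vanished)
```

When every read goes to the leader instead, two consecutive reads always come from the same log, so neither anomaly can occur in this model.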

In-sync Replicas (ISR) #

We have just mentioned that follower replicas do not provide service, they only periodically pull data asynchronously from leader replicas. Asynchronous replication introduces the risk of not being in sync with the leader in real time. Before discussing how to handle this risk correctly, we need to understand the meaning of synchronization. In other words, Kafka needs to explicitly tell us under what conditions a follower replica can be considered in sync with the leader.

Based on this idea, Kafka introduces the concept of In-sync Replicas, also known as ISR replicas. The replicas in the ISR are replicas that are in sync with the leader. On the other hand, follower replicas that are not in the ISR are considered out of sync. So, what replicas can enter the ISR?

First and foremost, the leader replica naturally belongs to the ISR. In other words, the ISR is not only a collection of follower replicas, it always includes the leader replica. In some cases, the ISR may only have the leader replica.

In addition, follower replicas that can enter the ISR must meet certain requirements. As for what these requirements are, let’s take a look at the following figure first.

The figure shows 3 replicas: 1 leader replica and 2 follower replicas. The leader replica has written 10 messages, follower replica 1 has replicated 6 messages, and follower replica 2 has only replicated 3 messages. Now, think about it, which follower replica do you think is out of sync with the leader?

The answer is, it depends on the specific situation. Follower replica 2 may appear to be more out of sync with the leader because it has a significantly lower message count. Indeed, that could be the case, but it’s only a possibility.

In fact, both follower replicas in this figure could be out of sync with the leader, but they could also be in sync. In other words, Kafka’s criterion for determining whether a follower is in sync with the leader is not based on the difference in message count, but rather there is another “secret”.

This criterion is the broker-side parameter replica.lag.time.max.ms, which specifies the maximum time a follower replica may lag behind the leader replica. The default value is 10 seconds in the Kafka versions this article is based on (releases from 2.5 onward raised the default to 30 seconds). As long as a follower replica does not continuously lag behind the leader for longer than replica.lag.time.max.ms, Kafka considers it in sync with the leader, even if it holds noticeably fewer messages than the leader.

As mentioned earlier, the sole purpose of a follower replica is to continuously pull messages from the leader replica and write them into its own log. If the rate of this replication process is consistently slower than the rate at which the leader replica writes messages, the follower replica will be considered out of sync with the leader after replica.lag.time.max.ms. At this point, Kafka automatically shrinks the ISR, removing the replica from it.
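The time-based criterion can be sketched as follows. This is a simplified model, not Kafka's implementation: the `Follower` class, the `last_caught_up_ms` field, and the `in_sync` function are hypothetical names, and the timestamps are invented to match the figure's message counts (6 and 3 messages against the leader's 10).

```python
from dataclasses import dataclass

REPLICA_LAG_TIME_MAX_MS = 10_000  # the 10-second value used in the text


@dataclass
class Follower:
    name: str
    message_count: int      # how many messages the follower has replicated
    last_caught_up_ms: int  # hypothetical: last time it was fully caught up


def in_sync(follower, now_ms, max_lag_ms=REPLICA_LAG_TIME_MAX_MS):
    """A follower is in sync if it caught up within the last max_lag_ms."""
    return now_ms - follower.last_caught_up_ms <= max_lag_ms


now_ms = 100_000
followers = [
    Follower("follower-1", message_count=6, last_caught_up_ms=now_ms - 15_000),
    Follower("follower-2", message_count=3, last_caught_up_ms=now_ms - 2_000),
]

# The leader always belongs to the ISR; followers are checked by time only.
isr = ["leader"] + [f.name for f in followers if in_sync(f, now_ms)]
print(isr)
```

With these invented timestamps, follower 2 stays in the ISR despite holding fewer messages, while follower 1 is dropped because it has been behind for longer than the limit. Note that `message_count` plays no part in the decision.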

Note that if the follower replica later gradually catches up with the leader, it can be added back to the ISR. The ISR is therefore a dynamically adjusted set rather than a static one. Because it is adjusted dynamically, the ISR can become empty. Since the leader replica always belongs to the ISR, an empty ISR means the leader replica has also “crashed” and Kafka needs to elect a new leader. But with an empty ISR, how should the new leader be elected?

Kafka refers to all live replicas that are not in the ISR as unclean replicas. Unclean replicas typically lag behind the leader, possibly by a wide margin, so choosing one of them as the new leader may lose data. In Kafka, electing such a replica is called unclean leader election. The broker-side parameter unclean.leader.election.enable controls whether unclean leader election is allowed.

Enabling unclean leader election can cause data loss, but it keeps a leader replica available for the partition so that the partition does not stop serving clients, which improves availability. Conversely, disabling unclean leader election preserves data consistency and avoids message loss, at the cost of availability.
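The trade-off can be sketched as a small election function. It is a simplified model under stated assumptions: `elect_leader` and its parameters are hypothetical names, and the `unclean_enabled` flag stands in for the broker-side parameter unclean.leader.election.enable. Kafka's real election logic has more cases than this.

```python
def elect_leader(assigned_replicas, isr, live_brokers, unclean_enabled):
    """Return (new_leader, was_unclean); new_leader is None if nobody is eligible."""
    # Clean election: prefer a live replica that is still in the ISR.
    for replica in assigned_replicas:
        if replica in isr and replica in live_brokers:
            return replica, False
    # Unclean election: fall back to any live replica, accepting possible data loss.
    if unclean_enabled:
        for replica in assigned_replicas:
            if replica in live_brokers:
                return replica, True
    # No eligible replica: the partition stays offline until one returns.
    return None, False


# Broker 0 was the only ISR member and has crashed; brokers 1 and 2 are alive.
print(elect_leader([0, 1, 2], isr={0}, live_brokers={1, 2}, unclean_enabled=False))
print(elect_leader([0, 1, 2], isr={0}, live_brokers={1, 2}, unclean_enabled=True))
```

With the flag off, the partition has no leader and is unavailable, but no message is lost. With the flag on, broker 1 takes over immediately and any messages it had not yet replicated are gone.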

If you have heard of the CAP theorem, you must know that a distributed system can usually only satisfy two out of consistency, availability, and partition tolerance at the same time. Obviously, in this issue, Kafka gives you the right to choose between C and A.

You can decide whether to enable unclean leader election based on your actual business scenario. However, I strongly advise you not to enable it, considering that we can still improve high availability through other means. Sacrificing data consistency for this slight improvement in high availability is not worth it.

Summary #

Today, I mainly shared with you the replica mechanism of Apache Kafka and the principles behind its implementation. To be honest, I think some parts may have been simplified, and if you want to fully understand replication, you should study the corresponding source code of Kafka. But don’t worry, in the later sections of this column, I will specifically analyze the replica mechanism from the perspective of source code, especially the entire process of how Follower replicas pull messages from Leader replicas. In terms of technical depth, that section should be the most insightful analysis in this column, so be sure not to miss it.

Open Discussion #

So far, I have repeatedly emphasized that Follower replicas do not provide services externally. Interestingly, the community is currently considering whether to break this restriction and allow Follower replicas to handle client requests. The main consideration of the community is that this can be used to improve the locality of cloud data and better serve customers in geographically close locations. If Follower replicas are allowed to provide external read services, how do you think we should avoid or mitigate the data inconsistency caused by the differences between Follower and Leader replicas?

Feel free to share your thoughts and answers for discussion. If you find it helpful, please feel free to share this article with your friends.