27 Discussion on High Watermarks and Leader Epochs #

Hello, I’m Hu Xi. Today, I want to share with you the topic of High Watermark and Leader Epoch in Kafka.

You may have heard of High Watermark, but perhaps not Leader Epoch. The former is a very important concept in Kafka, while the latter was introduced by the community in version 0.11 to address some shortcomings of the High Watermark mechanism. Given the significance of the High Watermark mechanism in Kafka and its popularity among interviewers, today we will focus on discussing High Watermark. Of course, we will also spend some time discussing Leader Epoch and its role.

What is High Watermark? #

First of all, let’s clarify the basic definition: what is a watermark? The term is commonly used in the field of stream processing, for example in frameworks like Spark Streaming and Flink. Textbooks often give the classic definition of a watermark as follows:

At a moment T, if every event whose creation time (event time) T’ satisfies T’ ≤ T has already arrived or been observed, then T is called the watermark.

The book “Streaming Systems” describes a watermark as follows:

A watermark is a monotonically increasing timestamp that represents the time of the oldest work not yet completed.

To help you better understand the watermark, let me use a diagram from this book to illustrate.

The blue section labeled “Completed” in the diagram represents the completed work, while the red section labeled “In-Flight” represents the work that is currently in progress. The boundary between the two is the watermark.

In the world of Kafka, the concept of a watermark is slightly different. The Kafka watermark is not a timestamp and is unrelated to time. It is associated with position information and specifically represented by message offsets. Additionally, in the Kafka source code, the term used is “high watermark”. Therefore, today I will also use the term “high watermark” or its abbreviation HW for discussion. It’s worth noting that Kafka also has a low watermark, which is a concept related to message deletion in Kafka. However, it is not directly related to the topic we are discussing today, so I won’t go into detail about it.

The Role of High Watermark #

In Kafka, the high watermark serves two main purposes.

  1. Defining message visibility, that is, identifying which messages in the partition can be consumed by consumers.
  2. Facilitating replica synchronization in Kafka.

The following image shows several Kafka terms related to the high watermark. I will explain the content of the image in detail and clarify some common misconceptions.

Let’s assume this is the high watermark diagram of a partition’s leader replica. First, please pay attention to the “Committed Messages” and “Uncommitted Messages” in the image. We previously discussed the difference between the two when talking about Kafka’s durability guarantee in Lesson 11. Now, with the help of the high watermark, let me emphasize it again: messages below the partition’s high watermark are considered committed messages, while those above it are uncommitted messages. Consumers can only consume committed messages, which in the image are all messages with offsets smaller than 8. Please note that we are not discussing Kafka transactions here: the transaction mechanism changes the range of messages a consumer can see, which is then bounded not simply by the high watermark but by the Last Stable Offset (LSO) that defines a transactional consumer’s visibility.

Furthermore, it is worth noting that messages with offsets equal to the high watermark are also considered uncommitted messages. In other words, consumers cannot consume messages at the high watermark.

The image also introduces the concept of Log End Offset (LEO), which represents the offset value for the next message to be written to the replica. Please note that the box where the number 15 is located is dotted, indicating that this replica currently has only 15 messages with offsets ranging from 0 to 14, and the next message’s offset is 15. Obviously, messages between the high watermark and LEO are considered uncommitted messages. This also indirectly tells us an important fact: the high watermark of a replica object will never exceed its LEO value.

The high watermark and LEO are two important attributes of replica objects. Kafka replicas all have corresponding high watermark and LEO values, not just the leader replica. However, the leader replica is unique, and Kafka uses the high watermark of the leader replica to define the high watermark of the partition it belongs to. In other words, the high watermark of a partition is the same as the high watermark of its leader replica.

High Watermark Updating Mechanism #

Now, we know that every replica object keeps its own high watermark and LEO values. Beyond that, the broker hosting the leader replica also stores a copy of the LEO of every follower replica. Let’s take a look at the following diagram together.

Diagram

In this diagram, we can see that Broker 0 stores the LEO values of the leader replica and of all the partition’s follower replicas, while Broker 1 stores only one follower replica of that partition. Kafka refers to the follower replicas whose LEOs are tracked on Broker 0 as remote replicas. While the replication mechanism runs, Kafka updates the high watermark and LEO of the follower replica on Broker 1, as well as the high watermark and LEO of the leader replica on Broker 0 and the LEOs of all the remote replicas. However, it never updates the high watermark of a remote replica, which is why I have marked that value gray in the diagram.

Why are these remote replicas stored on Broker 0? In fact, their main purpose is to help the Leader replica determine its high watermark, which is the partition high watermark.

To help you remember when each of these values is updated, I have made a table. Only after understanding the updating mechanism can we discuss the principles of Kafka’s replication mechanism and how it uses the high watermark to synchronize replica messages.

Table

Here, let me briefly explain what it means to “keep in sync” with the leader replica. A follower counts as in sync if it satisfies either of the following two conditions.

  1. The remote Follower replica is in the ISR.
  2. The remote follower replica’s LEO has lagged behind the leader replica’s LEO for no longer than the broker-side parameter replica.lag.time.max.ms, which defaults to 10 seconds.

At first glance, these two conditions look like the same thing, because whether a replica can enter the ISR is currently judged by exactly the second condition. However, the following situation can occur: a follower replica has “caught up” with the leader but is not yet in the ISR, such as a replica that has just restarted. If Kafka judged only the first condition, replicas that qualify to enter the ISR but have not yet done so would be ignored. The partition high watermark could then exceed the LEO of replicas in the ISR, and a high watermark greater than an LEO is never allowed.

Next, let’s summarize the high watermark and LEO updating mechanisms from the perspectives of the Leader replica and the Follower replica.

Leader Replica

The logic for processing producer requests is as follows:

  1. Write the message to the local disk.
  2. Update the partition high watermark value:
     i. Get the LEO values (LEO-1, LEO-2, …, LEO-n) of all remote replicas stored on the broker where the leader replica resides.
     ii. Get the current high watermark value of the leader replica: currentHW.
     iii. Update currentHW = max{currentHW, min(LEO-1, LEO-2, …, LEO-n)}.
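The leader-side update in step 2 boils down to one formula. Here is a minimal Python sketch of it (an illustration only, not Kafka’s actual code; the function name is mine):

```python
def update_partition_hw(current_hw, remote_leos):
    """Leader-side partition high watermark update:
    currentHW = max(currentHW, min(LEO-1, ..., LEO-n))."""
    if not remote_leos:
        return current_hw  # no remote replicas to wait for
    return max(current_hw, min(remote_leos))

print(update_partition_hw(3, [5, 4]))  # slowest remote replica caps the HW: 4
```

Note that the slowest in-sync remote replica determines how far the partition high watermark can advance; the `max` with the current value simply guarantees the high watermark never moves backward.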

The logic for processing Follower replica pulling messages is as follows:

  1. Read the message data from disk (or page cache).
  2. Update the LEO value of the remote replica using the offset value in the Follower replica’s request.
  3. Update the partition high watermark value (the steps are the same as processing producer requests).

Follower Replica

The logic for pulling messages from the Leader is as follows:

  1. Write the message to the local disk.
  2. Update the LEO value.
  3. Update the high watermark value:
     i. Get the high watermark value sent by the leader: currentHW.
     ii. Get the LEO value updated in step 2: currentLEO.
     iii. Update the high watermark to min(currentHW, currentLEO).
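The follower-side step 3 reduces to taking the minimum of two values. A minimal sketch (names are mine, not from the Kafka source):

```python
def update_follower_hw(leader_hw, current_leo):
    """Follower-side high watermark update: min(currentHW, currentLEO).
    A follower can never expose more messages than it has written locally."""
    return min(leader_hw, current_leo)

print(update_follower_hw(1, 1))  # caught up with the leader: 1
print(update_follower_hw(5, 3))  # still behind: capped at its own LEO, 3
```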

Analysis of Replica Synchronization Mechanism #

After understanding the update mechanism of these values, let me give you an example to explain the full process of Kafka replica synchronization. This example uses a single partition with two replicas.

When a producer sends a message, how are the high watermarks of the leader and follower replicas updated? I have provided some images to illustrate each step.

First, let’s start with the initial state. The “remote LEO” in the image below represents the LEO (Log End Offset) value of the remote replica we just mentioned. In the initial state, all values are 0.

Initial State

After the producer sends a message to the topic partition, the state changes to:

State after Producer sends a message

At this point, the Leader replica successfully writes the message to local disk, so the LEO value is updated to 1.

The Follower then tries to fetch messages from the Leader. This time, there is a message available for fetching, so the state further changes to:

State after Follower fetches the message

Now, the Follower replica also successfully updates its LEO to 1. At this point, the LEO for both the Leader and Follower replicas is 1, but their respective high watermarks are still 0 and have not been updated. They need to be updated in the next round of fetch, as shown in the following image:

State after next round of fetch

In the new round of fetch requests, since the message with offset 0 has already been successfully fetched, the Follower replica requests the message with offset=1. Upon receiving this request, the Leader replica updates the remote replica’s LEO to 1 and updates its own high watermark to 1. After completing these steps, it sends the updated high watermark of 1 to the Follower replica. When the Follower replica receives this, it also updates its own high watermark to 1. At this point, a complete message synchronization cycle is finished. In fact, Kafka uses this mechanism to achieve synchronization between Leader and Follower replicas.
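The whole sequence above can be replayed with the simplified update rules. The following is a toy model with a single follower (variable names are mine; real Kafka tracks this state inside replica objects):

```python
leader_leo, leader_hw = 0, 0
follower_leo, follower_hw = 0, 0
remote_leo = 0  # follower's LEO as recorded on the leader's broker

# Producer writes one message: only the leader's LEO moves.
leader_leo = 1

# Fetch round 1: follower asks for offset 0 (its current LEO).
remote_leo = follower_leo                    # leader records fetch offset: 0
leader_hw = max(leader_hw, remote_leo)       # min over the one remote LEO: still 0
follower_leo = 1                             # follower appends the fetched message
follower_hw = min(leader_hw, follower_leo)   # still 0

# Fetch round 2: follower asks for offset 1.
remote_leo = follower_leo                    # leader records fetch offset: 1
leader_hw = max(leader_hw, remote_leo)       # advances to 1
follower_hw = min(leader_hw, follower_leo)   # advances to 1

print(leader_hw, follower_hw)  # 1 1
```

The model makes the lag visible: the follower’s high watermark only catches up one full fetch round after the leader’s, which is exactly the mismatch the next section exploits.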

Introduction of Leader Epoch #

The story so far seems perfect, with Kafka defining the visibility of messages to the outside world based on the high watermark and implementing an asynchronous replica synchronization mechanism. However, let’s consider the problems that exist here.

From the analysis we just did, we know that the update of the high watermark in a follower replica requires an additional round of fetch requests to be completed. If we extend the example above to multiple follower replicas, the situation may be even worse, requiring multiple rounds of fetch requests. In other words, the update of the high watermark in the leader replica and the follower replicas is mismatched in time. This mismatch is the root cause of many “data loss” or “data inconsistency” problems. Based on this, the community officially introduced the concept of Leader Epoch in version 0.11 to avoid various inconsistencies caused by the mismatch of high watermark updates.

The so-called Leader Epoch can be roughly understood as the version of the leader. It consists of two pieces of data.

  1. Epoch: A monotonically increasing version number. Whenever there is a change in leadership of a replica, this version number is increased. A leader with a smaller version number is considered an outdated leader and cannot exercise leader powers.
  2. Start Offset: The offset of the first message written by the leader replica in this epoch.

Let me give you an example to explain Leader Epoch. Suppose there are two Leader Epochs, <0, 0> and <1, 120>. The first Leader Epoch represents version 0, and the leader for this version starts saving messages from offset 0 and has saved a total of 120 messages. Then, there is a change in leadership and the version number increases to 1, and the new version starts from offset 120.

Kafka brokers cache the Leader Epoch data for each partition in memory and also periodically persist it to a checkpoint file. When a leader replica writes messages to disk, the broker attempts to update this cache: if the leader is writing its first message in the current epoch, the broker adds a new Leader Epoch entry to the cache; otherwise, it leaves the cache unchanged. This way, whenever there is a leadership change, the new leader replica can query this cache, retrieve the start offset of the corresponding Leader Epoch, and thereby avoid data loss and inconsistencies.
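The cache-update rule just described can be sketched as follows (a hypothetical Python model; the entry layout mirrors the <epoch, start offset> pairs from the earlier example):

```python
def maybe_add_epoch_entry(cache, epoch, next_offset):
    """Append an (epoch, start_offset) entry only on the first write
    of a new leader epoch; later writes in the same epoch are no-ops."""
    if not cache or cache[-1][0] < epoch:
        cache.append((epoch, next_offset))

cache = []
maybe_add_epoch_entry(cache, 0, 0)     # first write under epoch 0
maybe_add_epoch_entry(cache, 0, 57)    # same epoch: cache unchanged
maybe_add_epoch_entry(cache, 1, 120)   # leadership changed: epoch 1 starts at 120
print(cache)  # [(0, 0), (1, 120)]
```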

Next, let’s look at an actual example that demonstrates how Leader Epoch prevents data loss. Please take a look at the following figure.

Let me explain how data loss can occur relying solely on the high watermark. At the beginning, both replica A and replica B are in a normal state, with A being the leader. A producer program with the default acks setting sends two messages to A, and A successfully writes them all. At this point, Kafka informs the producer that both messages have been successfully sent.

Now, let’s assume that both the leader and the follower have written these two messages, and the leader replica’s high watermark has been updated, but the follower replica’s has not. This can happen because of the time mismatch between the follower’s and the leader’s high watermark updates that we discussed earlier. If the broker hosting replica B crashes at this point, then when it restarts, replica B performs a log truncation operation and adjusts its LEO to its previous high watermark, which is 1. This means that the message with offset 1 is deleted from replica B’s disk, and only the message with offset 0 remains in replica B’s underlying disk file.

After completing the truncation operation, replica B starts fetching messages from replica A and performs normal message synchronization. If at this critical moment, the broker where replica A is located crashes, Kafka has no choice but to make replica B the new leader. At this point, when replica A comes back, it needs to perform the same log truncation operation, adjusting its high watermark to the same value as replica B, which is 1. After this operation, the message with an offset of 1 is permanently erased from these two replicas. This is the data loss scenario that this figure demonstrates.

Strictly speaking, this scenario occurs when the broker-side parameter min.insync.replicas is set to 1. In this case, once a message is written to the leader replica’s disk, it is considered “committed”, but the existing time mismatch problem causes the follower’s high watermark update to lag behind. If brokers continue to crash in this short time window, this kind of data loss is inevitable.

Now, let’s see how the Leader Epoch mechanism can be used to avoid this kind of data loss. I will explain with another figure.

The scenario is similar to the previous one, but after applying the Leader Epoch mechanism, when replica B restarts, it needs to send a special request to replica A to get the leader’s LEO value. In this example, the value is 2. When replica B learns that the leader’s LEO is 2, it finds that this LEO value is not smaller than its own LEO value, and there are also no Epoch entries in the cache that have start offsets larger than 2. Therefore, replica B does not need to perform any log truncation operation. This is a significant improvement over the high watermark mechanism, as the decision of whether a replica should perform log truncation no longer depends on the high watermark.

Now, replica A crashes and B becomes the leader. Similarly, when replica A comes back, it performs the same logic check as B and finds that log truncation is not necessary. Therefore, the message with an offset of 1 is preserved in both replicas. When the producer program writes new messages to B, the Kafka broker in which replica B is located generates a new Leader Epoch entry in its cache: [Epoch=1, Offset=2]. Later, replica B will use this entry to determine whether log truncation should be performed. In this way, by using the Leader Epoch mechanism, Kafka is able to perfectly avoid this data loss scenario.
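The restart-time check that replicas A and B perform can be sketched like this (a simplification of the figure’s logic; the function and variable names are mine, and the real protocol resolves the end offset per epoch from the cache):

```python
def truncation_offset(own_leo, leader_end_offset):
    """Decide where a restarting replica should truncate its log.
    leader_end_offset is the leader's end offset for the replica's last
    known epoch; if it is not smaller than the replica's own LEO,
    no truncation is needed."""
    if leader_end_offset >= own_leo:
        return own_leo              # keep everything already written
    return leader_end_offset        # cut back to the leader's end offset

# Replica B in the figure: own LEO is 2, leader reports 2 -> nothing is truncated.
print(truncation_offset(2, 2))  # 2
```

Contrast this with the high-watermark-only scheme, where the restarting replica would have blindly truncated to its own (possibly stale) high watermark of 1.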

Summary #

Today, I’ve provided you with a detailed introduction to Kafka’s high watermark mechanism and Leader Epoch mechanism. The high watermark plays a crucial role in defining the visibility of Kafka messages to the outside world and implementing the replication mechanism. However, its design flaws leave Kafka vulnerable to potential risks of data loss or inconsistency. To address this, the community introduced the Leader Epoch mechanism, attempting to mitigate such risks. It has proven to be effective, as the number of bugs related to replica data inconsistency has significantly decreased after version 0.11. If you are interested in delving into the internal workings of Kafka, today’s content is worth pondering over and mastering.

Open Discussion #

In discussing high watermarks, I used the example of 2 replicas. However, you should easily be able to extend it to multiple replicas. Now, please try to explain the full process of replica synchronization and the updating of partition high watermarks using 3 replicas.

Feel free to share your thoughts and answers, and let’s discuss together. If you find it helpful, you can also share this article with your friends.