21 Abstract Fetcher Thread How Message Fetching Is Divided Into Several Steps

21 AbstractFetcherThread How Message Fetching is Divided into Several Steps #

Hello, I am Hu Xi. Starting today, we officially enter the study of the source code for the 5th major module, “Replica Management Module”.

In Kafka, replica is one of the most important concepts. Why is that? In previous lessons, I have mentioned repeatedly that the replica mechanism is the foundation for Kafka to achieve high data reliability. Specifically, multiple replicas of the same partition are distributed across different broker machines, and they store the same message data to achieve high reliability. For distributed systems, a key problem to solve is how to ensure data consistency across all replicas.

The most common solution to this problem is the Leader/Follower replication mechanism. In Kafka, one of the replicas of a partition is designated as the Leader, responsible for responding to client read and write requests. The other replicas automatically become Followers and passively synchronize the data from the Leader replica.

The term “passive synchronization” means that the Follower replica continuously sends read requests to the Leader replica to obtain the latest message data written by the Leader.

In the next two lessons, we will learn together how Follower replicas achieve this through fetcher threads. Additionally, during the replica synchronization process, Follower replicas may also perform an operation called truncation. We will also take a look at its implementation principle.

Pre-class Case Study #

To be frank, this part of the source code is very close to the underlying design architecture principles. You may be wondering: how is reading it practically helpful to me? Let me give you a practical example.

We once discovered in a production environment that when the number of replicas on the Broker was too high, the memory usage of the Broker nodes would be very high. After inspecting the HeapDump, we found that the root cause was in the buildFetch method in the ReplicaFetcherThread file. In this method, there is the following line of code:

val builder = fetchSessionHandler.newBuilder()

This line of code will instantiate a LinkedHashMap at the underlying level. If there are many partitions, this map will be resized multiple times, resulting in unnecessary data copying. This not only increases the memory footprint but also wastes CPU resources.

You see, by querying the source code, we located the root cause of this issue. Later on, we resolved this issue by transferring the load to other Brokers.

In fact, the Kafka community also discovered this bug. So when you look at this part of the source code now, you will find that this line of code has been fixed. It now looks like this, and you can perceive the difference compared to before:

val builder = fetchSessionHandler.newBuilder(partitionMap.size, false)

You may also notice that the most significant difference between before and after the modification is that the modified line of code directly passes the total number of partitions in the FETCH request and directly assigns it to the LinkedHashMap, avoiding the need for resizing.

You see, sometimes improving a single line of source code can solve practical problems. Moreover, don’t think that modifying source code is some mysterious task. Once you understand the principles, you can adjust the code specifically, which is actually a very enjoyable thing.

Alright, let’s get back to the topic of Follower replicas fetching data from Leader replicas. I don’t know if you noticed, but in the previous example, I mentioned a name: ReplicaFetcherThread, which refers to the replica fetching thread. Yes, Kafka source code achieves message fetching and processing through this thread.

In today’s class, let’s start with the abstract base class AbstractFetcherThread and take a look at its class definition and three important methods. In the next class, we will continue learning about an important method of the AbstractFetcherThread class and the source code of the subclass ReplicaFetcherThread. This way, we will thoroughly understand the principles of synchronizing messages from the Leader side to the Follower side.

Abstract Base Class: AbstractFetcherThread #

Wait a minute, weren’t we supposed to learn ReplicaFetcherThread? Why are we starting with its parent class AbstractFetcherThread?

Well, the reason is quite simple. AbstractFetcherThread is the abstract base class of ReplicaFetcherThread. It defines and implements many important fields and methods, which serve as the foundation for understanding the source code of ReplicaFetcherThread. The source code of AbstractFetcherThread also provides guidance on the methods that need to be implemented by its subclasses.

Therefore, it is necessary to understand this abstract base class beforehand, in order to smoothly transition to the study of its subclass’s source code.

Now, let’s get to know AbstractFetcherThread. Its source code is located in the file AbstractFetcherThread.scala under the server package. As the name suggests, it is an abstract class that retrieves message data from multiple partitions of a broker. The processing of the retrieved data is delegated to its subclasses.

Class Definition and Fields #

Let’s take a look at the definition of the AbstractFetcherThread class and some important fields:

abstract class AbstractFetcherThread(
  name: String,
  clientId: String,
  val sourceBroker: BrokerEndPoint,
  failedPartitions: FailedPartitions,
  fetchBackOffMs: Int = 0,
  isInterruptible: Boolean = true,
  val brokerTopicStats: BrokerTopicStats)
  extends ShutdownableThread(name, isInterruptible) {
  
  // Definition of FetchData type representing the retrieved message data
  type FetchData = FetchResponse.PartitionData[Records]
  
  // Definition of EpochData type representing Leader Epoch data
  type EpochData = OffsetsForLeaderEpochRequest.PartitionData

  private val partitionStates = new PartitionStates[PartitionFetchState]
  //...
}

Let’s explore the meanings of several important parameters received by the constructor of AbstractFetcherThread.

  • name: The name of the thread.
  • sourceBroker: The information of the source broker. Source broker refers to the broker from which this thread reads data.
  • failedPartitions: The collection of partitions that encountered errors during processing by the thread.
  • fetchBackOffMs: The retry interval when fetching partition data fails. The default value is the broker-side parameter replica.fetch.backoff.ms.
  • brokerTopicStats: The monitoring metrics for topics on the broker, such as MessagesInPerSec and BytesInPerSec.

Among these parameters, sourceBroker is particularly important because it determines from which broker the follower replica fetches data, i.e., the broker where the leader replica resides.

In addition to these constructor parameters, the AbstractFetcherThread class also defines two types using the type keyword. This is a high-level syntax feature in Scala. To some extent, you can think of it as a shortcut. For example, take this line of code:

type FetchData = FetchResponse.PartitionData[Records]

This line of code is similar to a shortcut: instead of using FetchResponse.PartitionData[Records] everywhere in the code, you can simply use FetchData, which is concise and convenient. The same applies to the self-defined type EpochData.

In the definition of FetchData, PartitionData is a nested class defined in the clients project of the client module. The FetchResponse class encapsulates the response object of the FETCH request, and the PartitionData class inside it is a POJO class that stores various data of a single partition fetched, including the messages fetched from the leader replica of the partition, the high watermark value of the partition, and the log start offset value.

Let’s take a look at its code:

public static final class PartitionData<T extends BaseRecords> {
    public final Errors error;
    public final long highWatermark;
    public final long lastStableOffset;
    public final long logStartOffset;
    public final Optional<Integer> preferredReadReplica;
    public final List<AbortedTransaction> abortedTransactions;
    public final T records;
    // constructor...
}

In the fields defined in the PartitionData class, in addition to the familiar highWatermark and logStartOffset fields, there are several other advanced usage:

  • preferredReadReplica: Used to specify a follower replica that can provide read service externally. This feature is supported since Kafka 2.4.
  • abortedTransactions: Used to store the list of currently aborted transactions for the partition.
  • lastStableOffset: The latest LSO (Last Stable Offset), which is a concept related to Kafka transactions.

You only need to understand the basic purpose of these fields. In fact, in the PartitionData class, the field we should focus on most is the records field, because it contains the actual message collection, which is the data we are most concerned about.

Speaking of this, if you check the definition of EpochData, you will find that it is also of type PartitionData. However, you should be aware that PartitionData in EpochData is of type OffsetsForLeaderEpochRequest.PartitionData.

In fact, in the Kafka source code, there are many nested classes named PartitionData. Many request types group their data at the partition level, so it is natural to create similarly named nested classes in these request classes. When examining the source code, it is important to distinguish which PartitionData nested class belongs to which type of request. The PartitionData class fields in different types of requests are completely different.

Partition Fetch State Class #

Alright, let’s turn our attention back to the AbstractFetcherThread class. In the constructor of this class, we see that it also encapsulates a field named PartitionStates[PartitionFetchState].

Does it look a bit complicated? But don’t worry, let’s break it down. First, let’s look at the generic parameter type PartitionFetchState. Intuitively, it represents the state of partition fetches and stores the offset values that have been read and the corresponding replica state of each partition.

Note that there are two states here: the partition fetch state and the replica fetch state. The replica fetch state is represented by the ReplicaState trait, as shown below:

sealed trait ReplicaState
case object Truncating extends ReplicaState
case object Fetching extends ReplicaState

As you can see, the replica fetch state has two states: Truncating when the replica is truncating, and Fetching when the replica is being fetched.

On the other hand, the partition fetch state has three states:

  • Available: Indicates that the replica fetch thread is currently able to read data.
  • Truncating: Indicates that the partition replica is performing a truncation operation (e.g., the replica has just become a follower).
  • Delayed: Indicates that an error occurred while the replica fetch thread was fetching data, and it needs to wait for a certain amount of time before retrying.

It is worth noting that the states “Available” and “Truncating” in the partition read state do not strictly correspond to the states “Fetching” and “Truncating” in the replica read state. In other words, if the replica read state is “Fetching,” it does not necessarily mean that the partition read state is “Available.” For a partition, the conditions for being able to be fetched are stricter than for a replica.

Next, let’s take a look at the source code definition of these 3 types of partition fetch states:

case class PartitionFetchState(fetchOffset: Long,
  lag: Option[Long],
  currentLeaderEpoch: Int,
  delay: Option[DelayedItem],
  state: ReplicaState) {
  // The condition for a partition to be ready for fetch is that the replica state is Fetching and not delayed execution
  def isReadyForFetch: Boolean = state == Fetching && !isDelayed
  // The condition for a replica to be in sync with the ISR is that there is no lag
  def isReplicaInSync: Boolean = lag.isDefined && lag.get <= 0
  // The condition for a partition to be in truncation state is that the replica state is Truncating and not delayed execution
  def isTruncating: Boolean = state == Truncating && !isDelayed
  // The condition for a partition to be delayed in fetching data is that there is an unexpired delay task
  def isDelayed: Boolean =
    delay.exists(_.getDelay(TimeUnit.MILLISECONDS) > 0)
  ......
}

In this source code, there are 4 methods, but you only need to focus on the isReadyForFetch and isTruncating methods. This is because the replica fetch thread does two things: log truncation and message fetching.

As for isReplicaInSync, it is used for replica throttling, but it is not frequently used. isDelayed is used to determine whether fetching messages for the corresponding partition needs to be postponed. The source code continuously adjusts the reading order of partitions that do not need to be delayed to ensure fairness in reading.

This fairness is actually implemented in the PartitionStates class, which is defined in the clients module. This class essentially receives a set of topic partitions to be read and reads them in a round-robin manner to ensure fairness.

Given that this course focuses on broker-side source code, I will briefly explain the implementation principle of this class here. If you want to deeply understand this part, you can explore the source code in the clients module on your own.

public class PartitionStates<S> {
    private final LinkedHashMap<TopicPartition, S> map = new LinkedHashMap<>();
    ......
    public void updateAndMoveToEnd(TopicPartition topicPartition, S state) {
      map.remove(topicPartition);
      map.put(topicPartition, state);
      updateSize();
    }
    ......
}

As mentioned earlier, the PartitionStates class handles multiple partitions to be read in a round-robin manner. How is it specifically implemented? It relies on the LinkedHashMap data structure to store all the topic partitions. The elements in LinkedHashMap have a clear iteration order, typically the order in which the elements were inserted.

Suppose Kafka needs to read messages from 5 partitions: A, B, C, D, and E. If the insertion order is ABCDE, then naturally partition A will be read first. Once partition A is read, in order to ensure that each partition has an equal chance of being read, the code needs to move A to the last position in the partition list, and that’s what the updateAndMoveToEnd method does. Specifically, it is to remove A from the map and then insert it back, so that A naturally becomes the last item in the list. In general, the PartitionStates class is used to do this.

Important Methods #

After discussing the definition of the AbstractFetcherThread class, let’s take a look at some of the important methods it provides.

This class encapsulates a total of nearly 40 methods. Next, I will select four methods based on the importance of these methods for your use of Kafka and solving Kafka problems. These methods are processPartitionData, truncate, buildFetch, and doWork. These four methods cover the three most important tasks of the fetch thread: constructing FETCH requests, performing truncation operations, and processing the results of the fetch. The doWork method actually connects the previous three methods together.

Well, let’s take a look at each of them.

First is its most important method, processPartitionData, used to process the collection of messages that were read. It is an abstract method, so it needs to be implemented by subclasses. Specifically for follower replicas, it is implemented by the ReplicaFetcherThread class. Here is its method signature:

protected def processPartitionData(
  topicPartition: TopicPartition,  // The partition from which to read the data
  fetchOffset: Long,               // The latest offset read
  partitionData: FetchData         // The fetched partition data
): Option[LogAppendInfo]           // Metadata before writing the read messages

The field we need to pay attention to is the return value of this method, Option[LogAppendInfo]:

  • For follower replicas reading messages and writing them to the log, you can ignore the Option here because it will definitely return a specific LogAppendInfo instance, not None.
  • As for the LogAppendInfo class, we have already introduced it in the “Log Module”. It encapsulates important metadata information before the message data is written to the log, such as the offset of the first message, the offset of the last message, and the maximum timestamp.

In addition to the processPartitionData method, another important method is truncate, with the following method signature:

protected def truncate(
  topicPartition: TopicPartition, // The partition on which to perform truncation for replicas
  truncationState: OffsetTruncationState  // Offset + truncation state
): Unit

The OffsetTruncationState class here encapsulates an offset value and a boolean state indicating whether the truncation is completed. Its main purpose is to tell Kafka to truncate the replicas of the specified partition to a certain offset value.

The third important method is buildFetch. The code is as follows:

protected def buildFetch(
  // A set of partitions to read
  // The readability of each partition depends on the state in PartitionFetchState
  partitionMap: Map[TopicPartition, PartitionFetchState]
): ResultWithPartitions[Option[ReplicaFetch]]

Although the return value of the buildFetch method seems complicated, if you read the source code, you will find that the essence of buildFetch is to construct the corresponding FetchRequest.Builder object for the specified partition, and this object is the core component for constructing a FetchRequest. Any type of message reading in Kafka is completed by sending a FetchRequest request to the specified broker.

The fourth important method is doWork. Although it has a small number of lines of code, it is the main entry method that connects the previous three methods together and is the core method of the AbstractFetcherThread class. Therefore, we need to spend some more time to understand how these methods are combined to work together. I will explain the code principles in detail in the next lesson.

Summary #

Today, we mainly studied the replica synchronization mechanism and replica manager component of Kafka. Currently, the message synchronization between Kafka replicas is done by the ReplicaFetcherThread thread. We focused on reading the code of its abstract base class, AbstractFetcherThread. As the common base class for fetching threads, AbstractFetcherThread defines many important methods.

Let’s review the key points of this lesson.

  • AbstractFetcherThread class: The abstract base class for fetching threads. It defines common methods to handle the logic that all fetching threads need to implement, such as performing truncation operations and fetching messages.
  • Fetching thread logic: The logic of performing truncation operations and fetching data in a loop.
  • Partition fetch state: Currently, the source code defines three types of partition fetch states. The fetching threads can only fetch data from partitions that are in a readable state.

In the next lesson, I will guide you to go through the code of the doWork method, so that we can fully understand the execution logic of replica fetching threads. In this process, we will also gradually encounter the code of three important methods in the ReplicaFetcherThread class. You need to understand their implementation mechanism and how doWork organizes them together.

课后讨论 #

handlePartitionsWithErrors方法的实现原理如下:

  1. 首先,该方法接收一个参数,即包含错误分区信息的列表。
  2. 然后,方法会检查错误分区列表中的每个分区,并根据不同的错误类型执行相应的处理逻辑。
  3. 如果分区的错误类型是可恢复的,那么该分区会被尝试重试。方法会根据设置的重试策略进行相应的重试操作,直到分区成功处理为止。
  4. 如果分区的错误类型是不可恢复的,那么该分区会被记录下来,并在日志中生成相应的错误信息。
  5. 在处理完所有的错误分区后,方法会返回一个包含成功处理分区的列表。

以上就是handlePartitionsWithErrors方法的简要实现原理。如果你有任何其他的想法或问题,请在留言区继续讨论。如果你觉得这个内容有价值,也欢迎你分享给你的朋友。谢谢!