22 ReplicaFetcherThread: How the Follower Pulls Messages from the Leader #

Hello, I am Hu Xi. Today, we will continue learning how the Follower fetches messages from the Leader.

To understand this, we need to start our reading from the parent class, AbstractFetcherThread, because it is the basis for understanding the subclass ReplicaFetcherThread. In the previous lesson, we studied the definition of AbstractFetcherThread and what its three methods (processPartitionData, truncate, and buildFetch) are for. By now, you should have a good grasp of the fetch thread's processing logic and the code structure that supports it.

However, at the end of the previous lesson, I left you hanging a little: I saved the doWork method, which ties these three methods together, for today's lesson. Once you have studied doWork, along with how the subclass ReplicaFetcherThread implements those three methods, you will have a complete picture of how the follower replica's fetch thread (that is, the ReplicaFetcherThread thread) pulls messages from the leader replica and processes them.

So, let's begin reading the doWork method and then the code of the subclass ReplicaFetcherThread.

AbstractFetcherThread class: doWork() method #

The doWork() method is the core of the AbstractFetcherThread class; it is the thread's main execution logic. The code is as follows:

override def doWork(): Unit = {
  maybeTruncate()   // Perform truncate operation
  maybeFetch()      // Perform fetch operation
}

Looks simple, right? As long as the AbstractFetcherThread thread is running, it keeps repeating these two operations. Fetching messages is easy to understand, but why does the thread always need to attempt truncation first?

This is because the leader of the partition may change at any time. Whenever a new leader is elected, the follower replica must actively truncate its local log to have the same message sequence as the leader replica. In fact, the leader replica itself also needs to truncate its log and adjust the LEO to the high watermark of the partition.
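
To make this concrete, here is a toy sketch (purely illustrative, not Kafka code) of what truncation does to the follower's local log when the new leader's log is shorter:

// Toy model: the follower wrote offsets 0..10 under the old leader, but the new
// leader only has data up to (but not including) offset 8 for that epoch.
// To stay consistent, the follower discards offsets 8..10 before fetching again.
def truncateLocalLog(localRecords: Vector[String], truncateToOffset: Int): Vector[String] =
  localRecords.take(truncateToOffset)

val followerLog = Vector.tabulate(11)(i => s"record-$i")   // offsets 0..10
val afterTruncation = truncateLocalLog(followerLog, 8)     // keeps offsets 0..7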

Now, let’s take a look at the code for these two operations:

First, let’s take a look at the maybeTruncate() method. The code is not long, less than 10 lines:

private def maybeTruncate(): Unit = {
  // Group the partitions in truncation state based on whether they have Leader Epoch or not
  val (partitionsWithEpochs, partitionsWithoutEpochs) = fetchTruncatingPartitions()
  // For partitions with Leader Epoch, truncate the log to the offset corresponding to Leader Epoch
  if (partitionsWithEpochs.nonEmpty) {
    truncateToEpochEndOffsets(partitionsWithEpochs)
  }
  // For partitions without Leader Epoch, truncate the log to the high watermark
  if (partitionsWithoutEpochs.nonEmpty) {
    truncateToHighWatermark(partitionsWithoutEpochs)
  }
}

The logic of the maybeTruncate() method is very simple.

First, it groups the partitions that are in the truncation state. Since this method performs truncation, it only operates on partitions in that state. The code checks whether each of these partitions has a corresponding Leader Epoch and groups them by the presence or absence of an epoch. This is exactly what the fetchTruncatingPartitions() method does.
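
To make the grouping concrete, here is a minimal sketch of this kind of logic, assuming simplified stand-in classes (the real PartitionFetchState in Kafka has a different, richer shape):

// Simplified stand-ins; the field names here are assumptions for illustration only.
final case class TopicPartition(topic: String, partition: Int)
final case class PartitionFetchState(fetchOffset: Long,
                                     currentLeaderEpoch: Option[Int],
                                     isTruncating: Boolean)

// Keep only the partitions that are in the truncating state, then split them
// by whether a Leader Epoch is known for them.
def groupTruncatingPartitions(
    states: Map[TopicPartition, PartitionFetchState]
): (Map[TopicPartition, PartitionFetchState], Map[TopicPartition, PartitionFetchState]) = {
  val truncating = states.filter { case (_, state) => state.isTruncating }
  truncating.partition { case (_, state) => state.currentLeaderEpoch.isDefined }
}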

I mentioned the Leader Epoch mechanism in Lecture 3. It is used to replace the role of the high watermark in log truncation. Here is a typical application scenario of the Leader Epoch mechanism:

  • When a partition has a Leader Epoch, the code truncates the replica’s local log to the latest offset corresponding to the Leader Epoch, which is the logic implemented in the truncateToEpochEndOffsets() method.
  • On the contrary, if a partition does not have a corresponding Leader Epoch record, it still uses the original high watermark mechanism and calls the truncateToHighWatermark() method to adjust the log to the high watermark.

Since the Leader Epoch mechanism is a more advanced topic, and our focus here is on how the high watermark is used in truncation, I won't explain the Leader Epoch mechanism in detail. If you want a deeper understanding of it, you can study the source code of the LeaderEpochFileCache class.

Therefore, let’s focus on the implementation code of the truncateToHighWatermark() method.

private[server] def truncateToHighWatermark(
  partitions: Set[TopicPartition]): Unit = inLock(partitionMapLock) {
  val fetchOffsets = mutable.HashMap.empty[TopicPartition, OffsetTruncationState]
  // Iterate over each partition to truncate
  for (tp <- partitions) {
    // Get the partition's fetch state
    val partitionState = partitionStates.stateValue(tp)
    if (partitionState != null) {
      // Get the high watermark, which is the maximum readable offset for the partition
      val highWatermark = partitionState.fetchOffset
      val truncationState = OffsetTruncationState(highWatermark, truncationCompleted = true)

      info(s"Truncating partition $tp to local high watermark $highWatermark")
      // Perform truncation to the high watermark
      if (doTruncate(tp, truncationState))
        fetchOffsets.put(tp, truncationState)
    }
  }
  // Update the fetch offset of these partitions and maybe mark truncation as complete
  updateFetchOffsetAndMaybeMarkTruncationComplete(fetchOffsets)
}

Let me walk through the logic of the truncateToHighWatermark() method: first, it iterates over the given set of partitions; for each partition, it takes the current high watermark from the partition fetch state object we saw earlier and wraps it in an OffsetTruncationState; it then calls the doTruncate() method to perform the actual log truncation. Once all partitions in the given set have been processed, the code updates the fetch state of those partitions.

The doTruncate() method calls an abstract method truncate, which is implemented in the ReplicaFetcherThread. We will talk about it in detail later. As for the updateFetchOffsetAndMaybeMarkTruncationComplete() method, it is a private method with only a dozen lines of code. I will leave it as an exercise for you to think about its purpose.
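
For intuition, here is a hedged sketch of what a doTruncate-style wrapper around the abstract truncate method might look like; the failure-handling helper passed in below is an assumption, not the exact Kafka signature:

// Illustrative sketch: perform the (subclass-implemented) truncation and report whether
// it succeeded, so the caller only records fetch offsets for successful partitions.
def doTruncateSketch[TP, S](tp: TP,
                            truncationState: S,
                            truncate: (TP, S) => Unit,
                            markPartitionFailed: TP => Unit): Boolean = {
  try {
    truncate(tp, truncationState)   // delegated to the subclass, e.g. ReplicaFetcherThread
    true
  } catch {
    case _: Exception =>
      markPartitionFailed(tp)       // failed partitions are excluded from the fetch-offset update
      false
  }
}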

After explaining the maybeTruncate() method, let's take a look at the maybeFetch() method. The code is as follows:

private def maybeFetch(): Unit = {
  val fetchRequestOpt = inLock(partitionMapLock) {
    // Construct FetchRequest for the partitions in partitionStates
    // partitionStates contains the partitions to fetch messages from and their corresponding states
    val ResultWithPartitions(fetchRequestOpt, partitionsWithError) = 
      buildFetch(partitionStates.partitionStateMap.asScala)
    // Handle partitions with errors, mainly by adding them to the end of the ordered map
    // for future retries
    handlePartitionsWithErrors(partitionsWithError, "maybeFetch")
    // If there are no active partitions, wait for fetchBackOffMs time for future retries
    if (fetchRequestOpt.isEmpty) {
      trace(s"There are no active partitions. Back off for $fetchBackOffMs ms before sending a fetch request")
      partitionMapCond.await(fetchBackOffMs, TimeUnit.MILLISECONDS)
    }
    fetchRequestOpt
  }
  // Send FETCH requests to leader replicas and process the responses
  fetchRequestOpt.foreach { case ReplicaFetch(sessionPartitions, fetchRequest) =>
    processFetchRequest(sessionPartitions, fetchRequest)
  }
}

Similarly, the maybeFetch() method can be divided into 3 steps:

Step 1: Construct a FetchRequest object (or more precisely, a FetchRequest.Builder object) for the partitions in the partitionStates. After constructing the Builder object, calling its build() method creates the required FetchRequest object.

Here, the partitionStates contains a set of partitions to fetch messages from and their corresponding state information. The output of this step is two objects:

  • One is a ReplicaFetch object, which bundles the partitions' core read information with the FetchRequest.Builder object. The core information covers which partition to read from, which offset to start at, the maximum number of bytes to read, and so on.
  • The other object is a set of partitions with errors (the shapes of these outputs are sketched below).
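
To visualize how these outputs are packaged, here is a hedged sketch of the approximate shapes of ReplicaFetch and the ResultWithPartitions wrapper that carries both outputs, inferred from how maybeFetch destructures them above; the exact field names in Kafka may differ:

import java.util.{Map => JMap}
import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.requests.FetchRequest

// Approximate shape of the per-fetch payload: the partitions' core read information
// plus the builder used to create the actual FETCH request.
case class ReplicaFetchSketch(
  partitionData: JMap[TopicPartition, FetchRequest.PartitionData],
  fetchRequest: FetchRequest.Builder)

// Approximate shape of the buildFetch result: the (optional) request plus the
// partitions that hit an error while the request was being built.
case class ResultWithPartitionsSketch[R](
  result: R,
  partitionsWithError: Set[TopicPartition])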

Step 2: Processing the Error Partitions

In this step, the error partitions are handled by adding them to the end of the ordered map for later retry. If there are no fetchable partitions at the moment, the code blocks and waits for a while (fetchBackOffMs) before trying again.

Step 3: Sending FETCH Request and Processing Response

The processFetchRequest method is the most complex method in the AbstractFetcherThread class and contains a lot of code. To make it easier to follow, I have extracted the essential parts and added comments to each key step:

private def processFetchRequest(
    sessionPartitions: util.Map[TopicPartition, FetchRequest.PartitionData],
    fetchRequest: FetchRequest.Builder): Unit = {
    val partitionsWithError = mutable.Set[TopicPartition]()
    var responseData: Map[TopicPartition, FetchData] = Map.empty
    try {
      trace(s"Sending fetch request $fetchRequest")
      // Send FETCH request to the leader
      responseData = fetchFromLeader(fetchRequest)
    } catch {
    	...
    }
    // Update the request rate metric
    fetcherStats.requestRate.mark()
    if (responseData.nonEmpty) {
      inLock(partitionMapLock) {
        responseData.foreach { case (topicPartition, partitionData) =>
          Option(partitionStates.stateValue(topicPartition)).foreach { currentFetchState =>
            // Get the core information of the partition
            val fetchPartitionData = sessionPartitions.get(topicPartition)
            // Conditions for processing the response:
            // 1. The offset value to be fetched is equal to the previously saved next offset value
            // 2. The current partition is in a fetchable state
            if (fetchPartitionData != null && fetchPartitionData.fetchOffset == currentFetchState.fetchOffset && currentFetchState.isReadyForFetch) {
              // Extract the Leader Epoch value from the response
              val requestEpoch = if (fetchPartitionData.currentLeaderEpoch.isPresent) Some(fetchPartitionData.currentLeaderEpoch.get().toInt) else None
              partitionData.error match {
                // If there is no error
                case Errors.NONE =>
                  try {
                    // Delegate the response processing to the subclass
                    val logAppendInfoOpt = processPartitionData(topicPartition, currentFetchState.fetchOffset,
                      partitionData)
                    logAppendInfoOpt.foreach { logAppendInfo =>
                      val validBytes = logAppendInfo.validBytes
                      val nextOffset = if (validBytes > 0) logAppendInfo.lastOffset + 1 else currentFetchState.fetchOffset
                      val lag = Math.max(0L, partitionData.highWatermark - nextOffset)
                      fetcherLagStats.getAndMaybePut(topicPartition).lag = lag
                      if (validBytes > 0 && partitionStates.contains(topicPartition)) {
                        val newFetchState = PartitionFetchState(nextOffset, Some(lag), currentFetchState.currentLeaderEpoch, state = Fetching)
                        // Place the partition at the end of the ordered map for fair reading
                        partitionStates.updateAndMoveToEnd(
                          topicPartition, newFetchState)
                        fetcherStats.byteRate.mark(validBytes)
                      }
                    }
                  } catch {
                  	...
                  }
                // If the fetched offset is out of range, usually due to a leader change
                case Errors.OFFSET_OUT_OF_RANGE =>
                  // Adjust the out-of-range offset, usually by truncation
                  if (handleOutOfRangeError(topicPartition, currentFetchState, requestEpoch))
                    // If it still fails, add the partition to the error partition list
                    partitionsWithError += topicPartition
                // If the Leader Epoch value is newer than the epoch value on the Leader broker
                case Errors.UNKNOWN_LEADER_EPOCH =>
                  debug(s"Remote broker has a smaller leader epoch for partition $topicPartition than " +
                    s"this replica's current leader epoch of ${currentFetchState.currentLeaderEpoch}.")
                  // Add the partition to the error partition list
                  partitionsWithError += topicPartition
                // If the Leader Epoch value is older than the epoch value on the Leader broker
                case Errors.FENCED_LEADER_EPOCH =>
                  if (onPartitionFenced(topicPartition, requestEpoch)) partitionsWithError += topicPartition
                // If a leader change has occurred
                case Errors.NOT_LEADER_FOR_PARTITION =>
                  debug(s"Remote broker is not the leader for partition $topicPartition, which could indicate " +
                    "that the partition is being moved")
                  // Add the partition to the error partition list
                  partitionsWithError += topicPartition
                case _ =>
                  error(s"Error for partition $topicPartition at offset ${currentFetchState.fetchOffset}",
                    partitionData.error.exception)
                  // Add the partition to the error partition list
                  partitionsWithError += topicPartition
              }
            }
          }
        }
      }
    }
    if (partitionsWithError.nonEmpty) {
      // Handle the error partition list
      handlePartitionsWithErrors(partitionsWithError, "processFetchRequest")
    }
}

To help you remember, let me explain the core logic of processFetchRequest and the execution process of the fetch thread.

We can divide this logic into the following three parts.

Step 1: Call the fetchFromLeader method to send the FETCH request to the Leader and block until the response arrives, then update the fetch-request rate metric.

Step 2: After the response arrives, the code extracts each partition's core information from it and compares the offset that was just fetched with the next-offset value the AbstractFetcherThread thread has cached for that partition. It also checks whether the partition is currently in a fetchable state.

If either of these two conditions is not met, the response probably belongs to a stale request that has been pending for a long time, and it should not be processed.

On the contrary, if both conditions are met and the response carries no error, the code records the Leader Epoch value, delegates the actual response handling to the subclass via the processPartitionData method, and then moves the partition to the end of the ordered map so that all partitions are fetched fairly. If the response does carry an error, the code invokes the corresponding error-handling logic and adds the partition to the error-partition list.

Step 3: Calling the handlePartitionsWithErrors method to handle the partitions with errors that occurred during the previous step.
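
As a rough illustration of what "handling" the error partitions amounts to, here is a sketch of pushing them to the end of an insertion-ordered map so that healthy partitions get fetched first on the next round; this is a simplified stand-in for the partitionStates structure used above, not Kafka's actual implementation:

import scala.collection.mutable

// Re-inserting an entry into a LinkedHashMap moves it to the end, so the error
// partitions are retried only after all other partitions have had their turn.
def moveErrorPartitionsToEnd[TP, S](states: mutable.LinkedHashMap[TP, S],
                                    partitionsWithError: Iterable[TP]): Unit = {
  partitionsWithError.foreach { tp =>
    states.remove(tp).foreach(state => states.put(tp, state))
  }
}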

Subclass: ReplicaFetcherThread #

With this, we have completed the study of the AbstractFetcherThread class. Next, let’s take a look at the ReplicaFetcherThread subclass used on the follower replica side.

As mentioned earlier, ReplicaFetcherThread inherits from AbstractFetcherThread. It is the thread created on the follower replica side to fetch message data from the leader replica. We will continue to study this subclass's source code from two angles: its class definition and its important methods.

The source code of the ReplicaFetcherThread class lives in the Scala file of the same name under the server package. It is a small file of just over 300 lines, because most of the processing logic has already been defined in its parent class, AbstractFetcherThread.

Class Definition and Fields #

Let’s first study the definition and fields of the ReplicaFetcherThread class:

class ReplicaFetcherThread(name: String,
                           fetcherId: Int,
                           sourceBroker: BrokerEndPoint,
                           brokerConfig: KafkaConfig,
                           failedPartitions: FailedPartitions,
                           replicaMgr: ReplicaManager,
                           metrics: Metrics,
                           time: Time,
                           quota: ReplicaQuota,
                           leaderEndpointBlockingSend: Option[BlockingSend] = None)
  extends AbstractFetcherThread(name = name,
                                clientId = name,
                                sourceBroker = sourceBroker,
                                failedPartitions,
                                fetchBackOffMs = brokerConfig.replicaFetchBackoffMs,
                                isInterruptible = false,
                                replicaMgr.brokerTopicStats) {
  // The Replica ID is the ID of the broker where the replica is located
  private val replicaId = brokerConfig.brokerId
  ......
  // Class for executing request sending
  private val leaderEndpoint = leaderEndpointBlockingSend.getOrElse(
    new ReplicaFetcherBlockingSend(sourceBroker, brokerConfig, metrics, time, fetcherId,
      s"broker-$replicaId-fetcher-$fetcherId", logContext))
  // The maximum wait time for Follower's FETCH requests to be processed and returned
  private val maxWait = brokerConfig.replicaFetchWaitMaxMs
  // The minimum number of bytes that must be accumulated before each FETCH Response is returned
  private val minBytes = brokerConfig.replicaFetchMinBytes
  // The maximum number of bytes for each valid FETCH Response
  private val maxBytes = brokerConfig.replicaFetchResponseMaxBytes
  // The maximum number of bytes that a single partition can retrieve
  private val fetchSize = brokerConfig.replicaFetchMaxBytes
  // Class for maintaining session states by connecting to a broker
  val fetchSessionHandler = new FetchSessionHandler(
    logContext, sourceBroker.id)
  ......
}

The definition of the ReplicaFetcherThread class is a bit long, but it is not hard to understand, because we already covered most of the constructor fields in the previous lesson. Here, we only need to look at the few fields that are specific to ReplicaFetcherThread.

  • fetcherId: The ID of the follower fetcher thread, which is the thread number. On a single broker, multiple ReplicaFetcherThread threads are allowed. The broker-side parameter num.replica.fetchers determines how many follower fetcher threads Kafka will create.
  • brokerConfig: An instance of the KafkaConfig class. Although we haven’t formally studied its source code, we have encountered it in many code snippets we previously studied. It encapsulates all the broker-side parameter information. Similarly, the ReplicaFetcherThread class uses it to obtain the values of specified broker-side parameters.
  • replicaMgr: The replica manager. This thread class uses the replica manager to obtain partition objects, replica objects, and their corresponding log objects.
  • quota: Used for rate limiting (quotas). Quotas are an advanced topic; if you want to dig deeper, you can read the source code of the ReplicationQuotaManager class yourself. For now, it is enough to know that whenever you see "quota" in this code, it is there to control how fast the follower replica fetches.
  • leaderEndpointBlockingSend: A class used for synchronous request sending. Synchronous here means that the thread sends a request to the specified broker and then blocks until it receives that broker's response (see the sketch after this list).
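
Here is a minimal sketch of what that synchronous-send contract looks like; the trait and method names below are simplified assumptions, not the real BlockingSend interface:

// Simplified contract: send one request and block until the broker's response
// arrives (or an exception, such as a timeout, is thrown).
trait BlockingSendSketch {
  def sendRequest(request: String): String   // request/response types simplified to String
  def close(): Unit
}

// One round trip of the fetcher thread to the leader: nothing else happens on
// this thread until the FETCH response comes back.
def fetchOnce(leaderEndpoint: BlockingSendSketch, fetchRequest: String): String =
  leaderEndpoint.sendRequest(fetchRequest)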

In addition to the fields defined in the constructor, the ReplicaFetcherThread class also defines 4 fields related to message fetching:

  • maxWait: The maximum wait time for Follower’s FETCH requests to be processed and returned. It is the value of the broker-side parameter replica.fetch.wait.max.ms.
  • minBytes: The minimum number of bytes that must be accumulated before each FETCH Response is returned. It is the value of the broker-side parameter replica.fetch.min.bytes.
  • maxBytes: The maximum number of bytes for each valid FETCH Response. It is the value of the broker-side parameter replica.fetch.response.max.bytes.
  • fetchSize: The maximum number of bytes that a single partition can retrieve. It is the value of the broker-side parameter replica.fetch.max.bytes.

These 4 parameters control how the follower replica fetches messages from the leader replica, for example how many bytes a single request may return, or how long a FETCH request should wait on the leader side if the minimum-bytes threshold has not yet been accumulated.
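
To see how these parameters fit together, here is a hedged sketch of the kind of logic a buildFetch-style method could use to assemble one FETCH request; the request model below is simplified for illustration, whereas the real code fills in a FetchRequest.Builder:

// Simplified request model for illustration only.
final case class PartitionFetchSpec(fetchOffset: Long, maxBytesPerPartition: Int)
final case class FetchRequestSketch(replicaId: Int,
                                    maxWaitMs: Int,
                                    minBytes: Int,
                                    maxBytes: Int,
                                    partitions: Map[String, PartitionFetchSpec])

// Each partition asks for at most fetchSize bytes starting at its next fetch offset,
// while maxWait / minBytes / maxBytes bound the FETCH response as a whole.
def buildFetchSketch(replicaId: Int,
                     maxWaitMs: Int,
                     minBytes: Int,
                     maxBytes: Int,
                     fetchSize: Int,
                     fetchOffsets: Map[String, Long]): Option[FetchRequestSketch] = {
  if (fetchOffsets.isEmpty) None   // nothing fetchable: the caller backs off and retries later
  else {
    val partitions = fetchOffsets.map { case (tp, offset) =>
      tp -> PartitionFetchSpec(offset, fetchSize)
    }
    Some(FetchRequestSketch(replicaId, maxWaitMs, minBytes, maxBytes, partitions))
  }
}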

Important Methods #

Next, let’s continue studying the 3 important methods of the ReplicaFetcherThread class: processPartitionData, buildFetch, and truncate.

Why these 3 methods? Because they represent the most important tasks the follower replica fetcher thread needs to perform: processing fetched messages, constructing fetch requests, and performing log truncation operations.

processPartitionData Method #

Let’s start with the processPartitionData method. After the AbstractFetcherThread thread fetches messages from the leader replica, it needs to call the processPartitionData method to perform subsequent actions. The code of this method is long, but I have added comments to the key steps:

override def processPartitionData(
  topicPartition: TopicPartition,
  fetchOffset: Long,
  partitionData: FetchData): Option[LogAppendInfo] = {
  val logTrace = isTraceEnabled
  // Get the specified topic partition object from the replica manager
  val partition = replicaMgr.nonOfflinePartition(topicPartition).get
  // Get the log object
  val log = partition.localLogOrException
  // Convert the fetched data into a collection of messages that meet the required format
  val records = toMemoryRecords(partitionData.records)
  maybeWarnIfOversizedRecords(records, topicPartition)
  // If the fetch offset value to be read is not the local log's LEO value, it is considered an exceptional case
  if (fetchOffset != log.logEndOffset)
    throw new IllegalStateException("Offset mismatch for partition %s: fetched offset = %d, log end offset = %d.".format(
      topicPartition, fetchOffset, log.logEndOffset))
  if (logTrace)
    trace("Follower has replica log end offset %d for partition %s. Received %d messages and leader hw %d"
      .format(log.logEndOffset, topicPartition, records.sizeInBytes, partitionData.highWatermark))
  // Write to the local log of the follower replica
  val logAppendInfo = partition.appendRecordsToFollowerOrFutureReplica(records, isFuture = false)
  if (logTrace)
    trace("Follower has replica log end offset %d after appending %d bytes of messages for partition %s"
      .format(log.logEndOffset, records.sizeInBytes, topicPartition))
  Some(logAppendInfo)
}
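
One more note: the excerpt above focuses on appending the records. In the full method, the follower also uses the leader's high watermark and log start offset carried in the response to adjust its own log state; the exact calls are omitted here, but the underlying rule can be sketched as follows, with simplified types standing in for the real Partition/Log classes:

// Simplified stand-in for the follower's local log state.
final case class FollowerLogState(logEndOffset: Long,
                                  highWatermark: Long,
                                  logStartOffset: Long)

// The follower's high watermark is capped at its own LEO (it cannot expose offsets
// it has not written yet), and its log start offset only moves forward, following
// the leader's (for example after old segments were deleted on the leader).
def reconcileWithLeader(local: FollowerLogState,
                        leaderHighWatermark: Long,
                        leaderLogStartOffset: Long): FollowerLogState =
  local.copy(
    highWatermark = math.min(local.logEndOffset, leaderHighWatermark),
    logStartOffset = math.max(local.logStartOffset, leaderLogStartOffset)
  )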

Summary #

Alright, let's summarize. As I said at the beginning, the doWork method of the AbstractFetcherThread thread ties together the three important methods from the previous lesson, completing the fetch thread's core loop: log truncation, log fetching, and log processing. The subclass ReplicaFetcherThread is where these three methods are actually implemented. In short, the follower replica uses the ReplicaFetcherThread thread to fetch messages from the leader replica in real time and write them to its local log, thereby staying in sync with the leader; this is the basis of data consistency between replicas in Kafka. Here are some key points:

  • doWork method: the entry point of the fetch thread's work. It ties together all the important sub-tasks, such as performing truncation, fetching messages from the leader replica, and writing them to the local log.
  • truncate method: Performs truncation operation on the local log based on the offset and epoch values returned by the Leader replica.
  • buildFetch method: Builds the data structure required for creating a FetchRequest object for a specific partition group.
  • processPartitionData method: Processes the messages received from the Leader replica, mainly by writing them to the local log.

In fact, the Replica Manager makes multiple appearances in today’s content. If you carefully examine the code, you will find that the Follower replica uses it to obtain the corresponding Partition object, and then relies on this object to perform message writing. So, what other functionalities does the Replica Manager have? I will reveal them one by one in the next lesson.

Post-discussion #

Go check the source code and tell me: what is the updateFetchOffsetAndMaybeMarkTruncationComplete method used for?

Feel free to write your thoughts and answers in the comments section to discuss with me. You are also welcome to share today’s content with your friends.