25 ReplicaManager: How the Replica Manager Manages Replicas #

Hello, I’m Hu Xi.

In the last lesson, we learned how the ReplicaManager class executes replica read and write operations. We now know that these operations are implemented mainly by the appendRecords and fetchMessages methods, which in turn call the underlying Log's append and read methods that we studied in Lesson 3 on writing and reading log messages.

Today, we will continue studying the source code of the ReplicaManager class to see how it manages replicas. Here, "replica" is used in a broad sense: it covers partition and replica objects, replica offset values, and ISR (in-sync replica) management. In this lesson, we will study each of these aspects in conjunction with the source code.

Partition and Replica Management #

In addition to reading and writing replicas, the replica manager also has an important function, which is to manage replicas and their corresponding partitions. The ReplicaManager manages them through the allPartitions field.

So, let me first review the allPartitions code from [Lesson 23] with you. This time, however, I want to emphasize its role as a container, specifically, as an object pool: allPartitions gathers all of the broker's partition objects and manages them in one unified pool.

private val allPartitions = new Pool[TopicPartition, HostedPartition](
  valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, this)))
)

From the code, we can see that each ReplicaManager instance maintains all the partition objects hosted on its broker, and each Partition object in turn holds a set of Replica objects. Through this hierarchy, the ReplicaManager manages the subordinate replica objects indirectly, by directly manipulating the partition objects.
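
To make the "object pool" idea concrete, here is a minimal, self-contained sketch of what a pool with a value factory does (an illustration of the concept, not the actual kafka.utils.Pool class): looking up a key that is not yet present creates and registers the value on the fly, which is how allPartitions can produce an Online partition for a TopicPartition it has never seen before.

    import java.util.concurrent.ConcurrentHashMap
    import scala.jdk.CollectionConverters._

    // A simplified pool: a concurrent map plus an optional value factory
    // (conceptual sketch only, not the real kafka.utils.Pool).
    class SimplePool[K, V](valueFactory: Option[K => V] = None) {
      private val pool = new ConcurrentHashMap[K, V]()

      // Return the existing value, or build one with the factory on first access
      def getAndMaybePut(key: K): V = {
        val current = pool.get(key)
        if (current != null) current
        else {
          val factory = valueFactory.getOrElse(
            throw new IllegalStateException("no value factory supplied"))
          val created = factory(key)
          // putIfAbsent returns the previous value if another thread won the race
          val race = pool.putIfAbsent(key, created)
          if (race != null) race else created
        }
      }

      def get(key: K): V = pool.get(key)

      def keys: Iterable[K] = pool.keySet.asScala
    }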

For a broker, managing its subordinate partition and replica objects mainly means determining which of the replicas it hosts are leader replicas and which are follower replicas.

These divisions are not fixed, but are constantly changing over time. For example, at one moment, a broker may be the leader replica of partition A and the follower replica of partition B, but at some point in the future, the broker may become the follower replica of partition A and the leader replica of partition B.

These changes are driven by the controller sending LeaderAndIsrRequest requests to the broker. When the broker receives such a request, it calls the replica manager's becomeLeaderOrFollower method to process it, executing the "become leader replica" and "become follower replica" logic in turn, which is how the roles of the partition A and partition B replicas on this broker get switched.

The becomeLeaderOrFollower Method #

Here we meet the LeaderAndIsrRequest again. We have mentioned it many times while studying the controller and control-class requests, and we examined its functions in detail in [Lesson 12]. Since that was quite a while ago, let's briefly review it here.

Simply put, it tells the broker that receives the request: among the partitions I pass to you, which partitions have leader replicas at your end and which partitions have follower replicas at your end.
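
To make that concrete, here is a simplified sketch of the per-partition information such a request carries. The field names mirror those you will see in the code later in this lesson, but this is an illustrative case class, not the actual protocol class.

    // Illustrative sketch of the per-partition data inside a LeaderAndIsrRequest
    // (not the real generated LeaderAndIsrPartitionState class).
    case class LeaderAndIsrPartitionStateSketch(
      topicName: String,
      partitionIndex: Int,
      controllerEpoch: Int,
      leader: Int,          // ID of the broker hosting this partition's leader replica
      leaderEpoch: Int,     // bumped every time the partition's leader changes
      isr: List[Int],       // broker IDs currently in the in-sync replica set
      replicas: List[Int],  // the full assigned replica list (AR)
      isNew: Boolean)       // whether the partition was newly created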

The becomeLeaderOrFollower method is where the LeaderAndIsrRequest is actually handled, and it is also where the replica manager adds partitions. Now let's go through the source code of this method in full. Since the code is quite long, I will walk through it in three parts: handling the Controller Epoch and related preparations, executing the become-leader and become-follower logic, and constructing the response.

First, let’s look at the flowchart of the first section of the becomeLeaderOrFollower method, that is, the process of handling the Controller Epoch and other related preparations:

Since the beginning of the becomeLeaderOrFollower method is a log output only used for debugging, it is not very important. Therefore, I will start explaining from the if statement. The main code of the first section is as follows:

// If the Controller Epoch carried by the LeaderAndIsrRequest
// is smaller than the current Controller's Epoch
if (leaderAndIsrRequest.controllerEpoch < controllerEpoch) {
  stateChangeLogger.warn(s"Ignoring LeaderAndIsr request from controller $controllerId with " +
    s"correlation id $correlationId since its controller epoch ${leaderAndIsrRequest.controllerEpoch} is old. " +
    s"Latest known controller epoch is $controllerEpoch")
  // The Controller has changed; return a response with the STALE_CONTROLLER_EPOCH error
  leaderAndIsrRequest.getErrorResponse(0, Errors.STALE_CONTROLLER_EPOCH.exception)
} else {
  val responseMap = new mutable.HashMap[TopicPartition, Errors]
  // Update the current Controller's Epoch value
  controllerEpoch = leaderAndIsrRequest.controllerEpoch
  val partitionStates = new mutable.HashMap[Partition, LeaderAndIsrPartitionState]()
  // Traverse all partitions in the LeaderAndIsrRequest request
  requestPartitionStates.foreach { partitionState =>
    val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
    // Get the corresponding partition object from allPartitions
    val partitionOpt = getPartition(topicPartition) match {
      // If the partition is in the Offline state, record a KAFKA_STORAGE_ERROR
      // for it in the response and skip it
      case HostedPartition.Offline =>
        responseMap.put(topicPartition, Errors.KAFKA_STORAGE_ERROR)
        None
      // If the partition is already hosted online, use it directly
      case HostedPartition.Online(partition) =>
        Some(partition)
      // Otherwise this broker is seeing the partition for the first time:
      // create a new Partition object and add it to allPartitions
      case HostedPartition.None =>
        val partition = Partition(topicPartition, time, this)
        allPartitions.putIfNotExists(topicPartition, HostedPartition.Online(partition))
        Some(partition)
    }
    ......
  }
  ......

The logic of this first part is: if the Controller Epoch carried by the request is smaller than the Controller Epoch the broker currently knows, the request comes from an expired Controller and a STALE_CONTROLLER_EPOCH error response is returned. Otherwise, the broker updates its cached Controller Epoch and iterates over every partition in the request: a partition whose local replica sits in an offline log directory gets a KAFKA_STORAGE_ERROR recorded in the response, a partition that is already hosted is reused, and a partition the broker has never seen before is created and added to allPartitions. This is exactly the place where the replica manager adds new partitions.

Now let's move on to the second part of the method: executing the become-leader and become-follower logic. The main code is as follows:

// Determine the set of partitions whose local replica becomes the Leader replica
val partitionsBecomeLeader = if (partitionsToBeLeader.nonEmpty)
  // Execute the logic of becoming a leader replica
  makeLeaders(controllerId, controllerEpoch, partitionsToBeLeader, correlationId, responseMap,
    highWatermarkCheckpoints)
else
  Set.empty[Partition]
val partitionsBecomeFollower = if (partitionsToBeFollower.nonEmpty)
  // Call the makeFollowers method to turn all partitions in partitionsToBeFollower
  // into follower replicas
  makeFollowers(controllerId, controllerEpoch, partitionsToBeFollower, correlationId, responseMap,
    highWatermarkCheckpoints)
else
  Set.empty[Partition]
val leaderTopicSet = leaderPartitionsIterator.map(_.topic).toSet
val followerTopicSet = partitionsBecomeFollower.map(_.topic).toSet
// For topics where the current broker becomes a follower replica,
// remove the stale leader-replica monitoring metrics
followerTopicSet.diff(leaderTopicSet).foreach(brokerTopicStats.removeOldLeaderMetrics)
// For topics where the current broker becomes a leader replica,
// remove the stale follower-replica monitoring metrics
leaderTopicSet.diff(followerTopicSet).foreach(brokerTopicStats.removeOldFollowerMetrics)
// If a partition's local log is empty, the underlying log path is unavailable;
// mark the partition as Offline
leaderAndIsrRequest.partitionStates.forEach { partitionState =>
  val topicPartition = new TopicPartition(partitionState.topicName, partitionState.partitionIndex)
  if (localLog(topicPartition).isEmpty)
    markPartitionOffline(topicPartition)
}

First, this part of the code needs to determine two sets of partitions: one set contains all partitions where this broker becomes a leader, and the other set contains all partitions where this broker becomes a follower. The criterion for determining this is primarily based on the Leader information of the partitions in the LeaderAndIsrRequest request. If the Leader ID is the same as the ID of the current broker, it means that the broker is the leader for that partition; otherwise, it means the current broker is a follower for that partition.
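
As a minimal illustration of this classification rule (simplified types and names, not the actual becomeLeaderOrFollower code), the split can be expressed as a single partition over the request's entries:

    // Split the request's partitions by comparing each partition's leader ID
    // with the local broker's ID (illustrative sketch).
    case class PartitionLeaderInfo(leader: Int)

    def splitByRole(localBrokerId: Int, states: Map[String, PartitionLeaderInfo])
        : (Map[String, PartitionLeaderInfo], Map[String, PartitionLeaderInfo]) =
      states.partition { case (_, state) => state.leader == localBrokerId }

    // Example: broker 1 receives the states of two partitions
    val states = Map(
      "A-0" -> PartitionLeaderInfo(leader = 1),  // broker 1 leads A-0, so it becomes leader
      "B-0" -> PartitionLeaderInfo(leader = 2))  // broker 2 leads B-0, so broker 1 becomes follower
    val (toBeLeader, toBeFollower) = splitByRole(localBrokerId = 1, states)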

Once these two sets of partitions are determined, the code will call the makeLeaders and makeFollowers methods for each set respectively to make the leader and follower roles effective. Then, for the topics where the current broker becomes a follower replica, the code needs to remove the monitoring metrics for the old leader replicas to avoid resource leaks. Similarly, for the topics where the current broker becomes a leader replica, the code needs to remove the monitoring metrics for the old follower replicas.

Finally, if the local log for a partition is empty, it means that the underlying log path is not available. In this case, the partition is marked as Offline. Marking a partition as Offline mainly consists of two steps: the first step is to update the state of the partition in allPartitions, and the second step is to remove the corresponding monitoring metrics.

To sum up, the main functionality of the second part of the becomeLeaderOrFollower method is to call the makeLeaders and makeFollowers methods to make the broker’s leader or follower roles effective on different partitions. I will explain the details of these two methods in a moment.

Now let’s take a look at the third part of the code, where the response object is constructed. This part of the code is the final operation of the becomeLeaderOrFollower method.

// Start a dedicated thread to periodically write the high watermark value of all non-offline partitions on the broker to checkpoint files
startHighWatermarkCheckPointThread()

// Add log directory fetcher threads to fetch log data
maybeAddLogDirFetchers(partitionStates.keySet, highWatermarkCheckpoints)

// Shut down inactive replica fetcher threads
replicaFetcherManager.shutdownIdleFetcherThreads()

// Shut down inactive log directory fetcher threads
replicaAlterLogDirsManager.shutdownIdleFetcherThreads()

// Execute the callback logic after the leader change
onLeadershipChange(partitionsBecomeLeader, partitionsBecomeFollower)

// Construct the response for the LeaderAndIsrRequest and return it
val responsePartitions = responseMap.iterator.map { case (tp, error) =>
  new LeaderAndIsrPartitionError()
    .setTopicName(tp.topic)
    .setPartitionIndex(tp.partition)
    .setErrorCode(error.code)
}.toBuffer
// Build the LeaderAndIsrResponse object and return it as the result
new LeaderAndIsrResponse(new LeaderAndIsrResponseData()
  .setErrorCode(Errors.NONE.code)
  .setPartitionErrors(responsePartitions.asJava))

As the comments show, this last part performs a series of wrap-up actions: it starts the dedicated thread that periodically writes the high watermark values of all non-offline partitions to checkpoint files, adds log-directory fetcher threads where needed, shuts down idle replica fetcher threads and log-directory fetcher threads, runs the callback logic for the leadership changes, and finally turns responseMap into a LeaderAndIsrResponse object, which is returned to the Controller as the response to the LeaderAndIsrRequest.

That completes the overall execution logic of the becomeLeaderOrFollower method. Next, let's study the two methods that carry the core replica state-change logic: makeLeaders and makeFollowers.

makeLeaders method #

The purpose of the makeLeaders method is to make the current Broker the Leader replica for a given set of partitions, that is, to turn this Broker's replicas of those partitions into Leader replicas. Let's start with the method signature:

private def makeLeaders(controllerId: Int,
    controllerEpoch: Int,
    partitionStates: Map[Partition, LeaderAndIsrPartitionState],
    correlationId: Int,
    responseMap: mutable.Map[TopicPartition, Errors],
    highWatermarkCheckpoints: OffsetCheckpoints): Set[Partition] = {
  ......
}

As you can see, makeLeaders takes six parameters and returns a set of Partition objects: the partitions for which the local replica has successfully become the Leader replica. Among the six parameters, the following three are the key ones.

  • controllerId: the ID of the Broker on which the Controller resides. This field is used only for log output and has no other practical use.
  • controllerEpoch: the Controller Epoch value, which you can think of as the Controller's version number. This field is also used only for log output.
  • partitionStates: the per-partition information carried in the LeaderAndIsrRequest, including who each partition's Leader is, which replicas are in the ISR (in-sync replica) set, and other data.

Okay, now let’s continue learning about the code for makeLeaders. I have put the key steps of this method in comments and removed some code related to logging output.

    ......
    // Initialize ResponseMap with Errors.NONE
    partitionStates.keys.foreach { partition =>
      ......
      responseMap.put(partition.topicPartition, Errors.NONE)
    }
    val partitionsToMakeLeaders = mutable.Set[Partition]()
    try {
      // Stop message fetching
      replicaFetcherManager.removeFetcherForPartitions(
        partitionStates.keySet.map(_.topicPartition))
      stateChangeLogger.info(s"Stopped fetchers as part of LeaderAndIsr request correlationId $correlationId from " +
        s"controller $controllerId epoch $controllerEpoch as part of the become-leader transition for " +
        s"${partitionStates.size} partitions")
      // Update the Leader partition information for the specified partitions
      partitionStates.foreach { case (partition, partitionState) =>
        try {
          if (partition.makeLeader(partitionState, highWatermarkCheckpoints))
            partitionsToMakeLeaders += partition
          else
            ......
        } catch {
          case e: KafkaStorageException =>
            ......
            // Wrap the KAFKA_STORAGE_ERROR exception in the Response
            responseMap.put(partition.topicPartition, Errors.KAFKA_STORAGE_ERROR)
        }
      }
    } catch {
      case e: Throwable =>
        ......
    }
    ......
    partitionsToMakeLeaders

I have summarized the main execution flow into a flowchart:

Combining with this chart, let me explain the execution logic of this method to you.

First, all the partition states in the given set are initialized as Errors.NONE in the ResponseMap.

Then, all fetcher threads serving these partitions are stopped. Since this Broker is now the Leader replica for these partitions, there is no need for the fetcher threads anymore.

Finally, makeLeaders calls the makeLeader method of the Partition class for each partition in the given set to update its Leader information. Partition.makeLeader saves the partition's Leader and ISR (in-sync replica) information, creates the necessary log objects, and resets the LEO (log-end offset) values of the remote Follower replicas.

What does "remote Follower replicas" mean? It refers to the Replica objects that the Leader keeps in memory on its own Broker to track its Followers; in the code they are held in the remoteReplicas field.

When the ReplicaManager processes FETCH requests, it updates the LEO values of the replica objects in remoteReplicas. Meanwhile, the Leader replica compares its own updated LEO value with the LEO values of the replicas in remoteReplicas to decide whether to “lift” the high watermark value.
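
The rule behind "lifting" the high watermark can be sketched as a small calculation (an illustration of the idea, not the actual Partition code): the leader may raise its high watermark only up to the smallest LEO among its own log and the in-sync remote followers, and the high watermark never moves backwards.

    // Illustrative sketch of the high watermark advancement rule.
    case class RemoteReplicaView(brokerId: Int, logEndOffset: Long)

    def maybeNewHighWatermark(currentHw: Long,
                              leaderLeo: Long,
                              remoteReplicas: Iterable[RemoteReplicaView],
                              isr: Set[Int]): Long = {
      // Only in-sync followers (plus the leader's own LEO) hold the high watermark back
      val inSyncLeos = remoteReplicas.collect {
        case r if isr.contains(r.brokerId) => r.logEndOffset
      }
      val candidate = (inSyncLeos ++ Seq(leaderLeo)).min
      math.max(currentHw, candidate)  // the high watermark never retreats
    }

    // Example: leader LEO = 100, in-sync followers at 95 and 98, so the HW can rise to 95
    maybeNewHighWatermark(currentHw = 90L, leaderLeo = 100L,
      remoteReplicas = Seq(RemoteReplicaView(2, 95L), RemoteReplicaView(3, 98L)),
      isr = Set(1, 2, 3))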

One important step in the makeLeader method of the Partition class is to reset the LEO value for this group of remote Follower replicas.

Back in makeLeaders: Partition.makeLeader returns true when the Broker successfully becomes the Leader replica of the given partition, and false when the operation fails. Every partition whose Leader is successfully set is added to partitionsToMakeLeaders, and that set is what makeLeaders ultimately returns.

That concludes the method. Let me summarize: the purpose of the makeLeaders method is to make the current Broker the Leader replica for the given partitions. Now let’s take a look at the makeFollowers method, which is the opposite of the makeLeaders method.

makeFollowers method #

The purpose of the makeFollowers method is to configure the current Broker as a Follower replica for the specified partitions. Let’s start with the method signature:

    // controllerId: The ID of the Controller located in the Broker.
    // controllerEpoch: The Controller Epoch value.
    // partitionStates: Detailed information about all partitions where the current Broker is a Follower replica.
    // correlationId: The correlation field that associates connection requests with responses.
    // responseMap: The field that encapsulates the processing results of the LeaderAndIsrRequest.
    // highWatermarkCheckpoints: The utility class for manipulating high watermark checkpoint files.
    private def makeFollowers(
      controllerId: Int,
      controllerEpoch: Int,
      partitionStates: Map[Partition, LeaderAndIsrPartitionState],
      correlationId: Int,
      responseMap: mutable.Map[TopicPartition, Errors],
      highWatermarkCheckpoints: OffsetCheckpoints) : Set[Partition] = {
      ......
    }

As you can see, the parameter list of the makeFollowers method is exactly the same as that of the makeLeaders method. I won’t go into detail about it here.

Among the important fields, partitionStates and responseMap are the input and output of the makeFollowers method, respectively.

Since the code for the entire makeFollowers method is quite long, I will first explain the core logic using a diagram to give you an overview. Then, I will guide you through each part of the code according to its functionality.

Overall, the makeFollowers method consists of two major steps:

  1. Traversing all partitions in partitionStates and performing the “become a Follower” operation.
  2. Performing additional actions, including rebuilding Fetcher threads and completing delayed requests.

First, let’s learn about the code for traversing all partitions in partitionStates:

    // Part 1: Traverse all partitions in partitionStates
    ......
    partitionStates.foreach { case (partition, partitionState) =>
      ......
      // Initialize the processing result of every partition to Errors.NONE
      responseMap.put(partition.topicPartition, Errors.NONE)
    }
    val partitionsToMakeFollower: mutable.Set[Partition] = mutable.Set()
    try {
      // Traverse all partitions in partitionStates
      partitionStates.foreach { case (partition, partitionState) =>
        // Get the Broker ID of this partition's Leader replica
        val newLeaderBrokerId = partitionState.leader
        try {
          // Look up the Leader Broker object in the metadata cache
          metadataCache.getAliveBrokers.find(_.id == newLeaderBrokerId) match {
            // If the Leader exists
            case Some(_) =>
              // Call the Partition's makeFollower method to configure the current
              // Broker as a Follower replica of this partition
              if (partition.makeFollower(partitionState, highWatermarkCheckpoints))
                // If the configuration succeeds, add the partition to the result set
                partitionsToMakeFollower += partition
              else
                // Otherwise, print an error log
                ......
            // If the Leader does not exist
            case None =>
              ......
              // Still create the log object for the partition's Follower replica
              partition.createLogIfNotExists(isNew = partitionState.isNew, isFutureReplica = false,
                highWatermarkCheckpoints)
          }
        } catch {
          case e: KafkaStorageException =>
            ......
        }
      }

In this part of the code, we can divide the execution logic into two steps.

Step 1: Initialize the processing result state of all partitions in responseMap to Errors.NONE.

Step 2: Traverse all partitions in partitionStates and execute the following logic for each partition:

  • Retrieve the Leader Broker ID of the partition from the partition’s detailed information.
  • Find the Leader Broker object in the Broker metadata cache using the Broker ID obtained in the previous step.
  • If the Leader object exists, execute the makeFollower method of the Partition class to configure the current Broker as a Follower replica of the partition. If the makeFollower method is successful, add the partition to the result set.
  • If the Leader object does not exist, create a log object for the partition Follower replica.

As for the execution logic of the Partition class's makeFollower method itself, it mainly consists of the following four steps (sketched right after this list):

  1. Update the Controller Epoch value.
  2. Save the replica list (Assigned Replicas, AR) and clear the ISR.
  3. Create a log object.
  4. Reset the Broker ID of the Leader replica.
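
Here is an illustrative way to picture those four steps as a pure state transition (simplified field names, not the real Partition class; step 3, creating the local log object, is a side effect on disk and is only noted in a comment):

    // Illustrative sketch of Partition.makeFollower as a state transition.
    // Step 3 (creating the local log object if it does not exist) is omitted here.
    case class FollowerStateSketch(
      controllerEpoch: Int,
      assignedReplicas: Seq[Int],   // the AR list
      isr: Set[Int],
      leaderReplicaId: Option[Int])

    def makeFollowerSketch(state: FollowerStateSketch,
                           newControllerEpoch: Int,
                           newAssignment: Seq[Int],
                           newLeaderId: Int): FollowerStateSketch =
      state.copy(
        controllerEpoch = newControllerEpoch,  // 1. update the Controller Epoch value
        assignedReplicas = newAssignment,      // 2. save the replica list (AR) ...
        isr = Set.empty,                       //    ... and clear the ISR (followers do not track it)
        leaderReplicaId = Some(newLeaderId))   // 4. reset the Broker ID of the Leader replica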

Next, let’s take a look at the code that executes other actions in the makeFollowers method:

      // Part 2: Execute other actions
      // Remove the existing Fetcher threads for these partitions
      replicaFetcherManager.removeFetcherForPartitions(
        partitionsToMakeFollower.map(_.topicPartition))
      ......
      // Attempt to complete delayed Fetch or Produce requests
      partitionsToMakeFollower.foreach { partition =>
        completeDelayedFetchOrProduceRequests(partition.topicPartition)
      }
      if (isShuttingDown.get()) {
        ......
      } else {
        // For each partition whose local replica becomes a Follower replica, determine
        // the Leader Broker and the starting fetch offset value (fetchOffset)
        val partitionsToMakeFollowerWithLeaderAndOffset = partitionsToMakeFollower.map { partition =>
          val leader = metadataCache.getAliveBrokers
            .find(_.id == partition.leaderReplicaIdOpt.get).get
            .brokerEndPoint(config.interBrokerListenerName)
          val fetchOffset = partition.localLogOrException.highWatermark
          partition.topicPartition -> InitialFetchState(leader, partition.getLeaderEpoch, fetchOffset)
        }.toMap
        // Add new Fetcher threads using the Leader Broker and fetchOffset determined above
        replicaFetcherManager.addFetcherForPartitions(
          partitionsToMakeFollowerWithLeaderAndOffset)
      }
    } catch {
      case e: Throwable =>
        ......
        throw e
    }
    ......
    // Return the set of partitions whose local replica becomes a Follower replica
    partitionsToMakeFollower

As you can see, this part of the code has relatively simple tasks and the logic progresses linearly, making it easy to understand. Let’s briefly summarize:

First, remove existing Fetcher threads since the Leader may have changed, resulting in changes to the Broker to read from and the fetch offset value.

Next, for each partition whose local replica becomes a Follower replica, determine the Leader Broker and the starting fetch offset value (fetchOffset). The Leader Broker is looked up in the metadata cache using the leader ID carried in the LeaderAndIsrRequest, and the fetch offset is the local replica's current high watermark.

Then, add new Fetcher threads using the Leader Broker and fetchOffset determined in the previous step.

Finally, return the list of partitions that need to set the current Broker as a Follower replica.

With this, we have covered the implementation of the main methods for managing partitions and replicas. It can be seen that most of the code implementation revolves around how to handle the LeaderAndIsrRequest data. For example, makeLeaders sets the Leader and ISR for partitions based on the request data, while makeFollowers replaces Fetcher threads and clears ISR for partitions based on the data.

ISR Management #

In addition to reading and writing replicas and managing partitions and replicas, the replica manager has another important function: managing the ISR (in-sync replicas). This management mainly involves two methods:

  • One is the maybeShrinkIsr method, which periodically checks whether the replica set in ISR needs to be shrunk;
  • The other is the maybePropagateIsrChanges method, which periodically propagates changes in ISR to the cluster brokers.

First, let’s take a look at the shrink operation of ISR.

maybeShrinkIsr method #

Shrinking refers to removing from the ISR those Follower replicas that lag too far behind the Leader replica. Because of how the check is scheduled, a follower can, in the worst case, stay in the ISR until its lag approaches 1.5 times the Broker-side replica.lag.time.max.ms parameter.

Wait a minute, why is it 1.5 times? You can see the code below:

def startup(): Unit = {
  scheduler.schedule("isr-expiration", maybeShrinkIsr _, period = config.replicaLagTimeMaxMs / 2, unit = TimeUnit.MILLISECONDS)
  ......
}

Let me explain. When the ReplicaManager's startup method is called, it schedules an asynchronous task that periodically checks whether any ISR needs to be shrunk. The check runs every replicaLagTimeMaxMs / 2 milliseconds, and at each check a Follower replica is removed from the ISR if its lag behind the Leader exceeds replicaLagTimeMaxMs.

Put the two together: a follower whose lag crosses replicaLagTimeMaxMs just after a check will not be evicted until the next check runs, up to another replicaLagTimeMaxMs / 2 later. So, in theory, a Follower replica lagging by anything less than 1.5 times replicaLagTimeMaxMs may still be sitting in the ISR. That is where the number 1.5 comes from.
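
Here is a tiny, self-contained timeline calculation that shows the worst case. The 10,000 ms value is chosen purely for illustration; it is not meant as the parameter's default.

    // Worst-case eviction timeline for a lagging follower (illustrative values).
    val replicaLagTimeMaxMs = 10000L
    val checkPeriodMs = replicaLagTimeMaxMs / 2   // the shrink task runs every 5,000 ms

    // The follower last caught up at t = 1,000 ms; shrink checks run at t = 0, 5000, 10000, ...
    val lastCaughtUpMs = 1000L
    val checkTimes = Iterator.from(0).map(_ * checkPeriodMs)

    // The follower is removed at the first check where its lag exceeds replicaLagTimeMaxMs
    val removalTime = checkTimes.find(t => t - lastCaughtUpMs > replicaLagTimeMaxMs).get
    println(s"Removed at t=$removalTime ms after lagging ${removalTime - lastCaughtUpMs} ms")
    // Prints: Removed at t=15000 ms after lagging 14000 ms, just under 1.5 x 10,000 ms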

Next, let’s take a look at the source code of the maybeShrinkIsr method.

private def maybeShrinkIsr(): Unit = {
  trace("Evaluating ISR list of partitions to see which replicas can be removed from the ISR")
  allPartitions.keys.foreach { topicPartition =>
    nonOfflinePartition(topicPartition).foreach(_.maybeShrinkIsr())
  }
}

As we can see, the maybeShrinkIsr method iterates over all partition objects on this replica manager, and for each partition that is online, it calls the maybeShrinkIsr method of the Partition class. The source code for this method is as follows:

def maybeShrinkIsr(): Unit = {
  // Check if ISR shrink is needed
  val needsIsrUpdate = inReadLock(leaderIsrUpdateLock) {
    needsShrinkIsr()
  }
  val leaderHWIncremented = needsIsrUpdate && inWriteLock(leaderIsrUpdateLock) {
    leaderLogIfLocal match {
      // If it is a Leader replica
      case Some(leaderLog) =>
        // Get the out-of-sync replica Id list
        val outOfSyncReplicaIds = getOutOfSyncReplicas(replicaLagTimeMaxMs)
        // If there are out-of-sync replica Ids
        if (outOfSyncReplicaIds.nonEmpty) {
          // Calculate the updated ISR list after shrink
          val newInSyncReplicaIds = inSyncReplicaIds -- outOfSyncReplicaIds
          assert(newInSyncReplicaIds.nonEmpty)
          info("Shrinking ISR from %s to %s. Leader: (highWatermark: %d, endOffset: %d). Out of sync replicas: %s."
            .format(inSyncReplicaIds.mkString(","),
              newInSyncReplicaIds.mkString(","),
              leaderLog.highWatermark,
              leaderLog.logEndOffset,
              outOfSyncReplicaIds.map { replicaId =>
                s"(brokerId: $replicaId, endOffset: ${getReplicaOrException(replicaId).logEndOffset})"
              }.mkString(" ")
            )
          )
          // Update ISR data of the partition in ZooKeeper and the metadata cache on the Broker
          shrinkIsr(newInSyncReplicaIds)
          // Attempt to increment the high watermark value of the Leader replica
          maybeIncrementLeaderHW(leaderLog)
        } else {
          false
        }
      // If it is not a Leader replica, do nothing
      case None => false
    }
  }
  // If the high watermark value of the Leader replica is incremented
  if (leaderHWIncremented)
    // Try to unlock the delayed requests
    tryCompleteDelayedRequests()
}

As we can see, the entire execution flow of the maybeShrinkIsr method is as follows:

  • Step 1: Check whether an ISR shrink is needed at all, mainly by calling the needsShrinkIsr method to see whether any replicas have fallen out of sync with the Leader. If such replicas exist, a shrink is required.
  • Step 2: Get the list of out-of-sync replicas again (the test is sketched below), remove them from the current ISR, and compute the new, shrunken ISR list.
  • Step 3: Call the shrinkIsr method to update the partition's ISR data in ZooKeeper and in the metadata cache on the Broker.
  • Step 4: Attempt to increment the high watermark value of the Leader replica. This check is needed because, after the shrink, the ISR may contain only the Leader replica itself, so the high watermark is no longer held back by lagging followers.
  • Step 5: Based on the result of the previous step, try to complete the delayed operations that previously did not meet their conditions.
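
For reference, the out-of-sync test itself can be sketched like this (simplified names; in Kafka the real check lives in the Partition class's getOutOfSyncReplicas):

    // Illustrative sketch of the "out of sync" test used during ISR shrinking.
    case class FollowerView(brokerId: Int, lastCaughtUpTimeMs: Long)

    def outOfSyncReplicaIds(isr: Set[Int],
                            leaderId: Int,
                            followers: Seq[FollowerView],
                            nowMs: Long,
                            maxLagMs: Long): Set[Int] =
      followers
        .filter(f => isr.contains(f.brokerId) && f.brokerId != leaderId)  // only ISR followers can be evicted
        .filter(f => nowMs - f.lastCaughtUpTimeMs > maxLagMs)             // lagging longer than replica.lag.time.max.ms
        .map(_.brokerId)
        .toSet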

I have summarized this execution process into a flowchart:

maybePropagateIsrChanges method #

After the shrinkage of ISR, the ReplicaManager also needs to propagate the result of this operation to other brokers in the cluster in order to synchronize the result. This is done through ISR notify events.

In the ReplicaManager class, the maybePropagateIsrChanges method is specifically responsible for creating ISR notify events. This is also done by an asynchronous thread periodically, as shown in the code below:

scheduler.schedule("isr-change-propagation", maybePropagateIsrChanges _, period = 2500L, unit = TimeUnit.MILLISECONDS)

Next, let’s take a look at the code of the maybePropagateIsrChanges method:

def maybePropagateIsrChanges(): Unit = {
  val now = System.currentTimeMillis()
  isrChangeSet synchronized {
    // Conditions for ISR change propagation:
    // 1. There are ISR changes that have not been propagated yet
    // 2. No ISR change has occurred in the last 5 seconds, or more than
    //    1 minute has passed since the last propagation
    if (isrChangeSet.nonEmpty &&
      (lastIsrChangeMs.get() + ReplicaManager.IsrChangePropagationBlackOut < now ||
        lastIsrPropagationMs.get() + ReplicaManager.IsrChangePropagationInterval < now)) {
      // Create the corresponding Znode node in ZooKeeper
      zkClient.propagateIsrChanges(isrChangeSet)
      // Clear the isrChangeSet collection
      isrChangeSet.clear()
      // Update the timestamp of the most recent ISR propagation
      lastIsrPropagationMs.set(now)
    }
  }
}

As we can see, the logic of the maybePropagateIsrChanges method is also quite simple. Let me summarize its execution logic.

First, determine the conditions for ISR change propagation. Both of the following conditions need to be satisfied:

  • There are ISR changes that have not been propagated yet.
  • Either no ISR change has occurred in the last 5 seconds, or more than 1 minute has passed since the last propagation.

Once both conditions are met, the code writes the accumulated changes to the corresponding ZooKeeper node, clears the isrChangeSet collection, and finally updates lastIsrPropagationMs, the timestamp of the most recent propagation.
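
Putting the two conditions together, the gate that maybePropagateIsrChanges applies can be sketched as a single predicate. The 5-second and 1-minute constants follow the values described above; the names here are illustrative.

    // Illustrative sketch of the propagation gate in maybePropagateIsrChanges.
    val isrChangePropagationBlackOutMs = 5 * 1000L    // no ISR change in the last 5 seconds
    val isrChangePropagationIntervalMs = 60 * 1000L   // or more than 1 minute since the last propagation

    def shouldPropagate(now: Long,
                        hasPendingIsrChanges: Boolean,
                        lastIsrChangeMs: Long,
                        lastIsrPropagationMs: Long): Boolean =
      hasPendingIsrChanges &&
        (lastIsrChangeMs + isrChangePropagationBlackOutMs < now ||
          lastIsrPropagationMs + isrChangePropagationIntervalMs < now)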

Summary #

Today, we focused on the partition and replica management features of the ReplicaManager class, as well as ISR management. Let’s recap the core features and methods of the ReplicaManager class.

  • Partition/replica management. The core function of the ReplicaManager class is to handle the LeaderAndIsrRequest received by the broker, extract the partition information from the request, and instruct the broker to perform the corresponding actions.
  • becomeLeaderOrFollower method. This is the entry method for handling the LeaderAndIsrRequest. It divides the partitions in the request into two groups and calls the makeLeaders and makeFollowers methods respectively.
  • makeLeaders method. Its purpose is to make the broker become the leader replica of the specified partition.
  • makeFollowers method. Its purpose is to make the broker become the follower replica of the specified partition.
  • ISR management. The ReplicaManager class provides functions for shrinking the ISR and periodically propagating ISR change notifications.

Mastering these core points will give you a good understanding of most of the replica manager’s functionality, such as how the broker becomes the leader replica and follower replica of a partition, and how the ISR is managed.

You may have also noticed that we didn’t go into detail on some non-core features today, such as the metadata cache on the broker. In the next lesson, I will take you deeper into this cache to see what it is exactly.

Post-discussion #

In the source code of maybePropagateIsrChanges, there is a field called isrChangeSet. Do you know where it is being updated?

Feel free to share your thoughts and answers in the comments. Let’s discuss and exchange ideas. You are also welcome to share today’s content with your friends.