23 ReplicaManager: Understanding the Definition and Core Fields of the Replica Management Class

Hello, I’m Hu Xi.

Today, we are going to learn about the replica manager in Kafka, the ReplicaManager class. It manages the replicas hosted on a broker and also handles some partition-level management tasks, such as changing the log directory of a partition’s replica.

You may recall that when we discussed state machines, I mentioned that Kafka implements both a replica state machine and a partition state machine. When it comes to managers, however, the Kafka source code does not define a dedicated class for partitions, such as a “PartitionManager”; it only defines the ReplicaManager class. This class not only implements replica management but also includes many methods that operate on partition objects.

The source code of the ReplicaManager class is very important; it is one of the key components in building the Kafka replica synchronization mechanism. Most of the issues that arise during the replica synchronization process are difficult to locate and resolve. Therefore, mastering this part of the source code will help us explore the root causes of production environment problems and prevent future pitfalls. Now, let me share a real case with you.

Our team once ran into a strange issue: after we performed a message deletion in production, the follower replica became inconsistent with the leader replica. We were perplexed at first. Normally, when the leader replica deletes messages, it advances its log start offset, and the follower replica should advance its own log start offset as well. In this case, however, the update failed on the follower replica. We searched through all the logs but couldn’t find the reason. In the end, we found the answer by analyzing the source code of the ReplicaManager class.

Let’s take a look at the detailed error message of this issue:

Caused by: org.apache.kafka.common.errors.OffsetOutOfRangeException: Cannot increment the log start offset to 22786435 of partition XXX-12 since it is larger than the high watermark 22786129

This is the exception thrown by the follower replica, while the log of the corresponding leader replica shows everything is normal. The following log shows that the log start offset of the leader replica has been successfully adjusted.

INFO Incrementing log start offset of partition XXX-12 to 22786435

When encountering this issue, I believe your first reaction, like mine, would be to suspect a bug without being sure of the cause. Later, tracing the request from KafkaApis, we reached the deleteRecords method in ReplicaManager, which gave us a clue.

After the follower replica fetches the message from the leader replica, it performs two actions:

  1. Writes the message to its local log.
  2. Updates the follower replica’s high watermark value and log start offset.

If the deleteRecords operation occurs between these two steps, because deleteRecords changes the log start offset, the follower replica may use an outdated value when performing the second step, resulting in the aforementioned error. It is indeed a bug. After confirming this, the subsequent solution becomes apparent: although the deleteRecords functionality is convenient, due to this bug, we should try to avoid using it directly in the production environment.
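To make the failure concrete, here is a minimal Scala sketch of one possible interleaving. It is a toy model, not Kafka’s code: ToyFollowerLog, OffsetOutOfRange, and DeleteRecordsRaceDemo are hypothetical names, and the only rule it borrows is the one stated in the error message above, namely that the log start offset may not move past the high watermark.

```scala
// Toy stand-in for the exception type; not Kafka's OffsetOutOfRangeException.
final class OffsetOutOfRange(msg: String) extends RuntimeException(msg)

// Toy follower log that tracks only the two offsets discussed in the text.
final class ToyFollowerLog(var highWatermark: Long, var logStartOffset: Long) {
  // Rule taken from the error message: the log start offset
  // must not be moved past the high watermark.
  def maybeIncrementLogStartOffset(newStart: Long): Unit = {
    if (newStart > highWatermark)
      throw new OffsetOutOfRange(
        s"Cannot increment the log start offset to $newStart " +
          s"since it is larger than the high watermark $highWatermark")
    if (newStart > logStartOffset) logStartOffset = newStart
  }
}

object DeleteRecordsRaceDemo {
  // Step 2 of the follower: apply the high watermark it learned,
  // then the log start offset. Returns Left(error text) on failure.
  def step2(log: ToyFollowerLog, hw: Long, logStart: Long): Either[String, Long] =
    try {
      log.highWatermark = hw
      log.maybeIncrementLogStartOffset(logStart)
      Right(log.logStartOffset)
    } catch {
      case e: OffsetOutOfRange => Left(e.getMessage)
    }
}
```

Calling DeleteRecordsRaceDemo.step2 with a high watermark of 22786129 and a log start offset of 22786435, the values from the error above, yields a Left that carries the error text. With a high watermark at or beyond the new log start offset, the update succeeds.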

Speaking of this, I want to say that encountering real-life issues in the production environment is not scary; what is scary is not being able to locate the root cause of the problem. If you have worked on Java projects, you must have experienced this. Many times, relying solely on the stack exception information is not enough to locate the problem. Especially when it comes to Kafka replica synchronization, it is difficult to understand the underlying principles by just looking at the output logs. Therefore, we must rely on the source code, which is the main purpose of studying the ReplicaManager class today.

Next, we will focus on learning about this class. It is located in ReplicaManager.scala under the server package. The file is large, with nearly 1,900 lines, and its code is organized into fairly fine-grained parts.

Because replica read/write operations and management operations are heavyweight features, before delving into the details, we must clarify the relationships between the structures of the ReplicaManager class and understand the class definition as well as its core fields. This is the main objective of our lesson in this section.

In the next two lessons, I will explain the replica read/write operations and replica management operations in detail. After learning these, you will have a clear and in-depth understanding of the main source code of the ReplicaManager class. Most importantly, you will understand the logic to be executed when replicas become leaders or followers, which will help you tackle actual replica operation issues.

Code Structure #

Let’s first take a look at the code structure of this Scala file. I’ll show you with a mind map:

[Figure: mind map of the code structure of ReplicaManager.scala]

Although the mind map shows 8 parts, the HostedPartition interface and its implementation objects are easier to understand together, so I divide ReplicaManager.scala into 7 main parts.

  • ReplicaManager class: It is the implementation code for the replica manager, which defines methods for reading, writing, and deleting replica messages, as well as other management methods.
  • ReplicaManager object: The companion object of the ReplicaManager class, which only defines 3 constants.
  • HostedPartition and its implementation object: It represents the state of the partition objects stored locally on the broker. Possible states include: non-existent state (None), online state (Online), and offline state (Offline).
  • FetchPartitionData: It defines the fetched partition data and important metadata information, such as high watermark value, log start offset value, etc.
  • LogReadResult: It represents the message data and related metadata information, such as high watermark value and log start offset value, that the replica manager reads from the replica local log.
  • LogDeleteRecordsResult: It represents the result information returned by the replica manager after performing deletion operation on the replica log.
  • LogAppendResult: It represents the result information returned by the replica manager after performing write operation on the replica log.
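The three partition states listed above map naturally onto a sealed type. The sketch below is a simplified illustration under assumptions: HostedState, Absent (standing in for the None state, renamed to avoid clashing with Scala’s Option), ToyPartition, and HostedStateDemo are hypothetical names, not the definitions in ReplicaManager.scala.

```scala
// Stand-in for Kafka's Partition class; it carries only a topic and an id.
final case class ToyPartition(topic: String, id: Int)

// Hypothetical model of the three states a broker can report for a partition.
sealed trait HostedState
object HostedState {
  case object Absent extends HostedState                               // the "None" state in the text
  final case class Online(partition: ToyPartition) extends HostedState // carries the partition object
  case object Offline extends HostedState
}

object HostedStateDemo {
  // Because the trait is sealed, the compiler can check that every state is handled.
  def describe(state: HostedState): String = state match {
    case HostedState.Online(p) => s"${p.topic}-${p.id} is online"
    case HostedState.Offline   => "partition is offline"
    case HostedState.Absent    => "partition is not hosted on this broker"
  }
}
```

Only the online state carries a partition object, which is why code that looks up a partition typically has to pattern match on the state before it can use the partition.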

From a semantic perspective, FetchPartitionData and LogReadResult are very similar, but what is the difference between them?

In fact, the difference is very small. If you look at the source code, you will find that FetchPartitionData has 8 fields in total, and when a FetchPartitionData instance is constructed, its first 7 fields are populated from the fields of a LogReadResult. You can roughly treat the two as serving the same purpose. The only difference is that FetchPartitionData has one extra field indicating whether the partition is being reassigned; if so, specific JMX monitoring metrics need to be updated.
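The relationship between the two classes can be sketched as a plain field-by-field copy plus one extra flag. This is an illustration under assumptions, not the real definitions: ToyLogReadResult and ToyFetchPartitionData are hypothetical, they model only three of the shared fields, and isReassignmentFetch is an illustrative name for the extra field.

```scala
object FetchDataDemo {
  // Hypothetical, trimmed-down read result: only three shared fields are modeled.
  final case class ToyLogReadResult(
    highWatermark: Long,
    logStartOffset: Long,
    records: Vector[String])

  // Same fields, plus a flag saying whether the partition is being reassigned.
  final case class ToyFetchPartitionData(
    highWatermark: Long,
    logStartOffset: Long,
    records: Vector[String],
    isReassignmentFetch: Boolean)

  // Copy the shared fields across and attach the extra flag.
  def toFetchData(r: ToyLogReadResult, isReassignmentFetch: Boolean): ToyFetchPartitionData =
    ToyFetchPartitionData(r.highWatermark, r.logStartOffset, r.records, isReassignmentFetch)
}
```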

Among these 7 parts, the ReplicaManager class is our focus of study. Other classes either only define constants or serve as POJO classes for data storage, and their purposes are self-evident, so I won’t go into detail about them.

Definition of the ReplicaManager class #

Next, we will study the ReplicaManager class from two angles: its class definition and its important fields. First, let’s look at the definition of the ReplicaManager class.

class ReplicaManager(
  val config: KafkaConfig,  // Configuration management class
  metrics: Metrics,  // Monitoring metrics class
  time: Time,  // Time utility class
  val zkClient: KafkaZkClient,  // ZooKeeper client
  scheduler: Scheduler,  // Kafka scheduler
  val logManager: LogManager,  // Log manager
  val isShuttingDown: AtomicBoolean,  // Whether the broker is shutting down
  quotaManagers: QuotaManagers,  // Quota managers
  val brokerTopicStats: BrokerTopicStats,  // Broker topic monitoring metrics class
  val metadataCache: MetadataCache,  // Broker metadata cache
  logDirFailureChannel: LogDirFailureChannel,  // Handles failed log directories
  // Purgatory for handling delayed PRODUCE requests
  val delayedProducePurgatory: DelayedOperationPurgatory[DelayedProduce],
  // Purgatory for handling delayed FETCH requests
  val delayedFetchPurgatory: DelayedOperationPurgatory[DelayedFetch],
  // Purgatory for handling delayed DELETE_RECORDS requests
  val delayedDeleteRecordsPurgatory: DelayedOperationPurgatory[DelayedDeleteRecords],
  // Purgatory for handling delayed ELECT_LEADERS requests
  val delayedElectLeaderPurgatory: DelayedOperationPurgatory[DelayedElectLeader],
  threadNamePrefix: Option[String]) extends Logging with KafkaMetricsGroup {
  ......
}

The ReplicaManager class has a lot of fields in its constructor. Some fields have simple meanings, like time and metrics, which you can understand at a glance, so I won’t go into much detail about them. I will explain a few key fields in detail. These fields are an important basis for understanding the replica manager.

1. logManager

This is the log manager. It is responsible for creating and managing log objects for partitions and defines many methods for operating on log objects, such as getOrCreateLog, etc.

2. metadataCache

This is the metadata cache on the broker side, which stores information about the partitions in the cluster, such as their leaders and ISRs. Note that it is related to the controller-side metadata cache we discussed earlier: the metadata cache on each broker is synchronized asynchronously from the metadata cache on the controller.

3. logDirFailureChannel

This is the handler class for handling failed log directories. Starting from Kafka version 1.1, support for JBOD was added. This means that if a broker is configured with multiple log directories, the broker can continue to work when one of the log directories becomes unavailable (e.g., the disk where the directory is located is full). Therefore, a whole set of mechanisms is needed to ensure that the replicas on the functioning disks can continue to provide services in the event of disk I/O failures.

Among them, logDirFailureChannel is a manager class that temporarily stores failed log directories. We don’t need to delve into the source code of this feature, but you should at least know that this feature is an improvement that enhances the high availability of the Kafka server. With this feature, even if a single disk on a broker fails, the entire broker’s services will not be interrupted.
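The idea of “temporarily storing failed log directories” can be sketched as a de-duplicating set in front of a blocking queue. This is a toy stand-in under assumptions, not Kafka’s class: ToyLogDirFailureChannel and its method names are chosen for illustration, and the directory paths in the usage note are made up.

```scala
import java.util.concurrent.{ArrayBlockingQueue, ConcurrentHashMap}

// Toy channel: producers report failed directories, one consumer drains them.
final class ToyLogDirFailureChannel(logDirNum: Int) {
  // Remembers every directory already reported, so each is queued once.
  private val offlineLogDirs = new ConcurrentHashMap[String, String]()
  // Bounded by the number of configured log directories.
  private val queue = new ArrayBlockingQueue[String](logDirNum)

  // Returns true only the first time a directory is reported.
  def maybeAddOfflineLogDir(logDir: String): Boolean =
    if (offlineLogDirs.putIfAbsent(logDir, logDir) == null) {
      queue.add(logDir)
      true
    } else false

  // Blocks until some directory has been reported as failed.
  def takeNextOfflineLogDir(): String = queue.take()

  // Number of reported directories not yet taken by the consumer.
  def pendingCount: Int = queue.size
}
```

In this sketch, any thread that hits an I/O error would call maybeAddOfflineLogDir("/data/kafka-1"), and a single handler thread would loop on takeNextOfflineLogDir() to take the affected replicas offline while replicas on healthy disks keep serving.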

4. Four fields related to Purgatory

These four fields are delayedProducePurgatory, delayedFetchPurgatory, delayedDeleteRecordsPurgatory, and delayedElectLeaderPurgatory, which manage four types of delayed requests. The first two should be familiar to us, as they handle delayed produce requests and delayed fetch requests (from consumers and from follower replicas), respectively. The last two handle delayed delete-records requests and delayed leader-election requests, which are more advanced use cases that we can ignore for now.

During the replica management process, most of the changes in states will trigger the handling of delayed requests. In these cases, these Purgatory fields come into play.

Once you understand these fields, you can deal with the subsequent replica management operations. Among them, the most important one is logManager. It is a key component that assists the replica manager in operating on cluster replica objects.

Important Custom Fields #

After learning about the class definition, let’s take a look at the important custom fields in the ReplicaManager class. There are about 20 such fields, and we don’t need to study each one separately. Fields like isrExpandRate and isrShrinkRate can be understood just from their names: they are metrics that measure ISR changes.

Next, I will explain in detail several fields that are crucial for understanding the Replica Manager. I will explain their meanings with the help of code examples and also clarify their important uses.

controllerEpoch #

Let’s start by looking at the controllerEpoch field.

The purpose of this field is to isolate requests sent by expired controllers. This means that requests sent by an old controller cannot be further processed. To distinguish between requests sent by an old controller and those sent by a new controller, we look at the value of controllerEpoch carried by the request. Here is the code defining the field:

@volatile var controllerEpoch: Int = 
  KafkaController.InitialControllerEpoch

This field represents the most recent epoch value of the controller that changed the partition leader. Its default value is 0. Each time a new controller is elected, the controller epoch is incremented by 1, and the broker picks up the new value from the requests it receives.

In the code of ReplicaManager, this field is used in many places to determine the legitimacy of control class requests sent by the controller. If the controllerEpoch value carried by the request is less than the value of this field, it means that the request was issued by an old controller. Therefore, the ReplicaManager directly rejects the processing of such requests.

It is worth noting that it is declared as a var type, which means its value can be dynamically modified. When the ReplicaManager handles control class requests, it updates this field. You can see it in the following code examples:

// becomeLeaderOrFollower method
// Handling LeaderAndIsrRequest request
controllerEpoch = leaderAndIsrRequest.controllerEpoch
// stopReplicas method
// Handling StopReplicaRequest request
this.controllerEpoch = controllerEpoch
// maybeUpdateMetadataCache method
// Handling UpdateMetadataRequest request
controllerEpoch = updateMetadataRequest.controllerEpoch

All requests received by a broker are processed by Kafka I/O threads, and there may be multiple I/O threads. Therefore, the controllerEpoch field is declared as volatile to ensure its memory visibility.
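The fencing logic described in this section can be condensed into a few lines. The sketch below is a simplified model under assumptions: ToyReplicaManager and handleControlRequest are hypothetical names, and the real methods do far more work than comparing and storing the epoch.

```scala
object EpochFencingDemo {
  // Default epoch, 0 as stated in the text.
  val InitialControllerEpoch = 0

  final class ToyReplicaManager {
    // volatile makes writes by one I/O thread visible to the others.
    @volatile var controllerEpoch: Int = InitialControllerEpoch

    // Reject requests from an older controller; otherwise adopt the request's epoch.
    def handleControlRequest(requestEpoch: Int): Either[String, Int] =
      if (requestEpoch < controllerEpoch)
        Left(s"stale controller epoch $requestEpoch, current epoch is $controllerEpoch")
      else {
        controllerEpoch = requestEpoch
        Right(controllerEpoch)
      }
  }
}
```

Note that volatile only guarantees visibility. The check and the update in this sketch are not atomic, so a real implementation would need additional synchronization around them.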

allPartitions #

The next important field is allPartitions. As I mentioned at the beginning of this lesson, Kafka does not have a partition manager as such. The ReplicaManager class undertakes some of the partition management tasks. The allPartitions field contains all the partition objects saved on the broker. The code defining it is as follows:

private val allPartitions = new Pool[TopicPartition, HostedPartition](
  valueFactory = Some(tp => HostedPartition.Online(Partition(tp, time, this)))
)

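From the code, you can see that allPartitions is a container that maps each TopicPartition to a HostedPartition, which represents the partition’s state. When a requested key is missing, the valueFactory creates a Partition object wrapped in the Online state, so partition objects created through this pool start out in the online state.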

It is worth noting that the partition state here and the states in the partition state machine we discussed earlier belong to two separate sets of definitions. They may be merged in the future. After all, their functions overlap and their meanings are similar. For example, both define an Online state, which represents the normal working state of a partition. Of course, this is just a bold speculation of mine based on what the source code does. Whether they will be merged remains to be seen.

One more thing to mention is that the Partition class represents a partition object. An instance of Partition defines and manages an individual partition, mainly utilizing the logManager to help with operations on the underlying log of the partition. The ReplicaManager class manages partitions through the Partition objects.
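To see how a pool with a value factory behaves, here is a minimal sketch. It rests on assumptions: ToyPool, TopicPart, and the State types are hypothetical simplifications built on TrieMap, not Kafka’s Pool, TopicPartition, or HostedPartition, and offlineCount only illustrates how a state count could be derived from the pool’s values.

```scala
import scala.collection.concurrent.TrieMap

object PoolDemo {
  // Stand-in for TopicPartition.
  final case class TopicPart(topic: String, partition: Int)

  // Two of the partition states, reduced to what the example needs.
  sealed trait State
  final case class Online(tp: TopicPart) extends State
  case object Offline extends State

  // Toy pool: a concurrent map that builds a value for a missing key on demand.
  final class ToyPool[K, V](valueFactory: K => V) {
    private val underlying = TrieMap.empty[K, V]
    // Return the existing value, or create one with the factory and store it.
    def getAndMaybePut(key: K): V = underlying.getOrElseUpdate(key, valueFactory(key))
    // Overwrite the value for a key, for example to mark a partition offline.
    def put(key: K, value: V): Unit = underlying.update(key, value)
    def values: Iterable[V] = underlying.values
  }

  // Count the entries currently in the offline state.
  def offlineCount(pool: ToyPool[TopicPart, State]): Int =
    pool.values.count(_ == Offline)
}
```

Here, a pool created with the factory tp => Online(tp) returns an Online entry the first time a partition is requested, and the same entry on later requests, until the entry is explicitly replaced.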

replicaFetcherManager #

The third important field is replicaFetcherManager. Its main task is to create an instance of the ReplicaFetcherThread class. In the previous lesson, we learned about the source code of the ReplicaFetcherThread class, whose main responsibility is to help the follower replicas pull messages from the leader replica and write them to the local log.

The following code shows the main method createFetcherThread of the ReplicaFetcherManager class:

override def createFetcherThread(fetcherId: Int, sourceBroker: BrokerEndPoint): ReplicaFetcherThread = {
  val prefix = threadNamePrefix.map(tp => s"$tp:").getOrElse("")
  val threadName = s"${prefix}ReplicaFetcherThread-$fetcherId-${sourceBroker.id}"
  // Create an instance of ReplicaFetcherThread and return it
  new ReplicaFetcherThread(threadName, fetcherId, sourceBroker, brokerConfig, failedPartitions, replicaManager,
    metrics, time, quotaManager)
}

The main purpose of this method is to create an instance of ReplicaFetcherThread for use by follower replicas. The name of the thread is determined based on the fetcherId and Broker ID. The ReplicaManager class manages all Fetcher threads, including their creation, startup, addition, stopping, and removal, using the replicaFetcherManager field.

Summary #

In this lesson, I mainly introduced the definition of the ReplicaManager class and its important fields. They serve as the foundation for understanding the replica management functionality of the ReplicaManager class.

In summary, the ReplicaManager class is an important component on the Kafka Broker side that manages partition and replica objects. Each Broker creates an instance of the ReplicaManager class when it starts up. Once created, this instance takes on the responsibility of managing the leader and follower replicas under it.

Let’s briefly review the key points of this lesson.

  • ReplicaManager class: The concrete implementation of the replica manager, which defines methods for reading and writing replicas, deleting replica messages, and other management methods.
  • allPartitions field: Holds the data of all partition objects stored on the Broker. The ReplicaManager class manages replicas under partitions through this field.
  • replicaFetcherManager field: Creates an instance of the ReplicaFetcherThread class, which implements the logic for follower replicas to fetch messages from leader replicas in real time.

[Figure: Replica Manager]

Today, I mentioned several times that ReplicaManager is responsible for replica management. In fact, the two most important functions in replica management are reading from and writing to replicas. For a leader replica, follower replicas need to read its message data; for a follower replica, after fetching messages from the leader replica, it needs to write them to its own underlying log. So, how are replica reads and writes implemented? In the next lesson, we will explore in depth the important replica read and write methods of the ReplicaManager class.

Post-discussion #

There is a method called offlinePartitionCount in the ReplicaManager class, which counts the number of partitions in the offline state. Can you write a method to count the number of partitions in the online state?

Feel free to share your thoughts and answers in the comments section, discuss with me, and also share today’s content with your friends.