17 Replica State Machine Uncovering the Principles Behind Replica State Machines

Hello, I’m Hu Xi. Today we are going to talk about the Replica State Machine.

In the previous lessons, when discussing the Controller and TopicDeletionManager, I repeatedly mentioned two components: the Replica State Machine and the Partition State Machine. Now you should know that they respectively manage the state transitions of all replicas and partitions in the Kafka cluster. However, do you know exactly what states replicas and partitions can have?

With this question in mind, we will focus on studying the source code of these two components over the next two lessons. Let’s start with the Replica State Machine (ReplicaStateMachine).

Introduction before class #

To be frank, ReplicaStateMachine is not as well known as the components we covered earlier. The official Kafka documentation does not describe it at all, which shows that it is an internal component that ordinary users never see. As a result, many people assume that because it is invisible from the outside, there is no need to study its implementation.

Actually, that’s not true. Understanding the principle of the replica state machine is helpful for us to fundamentally locate many data inconsistency issues. Now, let me share with you a real experience of mine.

Once, we deployed a 3-broker Kafka cluster with version 2.0.0. Let’s assume that these three brokers are A, B, and C, and we created a single partition with two replicas on these three brokers.

At that time, we found a strange phenomenon: if the two replicas are on A and B and the Controller is on C, then after we shut down A and B, ZooKeeper shows the leader of this topic as -1 and the ISR as empty. However, if the two replicas are still on A and B but the Controller is on B, then after we shut down A and B one by one, the leader and ISR of this topic in ZooKeeper both become B. This is obviously different from the first situation.

Although this is not a particularly serious problem, there is still data inconsistency after all, so it needs to be treated with caution. After carefully reviewing the source code, we found the cause of the inconsistency: it turns out that in the first situation, the Controller will call the ReplicaStateMachine to adjust the state of the replicas of this topic, thereby changing the leader and ISR. However, in the second situation, the Controller performs a failover, but it does not perform state transition during the initialization of the new Controller component, resulting in inconsistency.

You see, without reading this part of the source code, we could not have located the cause of this problem. In short, the replica state machine code defines the set of states for Kafka replicas and controls the transition rules between them. If you want to understand Kafka's internals in depth, the compact yet powerful ReplicaStateMachine code is not to be missed.

Definition and Initialization #

Today, the source code file we are going to focus on is ReplicaStateMachine.scala in the controller package. Its code structure is very simple, as shown in the following figure:

In this source file of less than 500 lines, the code defines three parts.

  • ReplicaStateMachine: An abstract class for replica state machine, which defines some common methods (such as startup, shutdown) and the most important processing logic method handleStateChanges of the state machine.
  • ZkReplicaStateMachine: The concrete implementation class of replica state machine, which overrides the handleStateChanges method to implement state transitions between replicas. Currently, ZkReplicaStateMachine is the only subclass of ReplicaStateMachine.
  • ReplicaState: A collection of replica states. Kafka currently defines 7 types of replica states.

Now, let’s take a look at how ReplicaStateMachine and its subclass ZkReplicaStateMachine are defined in the code. Please refer to the following code snippets:

// Definition of abstract class ReplicaStateMachine
abstract class ReplicaStateMachine(controllerContext: ControllerContext) extends Logging {
  ......
}

// Definition of concrete class ZkReplicaStateMachine
class ZkReplicaStateMachine(config: KafkaConfig, 
  stateChangeLogger: StateChangeLogger,
  controllerContext: ControllerContext,
  zkClient: KafkaZkClient,
  controllerBrokerRequestBatch: ControllerBrokerRequestBatch) 
  extends ReplicaStateMachine(controllerContext) with Logging {
  ......
}

ReplicaStateMachine only needs to receive an instance of ControllerContext. As I have mentioned repeatedly in the previous lessons, ControllerContext encapsulates all the cluster metadata information stored on the Controller side.

ZkReplicaStateMachine, on the other hand, has more properties. When constructing an instance of ZkReplicaStateMachine, besides an instance of ControllerContext, the important properties are the KafkaZkClient object instance and the ControllerBrokerRequestBatch instance. The former is responsible for interacting with ZooKeeper, while the latter is used to send control requests (LeaderAndIsrRequest, StopReplicaRequest, and UpdateMetadataRequest) to the cluster Brokers, as we discussed in detail in [Lesson 12].

The source code of ControllerBrokerRequestBatch is located in ControllerChannelManager.scala, which is a class with only 10 lines of code. Its main function is to send the given Request to the specified Broker. You can explore how it sends requests yourself (hint: consider the ControllerBrokerStateInfo code we discussed in [Lesson 12]).

In the logic of replica state transition operations, an important step is to update the information for replicas on the Broker, which is accomplished by sending requests from the Controller to the Broker. Therefore, it would be helpful for you to understand the logic of request sending here.
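To make the request-sending step a little more concrete, here is a minimal sketch of the batching idea: requests are queued per Broker while state changes are processed, then flushed together. ToyRequestBatch, its add method, and the string payloads are hypothetical simplifications for illustration, not Kafka's ControllerBrokerRequestBatch.

```scala
import scala.collection.mutable

object RequestBatchSketch {
  // Hypothetical stand-in for a request batch: it only queues request
  // descriptions per broker ID and hands them back when flushed.
  final class ToyRequestBatch {
    private val pending = mutable.LinkedHashMap.empty[Int, Vector[String]]

    // Start a new round; this toy version refuses to start while requests
    // from an earlier round are still queued.
    def newBatch(): Unit =
      require(pending.isEmpty, "previous batch was not sent")

    // Queue one request description for each of the given brokers.
    def add(brokerIds: Seq[Int], request: String): Unit =
      brokerIds.foreach { id =>
        pending.update(id, pending.getOrElse(id, Vector.empty[String]) :+ request)
      }

    // "Send" everything queued so far (here: return it) and clear the queue.
    def sendRequestsToBrokers(): Map[Int, Vector[String]] = {
      val sent = pending.toMap
      pending.clear()
      sent
    }
  }
}
```

The point of the pattern is that a whole round of state changes produces at most one flush per Broker, instead of one network call per replica.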

Okay, now that we have learned about the definition of the replica state machine class, let’s see when the replica state machine is initialized.

In summary, when the KafkaController object is constructed, it will initialize an instance of ZkReplicaStateMachine, as shown in the following code:

val replicaStateMachine: ReplicaStateMachine =
  new ZkReplicaStateMachine(config, stateChangeLogger,
    controllerContext, zkClient,
    new ControllerBrokerRequestBatch(config, controllerChannelManager,
      eventManager, controllerContext, stateChangeLogger))

You may ask, “If a Broker is not elected as the Controller, will it also create an instance of KafkaController object?” Yes! All Brokers create a KafkaController instance when starting up, and therefore also create an instance of ZkReplicaStateMachine.

The fact that each Broker creates these instances does not mean that each Broker will start the replica state machine. In fact, the replica state machine will only be started on the Broker where the Controller is located. The specific startup code is located in the onControllerFailover method of KafkaController, as shown below:

private def onControllerFailover(): Unit = {
  ......
  replicaStateMachine.startup() // Start the replica state machine
  partitionStateMachine.startup() // Start the partition state machine
  ......
}

When a Broker is successfully elected as the Controller, the onControllerFailover method is called, which in turn starts the replica state machine and partition state machine that have already been created on that Broker.
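The difference between creating and starting the state machines can be modelled with a small sketch. ToyStateMachine and ToyController are hypothetical, heavily simplified stand-ins; only the method names startup and onControllerFailover come from the code above.

```scala
object StartupSketch {
  // Hypothetical state machine that only remembers whether it was started.
  final class ToyStateMachine(val name: String) {
    private var started = false
    def startup(): Unit = { started = true }
    def isStarted: Boolean = started
  }

  final class ToyController(val brokerId: Int) {
    // Created eagerly on every broker, when the controller object is built.
    val replicaStateMachine = new ToyStateMachine("replica")
    val partitionStateMachine = new ToyStateMachine("partition")

    // Called only on the broker that wins the Controller election.
    def onControllerFailover(): Unit = {
      replicaStateMachine.startup()
      partitionStateMachine.startup()
    }
  }
}
```

If three such controllers are created and only one of them runs onControllerFailover, all three hold state machine instances but only that one has them started.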

Replica Status and Status Management Process #

Once the replica state machine is activated, it means that it is about to perform its most important role: managing the transitions of replica states.

However, before learning how the states are managed, we need to know which states exist and what each one means. ReplicaState in the source code defines seven replica states.

  • NewReplica: The state after the replica is created.
  • OnlineReplica: The state when the replica is providing services normally.
  • OfflineReplica: The state when the replica is offline.
  • ReplicaDeletionStarted: The state when the replica deletion process is started.
  • ReplicaDeletionSuccessful: The state after the replica is successfully deleted.
  • ReplicaDeletionIneligible: The state when replica deletion is enabled but the replica cannot be deleted temporarily.
  • NonExistentReplica: The state before the replica is removed from the replica state machine.

In terms of code, the ReplicaState trait and its implementing objects define a numeric code for each state and its set of valid previous states. Let’s take the OnlineReplica code as an example:

// ReplicaState interface
sealed trait ReplicaState {
  def state: Byte
  def validPreviousStates: Set[ReplicaState] // defines the valid previous states
}

// OnlineReplica state
case object OnlineReplica extends ReplicaState {
  val state: Byte = 2
  val validPreviousStates: Set[ReplicaState] = Set(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible)
}

The validPreviousStates attribute of OnlineReplica is a collection that includes NewReplica, OnlineReplica, OfflineReplica, and ReplicaDeletionIneligible. This means that Kafka only allows the replica to transition from these four states to the OnlineReplica state. Transitioning from the ReplicaDeletionStarted state to the OnlineReplica state is an invalid state transition.

Here, I only listed OnlineReplica as an example. In fact, the code logic for the other six replica states is similar. Since it is relatively simple, I will not introduce each one. You can explore it yourself by comparing it with the source code, and pay attention to the validPreviousStates field of these states to see which states are considered valid previous states for each state.

For your convenience, I have summarized the state transition rules in a diagram:

The one-way arrows in the diagram indicate that only one-way state transitions are allowed, while the two-way arrows indicate that the transition direction can be bidirectional. For example, there is a bidirectional arrow between OnlineReplica and OfflineReplica, which means that the replica can switch freely between the OnlineReplica and OfflineReplica states.

Combining this diagram, I will further explain the meanings of each state and their transition processes.

After the replica object is created for the first time, it is placed in the NewReplica state. After initialization, when the replica object is ready to provide services, the state machine will transition it to the OnlineReplica state, and it will continue to work in this state.

If the broker where the replica is located is shut down or cannot work properly for other reasons, the replica needs to transition from the OnlineReplica state to the OfflineReplica state, indicating that the replica is offline.

Once operations such as topic deletion are enabled, the state machine will transition the replica state to ReplicaDeletionStarted, indicating that the replica deletion process has started. If the deletion is successful, it will be changed to ReplicaDeletionSuccessful; if the deletion conditions are not met (such as the broker being offline), it will be set to ReplicaDeletionIneligible for future retries.

When the replica object is deleted, its state will be changed to NonExistentReplica, and the replica state machine will remove the replica data.

This is a basic status management process.
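The lifecycle described above can be checked mechanically with a small table of valid previous states. This is a sketch under assumptions: only the OnlineReplica row is quoted in this lesson; the other rows are my reading of the Kafka 2.x source and of the transitions described above, so verify them against the version you use. The helper names (from, validPrevious, walk) are hypothetical.

```scala
object ReplicaLifecycleSketch {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState
  case object NonExistentReplica extends ReplicaState

  private def from(states: ReplicaState*): Set[ReplicaState] = states.toSet

  // Target state -> states it may be entered from.
  // Assumption: every row except OnlineReplica is not quoted in the lesson.
  val validPrevious: Map[ReplicaState, Set[ReplicaState]] =
    Map[ReplicaState, Set[ReplicaState]](
      NewReplica -> from(NonExistentReplica),
      OnlineReplica -> from(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
      OfflineReplica -> from(NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionIneligible),
      ReplicaDeletionStarted -> from(OfflineReplica),
      ReplicaDeletionSuccessful -> from(ReplicaDeletionStarted),
      ReplicaDeletionIneligible -> from(OfflineReplica, ReplicaDeletionStarted),
      NonExistentReplica -> from(ReplicaDeletionSuccessful)
    )

  // Apply the target states in order and stop at the first illegal transition.
  def walk(start: ReplicaState, targets: Seq[ReplicaState]): Either[String, ReplicaState] =
    targets.foldLeft[Either[String, ReplicaState]](Right(start)) {
      case (Right(current), target) if validPrevious(target).contains(current) =>
        Right(target)
      case (Right(current), target) =>
        Left(s"illegal transition: $current -> $target")
      case (failed, _) =>
        failed
    }
}
```

Walking NonExistentReplica through NewReplica, OnlineReplica, OfflineReplica, ReplicaDeletionStarted, ReplicaDeletionSuccessful, and back to NonExistentReplica succeeds under this table, while jumping from NewReplica straight to ReplicaDeletionStarted is rejected.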

Implementation Class: ZkReplicaStateMachine #

After understanding these states, let’s take a look at the implementation class of the replica state machine, ZkReplicaStateMachine.

This class defines one public method and seven private methods. The public method is the most important logic handling code of the replica state machine, which is the handleStateChanges method. The other seven methods are all used to assist the public method.

Definition of State Transition Methods #

Before diving into the handleStateChanges method, let me briefly introduce what the other seven methods are used for. As mentioned before, these methods mainly serve as helpers. Only by understanding the purpose of these methods can you better understand the implementation logic of handleStateChanges.

  • logFailedStateChange: Records an error log indicating an invalid state change.
  • logInvalidTransition: Also records an error log, indicating an illegal state transition.
  • logSuccessfulTransition: Records a successful state transition.
  • getTopicPartitionStatesFromZk: Retrieves the state information of the specified partition from ZooKeeper, including the leader replica and ISR set of each partition.
  • doRemoveReplicasFromIsr: Removes the given replicas from the ISR (in-sync replicas) of the given partition object.
  • removeReplicasFromIsr: Invokes the doRemoveReplicasFromIsr method to remove the given replicas from the ISR of the given partition.
  • doHandleStateChanges: The main method for performing state changes and transitions. We will study its source code in detail next.

handleStateChanges method #

The handleStateChanges method is responsible for handling state changes and serves as the entry point for state transition operations provided externally. Its method signature is as follows:

def handleStateChanges(replicas: Seq[PartitionAndReplica], targetState: ReplicaState): Unit

This method takes two parameters: ‘replicas’, a sequence of replica objects, each encapsulating the topic, partition, and broker ID of a replica; and ‘targetState’, the target state to which these replica objects should be transitioned.

The complete code of this method is as follows:

override def handleStateChanges(
  replicas: Seq[PartitionAndReplica], 
  targetState: ReplicaState): Unit = {
  if (replicas.nonEmpty) {
    try {
      // Start a new request batch; newBatch() expects no requests left pending from a previous batch
      controllerBrokerRequestBatch.newBatch()
      // Group all replica objects by Broker and perform state transitions in order
      replicas.groupBy(_.replica).foreach {
        case (replicaId, replicas) =>
          doHandleStateChanges(replicaId, replicas, targetState)
      }
      // Send the corresponding Controller requests to the Brokers
      controllerBrokerRequestBatch.sendRequestsToBrokers(
        controllerContext.epoch)
    } catch {
      // If the Controller has changed, log the error and throw an exception
      case e: ControllerMovedException =>
        error(s"Controller moved to another broker when moving some replicas to $targetState state", e)
        throw e
      case e: Throwable => error(s"Error while moving some replicas to $targetState state", e)
    }
  }
}

The code logic can be divided into two steps: the first step is to call the doHandleStateChanges method to perform the actual replica state transitions; and the second step is to send requests in batches to the corresponding brokers in the cluster.

When performing the first step, the code groups the replicas by Broker ID.

For example, let’s use <topic name, partition number, replica broker ID> to represent a replica object. If replicas is the collection {<topic1, 0, 0>, <topic1, 0, 1>, <topic1, 1, 0>, <topic1, 1, 1>}, grouping by Broker ID before calling the doHandleStateChanges method yields: Map(0 -> Seq(<topic1, 0, 0>, <topic1, 1, 0>), 1 -> Seq(<topic1, 0, 1>, <topic1, 1, 1>)).
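The grouping step can be reproduced with a few lines of Scala. This is a minimal sketch: the PartitionAndReplica here is a flattened stand-in that takes the topic and partition number directly (the snippets in this lesson access them through replica.topicPartition), and groupByBroker is a hypothetical helper name.

```scala
object GroupByBrokerSketch {
  // Simplified stand-in for Kafka's PartitionAndReplica.
  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Group replica objects by the ID of the broker that hosts them.
  def groupByBroker(replicas: Seq[PartitionAndReplica]): Map[Int, Seq[PartitionAndReplica]] =
    replicas.groupBy(_.replica)
}
```

Because replicas is a Seq, each group is itself a Seq that keeps the input order, so every Broker's replicas are handled together in one doHandleStateChanges call.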

After completing all these steps, the code starts calling the doHandleStateChanges method to perform state transitions. Although this method looks long, it is actually composed of different code branches.

doHandleStateChanges method #

Let me first use a diagram to illustrate its flow, and then analyze its code in detail:

From the diagram, we can see that in step 1 the code tries to obtain the current state of each given replica object from the metadata cache on the Controller side. If no state has been saved for a replica object, the code initializes it to the NonExistentReplica state.

In step 2, the code divides the given replica object set into two parts based on the valid preceding state collection defined in different ReplicaStates and the target state (targetState) passed in: the set of replica objects that can undergo a legal transition, and the set of replica objects that will perform an illegal state transition. The doHandleStateChanges method logs an error message for each replica object in the latter set.

In step 3, the code enters different branches with the set of replica objects that can undergo legal transitions. Since Kafka currently defines 7 types of replica states, there are a total of 7 branches in the code.
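Steps 1 and 2 can be sketched as follows. This is an illustration under assumptions, not the Kafka code: the mutable map stands in for the Controller's metadata cache, PartitionAndReplica is flattened, and splitByValidity is a hypothetical helper that receives the target state's valid previous states as a parameter.

```scala
import scala.collection.mutable

object ValidateTransitionsSketch {
  sealed trait ReplicaState
  case object NonExistentReplica extends ReplicaState
  case object NewReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState

  final case class PartitionAndReplica(topic: String, partition: Int, replica: Int)

  // Step 1: replicas without a cached state start as NonExistentReplica.
  // Step 2: split the input into (valid, invalid) depending on whether the
  // cached state is one of the target state's valid previous states.
  def splitByValidity(
      cache: mutable.Map[PartitionAndReplica, ReplicaState],
      replicas: Seq[PartitionAndReplica],
      validPrevious: Set[ReplicaState]
  ): (Seq[PartitionAndReplica], Seq[PartitionAndReplica]) = {
    replicas.foreach(r => cache.getOrElseUpdate(r, NonExistentReplica))
    replicas.partition(r => validPrevious.contains(cache(r)))
  }
}
```

Only the first element of the returned pair continues into the per-state branches of step 3; the second element is what the error log is written for.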

I will provide a detailed explanation of the most common state transition paths, including the transition to the NewReplica state when a replica is created, the transition to the OnlineReplica state when a replica is working normally, and the transition to the OfflineReplica state when a replica is stopped. As for the remaining code, you can study them on your own after class, as their transition operation principles are roughly the same.

Path 1: Transition to the NewReplica state #

First, let’s take a look at Path 1, which is the code where the target state is NewReplica. The code is as follows:

case NewReplica =>
  // Iterate through all replica objects that can undergo the transition
  validReplicas.foreach { replica =>
    // Get the partition object of the replica, i.e., <topic name, partition number>
    val partition = replica.topicPartition
    // Get the current state of the replica object
    val currentState = controllerContext.replicaState(replica)
    // Try to retrieve the current partition information from the metadata cache,
    // including which replica is the leader and which replicas are in the ISR, among other data
    controllerContext.partitionLeadershipInfo.get(partition) match {
      // If the partition data information is successfully obtained
      case Some(leaderIsrAndControllerEpoch) =>
        // If the replica is the leader replica
        if (leaderIsrAndControllerEpoch.leaderAndIsr.leader == replicaId) {
          val exception = new StateChangeFailedException(s"Replica $replicaId for partition $partition cannot be moved to NewReplica state as it is being requested to become leader")
          // Log the error message. Leader replicas cannot be set to the NewReplica state
          logFailedStateChange(replica, currentState, OfflineReplica, exception)
        } else {
          // Otherwise, send LeaderAndIsrRequest to the broker where the replica is located
          // to synchronize the data of the partition, and then send UpdateMetadataRequest
          // to all brokers in the current cluster to notify them of the partition data change
          controllerBrokerRequestBatch
            .addLeaderAndIsrRequestForBrokers(
              Seq(replicaId),
              replica.topicPartition,
              leaderIsrAndControllerEpoch,
              controllerContext.partitionFullReplicaAssignment(
                replica.topicPartition),
              isNew = true)
          if (traceEnabled)
            logSuccessfulTransition(
              stateLogger, replicaId, 
              partition, currentState, NewReplica)
          // Update the current state of the replica object to NewReplica in the metadata cache
          controllerContext.putReplicaState(replica, NewReplica)
        }
      // If there is no corresponding data
      case None =>
        if (traceEnabled)
          logSuccessfulTransition(
            stateLogger, replicaId, 
            partition, currentState, NewReplica)
        // Only update the current state of the replica object to NewReplica in the metadata cache
        controllerContext.putReplicaState(replica, NewReplica)
    }
  }

After reading the code, you can take another look at this flowchart:

This path mainly tries to obtain the partition information data of these replica objects from the metadata cache, including which broker the leader replica of the partition is located on, and which replicas are in the ISR, among other data.
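As a rough model of the branching in this path, the decision can be reduced to a pure function of the replica ID and the partition's cached leader. The Outcome names and the decide helper are hypothetical; they only label the three branches visible in the code above.

```scala
object NewReplicaPathSketch {
  sealed trait Outcome
  // The replica is the current leader: log a failure, leave the state unchanged.
  case object RejectedIsLeader extends Outcome
  // Partition info exists: queue LeaderAndIsrRequest, then mark NewReplica.
  case object NotifyBrokerThenMark extends Outcome
  // No partition info cached: only mark the replica as NewReplica.
  case object MarkOnly extends Outcome

  // currentLeader is None when the Controller has no leadership info
  // cached for the partition.
  def decide(replicaId: Int, currentLeader: Option[Int]): Outcome =
    currentLeader match {
      case Some(leader) if leader == replicaId => RejectedIsLeader
      case Some(_)                             => NotifyBrokerThenMark
      case None                                => MarkOnly
    }
}
```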

If the corresponding partition data cannot be found, the replica state is updated directly to NewReplica. Otherwise, the code sends a request to the broker where the replica is located so that it learns the partition’s information, and also sends a request to all running brokers in the cluster to let them know that the new replica has joined.

Path 2: Transition to the OnlineReplica state #

Next, let’s take a look at the second path, which transitions the replica object to OnlineReplica. As I mentioned earlier, this is the state in which the replica object works normally. Let’s see what the source code does to transition to this state:

case OnlineReplica =>
  validReplicas.foreach { replica =>
    // Get the partition where the replica is located
    val partition = replica.topicPartition
    // Get the current state of the replica
    val currentState = controllerContext.replicaState(replica)
    currentState match {
      // If the current state is NewReplica
      case NewReplica =>
        // Get the partition replica list from the metadata cache
        val assignment = controllerContext
          .partitionFullReplicaAssignment(partition)
        // If the replica list does not contain the current replica, it is considered an abnormal situation
        if (!assignment.replicas.contains(replicaId)) {
          error(s"Adding replica ($replicaId) that is not part of the assignment $assignment")
          // Add the replica to the replica list, and update the replica list in the metadata cache
          val newAssignment = assignment.copy(
            replicas = assignment.replicas :+ replicaId)
          controllerContext.updatePartitionFullReplicaAssignment(
            partition, newAssignment)
        }
      // If the current state is other states
      case _ =>
        // Try to get the current information data of the partition
        controllerContext.partitionLeadershipInfo
          .get(partition) match {
          // If the partition information exists
          // Send a request to the replica object's Broker to synchronize the partition data
          case Some(leaderIsrAndControllerEpoch) =>
            controllerBrokerRequestBatch
              .addLeaderAndIsrRequestForBrokers(Seq(replicaId),
                replica.topicPartition,
                leaderIsrAndControllerEpoch,
                controllerContext
                  .partitionFullReplicaAssignment(partition), 
                isNew = false)
          case None =>
        }
    }
    if (traceEnabled)
      logSuccessfulTransition(
        stateLogger, replicaId, 
        partition, currentState, OnlineReplica)
    // Set the replica object to OnlineReplica state
    controllerContext.putReplicaState(replica, OnlineReplica)
  }

I also used a diagram to illustrate:

The code will still iterate over the replica objects and perform the following steps:

  • Step 1: Get the partition object that the replica belongs to in the metadata, as well as the current state of the replica.
  • Step 2: Check whether the current state is NewReplica. If it is, get the replica list of the partition and check whether it contains the replica. If it does not, log an error and add the replica to the replica list in the metadata. If the current state is not NewReplica, this is an existing replica object: the source code looks up the partition’s detailed data and, if it exists, sends a LeaderAndIsrRequest to the Broker where the replica is located to synchronize the partition data.
  • Step 3: Change the state of the replica object to OnlineReplica. At this point, the replica is in the normal working state.
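Step 2 above can be sketched as a pure function. This is a simplified model under assumptions: the assignment is reduced to a plain list of broker IDs, and Step2Result and step2 are hypothetical names that do not appear in the Kafka source.

```scala
object OnlineReplicaPathSketch {
  sealed trait ReplicaState
  case object NewReplica extends ReplicaState
  case object OfflineReplica extends ReplicaState
  case object OnlineReplica extends ReplicaState

  // Outcome of step 2: the (possibly repaired) replica list and whether a
  // LeaderAndIsrRequest should be queued for the replica's broker.
  final case class Step2Result(assignment: Seq[Int], sendLeaderAndIsr: Boolean)

  def step2(
      current: ReplicaState,
      replicaId: Int,
      assignment: Seq[Int],
      hasLeadershipInfo: Boolean
  ): Step2Result =
    current match {
      case NewReplica =>
        // A new replica missing from the assignment is appended; no request is sent.
        val repaired =
          if (assignment.contains(replicaId)) assignment else assignment :+ replicaId
        Step2Result(repaired, sendLeaderAndIsr = false)
      case _ =>
        // An existing replica gets a request only if partition info is cached.
        Step2Result(assignment, sendLeaderAndIsr = hasLeadershipInfo)
    }
}
```

In both branches, step 3 then marks the replica as OnlineReplica.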

Path 3: Transition to the OfflineReplica state #

Lastly, let’s take a look at the third path, which transitions the replica object to the OfflineReplica state. I will provide the main code logic in the form of code comments:

case OfflineReplica =>
  validReplicas.foreach { replica =>
    // Send StopReplicaRequest to the Broker where the replica resides, stopping the corresponding replica
    controllerBrokerRequestBatch
      .addStopReplicaRequestForBrokers(Seq(replicaId), 
        replica.topicPartition, deletePartition = false)
  }
  // Split the collection of replica objects into two sets: replicas with leader information and replicas without leader information
  val (replicasWithLeadershipInfo, replicasWithoutLeadershipInfo) = 
    validReplicas.partition { replica =>
      controllerContext.partitionLeadershipInfo
        .contains(replica.topicPartition)
    }
  // For replicas with leader information,
  // remove the replica objects from all corresponding partitions and update ZooKeeper nodes
  val updatedLeaderIsrAndControllerEpochs = 
    removeReplicasFromIsr(replicaId,  
      replicasWithLeadershipInfo.map(_.topicPartition))
  // Iterate through each updated partition information
  updatedLeaderIsrAndControllerEpochs.foreach {
    case (partition, leaderIsrAndControllerEpoch) =>
      stateLogger.info(s"Partition $partition state changed to $leaderIsrAndControllerEpoch after removing replica $replicaId from the ISR as part of transition to $OfflineReplica")
      // If the partition's topic has not been flagged for deletion
      if (!controllerContext.isTopicQueuedUpForDeletion(
        partition.topic)) {
        // Get the Brokers where the other replicas of this partition are located except for the given replica  
        val recipients = controllerContext
          .partitionReplicaAssignment(partition)
          .filterNot(_ == replicaId)
        // Send requests to these Brokers to update the LeaderAndIsr data for the newly updated partition
        controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
          recipients,
          partition,
          leaderIsrAndControllerEpoch,
          controllerContext.partitionFullReplicaAssignment(partition), 
          isNew = false)
      }
      val replica = PartitionAndReplica(partition, replicaId)
      val currentState = controllerContext.replicaState(replica)
      if (traceEnabled)
        logSuccessfulTransition(stateLogger, replicaId, 
          partition, currentState, OfflineReplica)
      // Set the state of the given replica in the partition to OfflineReplica
      controllerContext.putReplicaState(replica, OfflineReplica)
  }
  // Iterate through all replicas without leader information
  replicasWithoutLeadershipInfo.foreach { replica =>
    val currentState = controllerContext.replicaState(replica)
    if (traceEnabled)
      logSuccessfulTransition(stateLogger, replicaId, 
        replica.topicPartition, currentState, OfflineReplica)
    // Send requests to all Brokers in the cluster, updating the corresponding partition metadata
    controllerBrokerRequestBatch.addUpdateMetadataRequestForBrokers(
      controllerContext.liveOrShuttingDownBrokerIds.toSeq,
      Set(replica.topicPartition))
    // Set the state of the given replica in the partition to OfflineReplica
    controllerContext.putReplicaState(replica, OfflineReplica)
  }

I will continue to use a diagram to illustrate its execution process:

First, the code sends a StopReplicaRequest to the Broker where the replica resides, explicitly telling the Broker to stop the corresponding replica. The ReplicaManager component of Kafka is responsible for handling this logic. We will discuss the implementation of ReplicaManager in two separate lessons, but for now, you just need to understand that once the StopReplica request is sent, the corresponding replica on these Brokers stops working.

Next, the code divides the set of replica objects into two subsets: replicas with leader information and replicas without it. Strictly speaking, “leader information” here covers not only the leader but also the ISR and controllerEpoch data. However, you can roughly treat the split as being based on whether the partition has a leader.

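Then, the code handles the subset with leader information: it removes the replica from the ISR of each corresponding partition (updating ZooKeeper), sends LeaderAndIsrRequest to the Brokers hosting the other replicas of those partitions so that they learn the updated partition information (this is skipped if the topic is queued for deletion), and then sets the replica state to OfflineReplica.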

Finally, the code iterates through the subset without leader information, performing a similar operation to the previous step. However, since no Leader election operation has been performed for the replicas without leader information, instead of sending LeaderAndIsrRequest to these Brokers, the code sends UpdateMetadataRequest, explicitly informing them to update the metadata for the corresponding partitions, and then sets the replica state to OfflineReplica.

From this description, we can see that the main logic of changing the replica state to OfflineReplica is actually the operation of stopping the corresponding replica + updating the metadata of the remote Broker.
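Two pieces of this branch are easy to isolate in a sketch: splitting partitions by whether leadership information is cached, and choosing which Brokers receive the updated LeaderAndIsr data. TopicPartition and LeaderAndIsr below are trimmed-down stand-ins, and the two helper names are hypothetical; the ISR update itself is left out.

```scala
object OfflineReplicaPathSketch {
  final case class TopicPartition(topic: String, partition: Int)
  // Hypothetical, trimmed-down view of a partition's leadership info.
  final case class LeaderAndIsr(leader: Int, isr: List[Int])

  // Split partitions into (with leadership info, without leadership info).
  def splitByLeadershipInfo(
      partitions: Seq[TopicPartition],
      leadership: Map[TopicPartition, LeaderAndIsr]
  ): (Seq[TopicPartition], Seq[TopicPartition]) =
    partitions.partition(tp => leadership.contains(tp))

  // Brokers that should receive the updated LeaderAndIsr data: every broker
  // in the partition's assignment except the one hosting the stopped replica.
  def recipients(assignment: Seq[Int], stoppedReplica: Int): Seq[Int] =
    assignment.filterNot(_ == stoppedReplica)
}
```

Partitions in the first group go through the ISR update and LeaderAndIsrRequest path; those in the second group only trigger an UpdateMetadataRequest.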

Summary #

Today, we focused on the implementation principle of Kafka’s replica state machine and carefully studied the source code in this part. Let’s briefly review the key points of this lesson.

  • Replica state machine: ReplicaStateMachine is the implementation class in the Kafka broker-side source code that controls the flow of replica states. An instance of ReplicaStateMachine is created when each broker starts, but only the broker where the Controller component is located will start it.
  • Replica state: Currently, Kafka defines 7 types of replica states. At the same time, it also specifies the valid previous states for each type of state.
  • handleStateChanges: The entry point for executing state transitions. Under the hood it calls the doHandleStateChanges method, which implements the transition logic for each target state in a 7-way case branch.

In the next lesson, I will guide you through another famous state machine in Kafka: the partition state machine. Once you have mastered these two state machines, you will have a clear understanding of how the Kafka broker side manages partition and replica objects from end to end. In fact, after understanding these two components, the Controller’s topic-related work will hardly pose any difficulty for you.

After-class Discussion #

Please try analyzing the code in the last branch of the doHandleStateChanges method.

Feel free to write down your thoughts and answers in the comment section to exchange ideas with me. You are also welcome to share today’s content with your friends.