18 PartitionStateMachine How Partition State Transitions are Implemented #

Hello, I’m Hu Xi. Today, we will delve into the study of the source code of the PartitionStateMachine.

The PartitionStateMachine is responsible for managing the transition of partition states in Kafka, and it follows the same principles as the ReplicaStateMachine. From the perspective of code structure, implementation, and design principles, the two are very similar. In the previous lesson, we learned about the ReplicaStateMachine, so I believe that learning about the PartitionStateMachine in this lesson will be much easier for you.

During interviews, many interviewers are very interested in asking about leader election strategies. After completing today’s lesson, not only will you be able to describe the four scenarios for leader election, but you will also be able to summarize their commonalities. This will definitely be a bonus point for interviews!

Without further ado, let’s get started.

Introduction to PartitionStateMachine #

The PartitionStateMachine.scala file is located in the controller package and has a simple code structure. You can refer to this mind map:

The code can be divided into 5 main parts.

  • PartitionStateMachine: This is an abstract class for the partition state machine. It defines common methods such as startup and shutdown, and also provides the signature of the handleStateChanges method, which is the entry point for handling partition state transitions.
  • ZkPartitionStateMachine: This is the only subclass of PartitionStateMachine. It implements the main logic of the partition state machine. Similar to ZkReplicaStateMachine, ZkPartitionStateMachine overrides the handleStateChanges method of its parent class and is accompanied by the private doHandleStateChanges method to implement partition state transitions.
  • PartitionState interface and its implementation objects: This defines four types of partition states, namely NewPartition, OnlinePartition, OfflinePartition, and NonExistentPartition. In addition, it defines the transition relationships between them.
  • PartitionLeaderElectionStrategy interface and its implementation objects: This defines four types of partition leader election strategies. You can think of them as four scenarios in which leader election occurs.
  • PartitionLeaderElectionAlgorithms: This is the implementation of partition leader election algorithms. Since four types of election strategies are defined, there must be corresponding implementation code. PartitionLeaderElectionAlgorithms provides the implementation code for these four election strategies.

Class Definitions and Fields #

PartitionStateMachine and ReplicaStateMachine are very similar. Let’s take a look at the following two code snippets:

// Definition of abstract class PartitionStateMachine
abstract class PartitionStateMachine(
  controllerContext: ControllerContext) extends Logging {
  ......
}

// Definition of subclass ZkPartitionStateMachine
class ZkPartitionStateMachine(config: KafkaConfig,
	stateChangeLogger: StateChangeLogger,
	controllerContext: ControllerContext,
	zkClient: KafkaZkClient,
	controllerBrokerRequestBatch: ControllerBrokerRequestBatch) extends PartitionStateMachine(controllerContext) {
  // The ID of the Broker on which this Controller resides
  private val controllerId = config.brokerId
  ......
}

From the code, we can see that the two class definitions are nearly identical; in particular, ZkPartitionStateMachine and ZkReplicaStateMachine take exactly the same list of constructor fields. This alone should suggest that their processing logic is very similar.

Similarly, a ZkPartitionStateMachine instance is created and started at exactly the same points as a ZkReplicaStateMachine instance. That is, when each Broker process starts, a ZkPartitionStateMachine instance is created as part of constructing the KafkaController object, but only the Broker where the Controller component resides actually starts the partition state machine.

The following code snippet shows where the ZkPartitionStateMachine instance is created and started:

class KafkaController(......) {
  ......
  // During the creation process of the KafkaController object, a ZkPartitionStateMachine instance is generated
  val partitionStateMachine: PartitionStateMachine = 
    new ZkPartitionStateMachine(config, stateChangeLogger, 
      controllerContext, zkClient, 
      new ControllerBrokerRequestBatch(config, 
      controllerChannelManager, eventManager, controllerContext, 
      stateChangeLogger))
	......
    private def onControllerFailover(): Unit = {
  ......
  replicaStateMachine.startup() // Start the replica state machine
  partitionStateMachine.startup() // Start the partition state machine
  ......
  }
}

Let me re-emphasize: When each Broker starts, the corresponding partition state machine and replica state machine instances are created, but only the Broker where the Controller component resides will start them. If the Controller changes to another Broker, the Broker where the old Controller resides will call the shutdown method of these state machines to shut them down, and the Broker where the new Controller resides will call the startup method to start them.
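For reference, the resignation side looks roughly like the sketch below (based on my reading of KafkaController, not quoted verbatim): the Broker that loses the Controller role shuts both state machines down in its resignation callback, mirroring the startup calls shown above.

```scala
private def onControllerResignation(): Unit = {
  // ...
  partitionStateMachine.shutdown() // Stop the partition state machine
  replicaStateMachine.shutdown()   // Stop the replica state machine
  // ...
}
```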

Partition Status #

Since ZkPartitionStateMachine manages the transition of partition status, we need to know at least what states the partitions can have and what the transition rules defined by Kafka are. This is what the PartitionState interface and its implementation objects are for. Similar to the ReplicaState class, the PartitionState defines the state space and transition rules for partitions.

Taking the OnlinePartition state as an example, let’s explain how the code implements the transitions:

sealed trait PartitionState {
  def state: Byte // State number, no actual usage
  def validPreviousStates: Set[PartitionState] // Set of valid previous states
}

case object OnlinePartition extends PartitionState {
  val state: Byte = 1
  val validPreviousStates: Set[PartitionState] = Set(NewPartition, OnlinePartition, OfflinePartition)
}

As shown in the code, each PartitionState defines a set called validPreviousStates, which represents the set of valid previous states for each state.

For OnlinePartition, its set of valid previous states includes NewPartition, OnlinePartition, and OfflinePartition. In Kafka, any transition from a state outside the set of valid states to the target state will be considered an illegal operation.
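As a minimal illustration (a sketch, not the actual Kafka source), a transition-validity check built on validPreviousStates could look like this:

```scala
// A transition to `target` is legal only if the current state is one of the
// target state's valid previous states
def isValidTransition(current: PartitionState, target: PartitionState): Boolean =
  target.validPreviousStates.contains(current)

// isValidTransition(NewPartition, OnlinePartition)         // true
// isValidTransition(NonExistentPartition, OnlinePartition) // false: an illegal transition
```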

Currently, Kafka defines 4 types of states for partitions:

  • NewPartition: The state of a partition that has just been created. A partition in this state is considered “uninitialized” by Kafka and is therefore not eligible for leader election.
  • OnlinePartition: The state when the partition is actively serving requests.
  • OfflinePartition: The state when the partition is offline.
  • NonExistentPartition: The state when the partition is deleted and removed from the partition state machine.

The following diagram shows the complete set of transition rules for partition states:

Partition State Transition

The double-headed arrows between two states indicate that they can transition to each other, such as OnlinePartition and OfflinePartition. Kafka allows a partition to switch from OnlinePartition to OfflinePartition and vice versa.

In addition, both OnlinePartition and OfflinePartition have an arrow pointing to themselves, indicating that it is allowed to transition from OnlinePartition to OnlinePartition. This can happen when partition leader election occurs. Next, we will discuss the details of partition leader election.

Scenario and Method of Partition Leader Election #

Just now, we mentioned the similarities between two state machines. Next, we will discuss the partition leader election, which can be said to be a unique feature of the PartitionStateMachine.

Each partition must elect a leader in order to provide normal service. Therefore, the leader replica is an important role for the partition. In this case, we must understand the process of leader election and how it is implemented in the code. We will focus on the election strategy and the specific implementation method code.

PartitionLeaderElectionStrategy #

First, let’s clarify the meaning of partition leader election. It is actually quite simple, which is to elect a leader replica for a partition of a Kafka topic.

So, what are the election strategies defined by Kafka, or in what situations does leader election need to be performed?

This is what the PartitionLeaderElectionStrategy interface does. Please see the code below:

// Interface for partition leader election strategies
sealed trait PartitionLeaderElectionStrategy
// Offline partition leader election strategy
final case class OfflinePartitionLeaderElectionStrategy(
  allowUnclean: Boolean) extends PartitionLeaderElectionStrategy
// Reassign partition leader election strategy
final case object ReassignPartitionLeaderElectionStrategy 
  extends PartitionLeaderElectionStrategy
// Preferred replica partition leader election strategy
final case object PreferredReplicaPartitionLeaderElectionStrategy 
  extends PartitionLeaderElectionStrategy
// Controlled shutdown partition leader election strategy
final case object ControlledShutdownPartitionLeaderElectionStrategy 
  extends PartitionLeaderElectionStrategy

Currently, there are 4 scenarios for partition leader election.

  • OfflinePartitionLeaderElectionStrategy: Partition leader election caused by leader replica offline.
  • ReassignPartitionLeaderElectionStrategy: Partition leader election caused by partition replica reassignment operation.
  • PreferredReplicaPartitionLeaderElectionStrategy: Partition leader election caused by preferred replica election.
  • ControlledShutdownPartitionLeaderElectionStrategy: Partition leader election caused by normal broker shutdown.

PartitionLeaderElectionAlgorithms #

For these 4 scenarios, the PartitionLeaderElectionAlgorithms object of the partition state machine defines 4 methods, each responsible for electing a leader replica for each scenario. These 4 methods are:

  • offlinePartitionLeaderElection;
  • reassignPartitionLeaderElection;
  • preferredReplicaPartitionLeaderElection;
  • controlledShutdownPartitionLeaderElection.

The offlinePartitionLeaderElection method is the most complex among these 4 methods, so let’s start with it.

def offlinePartitionLeaderElection(assignment: Seq[Int],
  isr: Seq[Int], liveReplicas: Set[Int],
  uncleanLeaderElectionEnabled: Boolean, controllerContext: ControllerContext): Option[Int] = {
  // Find the first replica in the assigned replica (AR) list that is both alive and in the ISR
  assignment.find(id => liveReplicas.contains(id) && isr.contains(id)).orElse {
    // If no replica satisfies both conditions, check whether unclean leader election is allowed,
    // i.e., whether the broker-side parameter unclean.leader.election.enable is true
    if (uncleanLeaderElectionEnabled) {
      // Choose the first alive replica in the AR list as the leader
      val leaderOpt = assignment.find(liveReplicas.contains)
      if (leaderOpt.isDefined)
        controllerContext.stats.uncleanLeaderElectionRate.mark()
      leaderOpt
    } else {
      // Unclean leader election is disabled: give up and elect no leader
      None
    }
  }
}


I will draw another flowchart to help you understand the code logic:

![](../images/2856c2a7c6dd1818dbdf458c4e409d7c.jpg)

This method accepts a total of 5 parameters. Apart from the familiar ControllerContext class, the other 4 parameters are worth exploring.

**1. assignment**

This is the partition's replica list, which has a formal name: Assigned Replicas, or AR. When we create a topic and then view it with the kafka-topics script, the Replicas column in the output shows the AR of each partition. The assignment parameter is of type Seq[Int]. **This reveals an important fact: AR is ordered, and its order is not necessarily the same as the ISR's order!**
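For example, describing a topic with the kafka-topics script prints per-partition lines roughly like the one below (the exact formatting varies across Kafka versions); note that both the Replicas (AR) and Isr columns are ordered lists, and their orders can differ:

```
Topic: test  Partition: 0  Leader: 1  Replicas: 1,2,0  Isr: 0,1,2
```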

**2. isr**

ISR is well known in Kafka. It stores the list of replicas for the partition that are in sync with the leader replica. Note that the leader replica itself is also in the ISR. Additionally, as a variable of type Seq[Int], the isr itself has an order.

**3. liveReplicas**

From the name, it can be inferred that this stores all the replicas for the partition that are in a live state. How to determine if a replica is alive? It can be determined based on the data in the Controller metadata cache. In simple terms, all replicas on running Brokers are considered alive.

**4. uncleanLeaderElectionEnabled**

By default, unless the Leader election is initiated by the AdminClient, the value of this parameter is generally false, meaning that Kafka does not allow Unclean Leader Election. Unclean Leader Election refers to the situation where Kafka selects a non-ISR replica as the new Leader when the ISR list is empty. Due to the risk of data loss, the community has currently disabled Unclean Leader Election by setting the default value of the broker-side parameter unclean.leader.election.enable to false.

It is worth mentioning that, starting from version 2.4.0, the community officially supports triggering leader election for given partitions through the AdminClient. In the current design, if the leader election is triggered by the AdminClient, Unclean Leader Election is enabled by default. However, when studying the offlinePartitionLeaderElection method, you can simply assume uncleanLeaderElectionEnabled = false; it does not affect your understanding of the method.
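As a side note, here is a hedged sketch of what triggering such an election from the AdminClient can look like. This is illustrative client code, not part of PartitionStateMachine; the bootstrap address and topic name are made up, and the result-inspection call at the end is written from memory and may differ slightly across client versions:

```scala
import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}
import org.apache.kafka.common.{ElectionType, TopicPartition}

object ElectLeaderExample {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // assumed address
    val admin = AdminClient.create(props)
    try {
      val tp = new TopicPartition("test", 0) // hypothetical topic and partition
      // Ask the Controller to elect a leader for this partition, allowing unclean election;
      // ElectionType.PREFERRED would request a preferred-replica election instead
      val result = admin.electLeaders(ElectionType.UNCLEAN, Collections.singleton(tp))
      result.partitions().get() // block until the Controller has processed the request
    } finally {
      admin.close()
    }
  }
}
```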

After understanding the meaning of these parameters, we can study the specific flow.

The code first searches the AR list in order and returns, as the new Leader, the first replica that satisfies both of the following conditions:

  • The replica is alive, i.e., the Broker hosting it is still running.
  • The replica is in the ISR list.

If no such replica is found, the code checks whether Unclean Leader Election is enabled. If it is, the bar is lowered and only the first condition has to hold: the first alive replica in the AR list becomes the new Leader. If it is not, the election fails and no new Leader is elected.
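To make the flow concrete, here is a small self-contained sketch that mirrors the selection rule (illustrative code, not the Kafka source), traced on some made-up inputs:

```scala
object OfflineElectionSketch {
  // Mirrors the rule: first alive ISR replica in AR order, else (if unclean election
  // is allowed) the first alive replica, else no leader
  def electLeader(assignment: Seq[Int], isr: Seq[Int],
                  liveReplicas: Set[Int], uncleanEnabled: Boolean): Option[Int] =
    assignment.find(id => liveReplicas.contains(id) && isr.contains(id))
      .orElse(if (uncleanEnabled) assignment.find(liveReplicas.contains) else None)

  def main(args: Array[String]): Unit = {
    // AR = [1, 2, 3], ISR = [3], brokers 2 and 3 alive: replica 3 wins
    println(electLeader(Seq(1, 2, 3), Seq(3), Set(2, 3), uncleanEnabled = false)) // Some(3)
    // ISR empty, unclean election disabled: no leader is elected
    println(electLeader(Seq(1, 2, 3), Seq.empty, Set(2, 3), uncleanEnabled = false)) // None
    // ISR empty, unclean election enabled: first alive replica (2) becomes leader
    println(electLeader(Seq(1, 2, 3), Seq.empty, Set(2, 3), uncleanEnabled = true)) // Some(2)
  }
}
```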

The other three methods are much simpler, so let's look at their code directly:
```scala
def reassignPartitionLeaderElection(reassignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
    reassignment.find(id => liveReplicas.contains(id) && isr.contains(id))
}

def preferredReplicaPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int]): Option[Int] = {
    assignment.headOption.filter(id => liveReplicas.contains(id) && isr.contains(id))
}

def controlledShutdownPartitionLeaderElection(assignment: Seq[Int], isr: Seq[Int], liveReplicas: Set[Int], shuttingDownBrokers: Set[Int]): Option[Int] = {
    assignment.find(id => liveReplicas.contains(id) && isr.contains(id) && !shuttingDownBrokers.contains(id))
}
```

As you can see, the logic of these three methods is almost identical: the basic rule is to find the first replica in the AR (or the given replica list) that is alive and in the ISR.

By now, you should have a good understanding of how Kafka elects a Leader for a partition. Basically, it finds the first live replica in the AR list (or the given replica list) that is also in the ISR list and selects it as the new Leader.
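To make the minor differences between these strategies concrete, here is a small self-contained sketch (it mirrors the rules above; it is not the Kafka source) traced on the same inputs:

```scala
object ElectionRulesSketch {
  // AR = [1, 2, 3]; replica 1 is down, replicas 2 and 3 are alive and in the ISR
  val assignment = Seq(1, 2, 3)
  val isr = Seq(2, 3)
  val live = Set(2, 3)

  // Reassignment rule: first alive ISR replica in list order -> Some(2)
  val reassign = assignment.find(id => live.contains(id) && isr.contains(id))

  // Preferred-replica rule: only the head of AR qualifies; replica 1 is down -> None
  val preferred = assignment.headOption.filter(id => live.contains(id) && isr.contains(id))

  // Controlled-shutdown rule: additionally exclude brokers that are shutting down
  val shuttingDown = Set(2)
  val controlled = assignment.find(id =>
    live.contains(id) && isr.contains(id) && !shuttingDown.contains(id)) // -> Some(3)

  def main(args: Array[String]): Unit =
    println(s"reassign=$reassign preferred=$preferred controlledShutdown=$controlled")
}
```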

Methods for Handling Partition State Transitions #

Now that we have learned about the necessary information, let’s take a look at the workings of PartitionStateMachine.

handleStateChanges #

As mentioned earlier, handleStateChanges is the entry method. Let’s first examine its method signature:

def handleStateChanges(
  partitions: Seq[TopicPartition],
  targetState: PartitionState, 
  leaderElectionStrategy: Option[PartitionLeaderElectionStrategy]): 
  Map[TopicPartition, Either[Throwable, LeaderAndIsr]]

To summarize the purpose of handleStateChanges in one sentence: handleStateChanges sets the state of the partitions to targetState and may also use the leaderElectionStrategy to elect a new leader for the partitions, ultimately returning the leader information of the partitions.

In this method, partitions are the target partitions for state changes, targetState is the desired state, and leaderElectionStrategy is an optional argument that signifies whether leader election should be performed.
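For a sense of how this entry point is used, here is a hedged sketch of a typical call from the Controller side (the real call sites live in KafkaController; offlinePartitions below is a hypothetical collection, and the snippet is illustrative rather than quoted from the source):

```scala
// Illustrative: move some offline partitions back online and let the state machine
// run offline-partition leader election, without allowing unclean election
partitionStateMachine.handleStateChanges(
  offlinePartitions.toSeq,                                           // partitions to transition
  OnlinePartition,                                                   // target state
  Some(OfflinePartitionLeaderElectionStrategy(allowUnclean = false)) // election strategy
)
```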

Here is the complete code of handleStateChanges, with important functionality explained in comments:

override def handleStateChanges(
    partitions: Seq[TopicPartition],
    targetState: PartitionState,
    partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
    if (partitions.nonEmpty) {
      try {
        // Clear the Controller's pending request collection to prepare for request sending
        controllerBrokerRequestBatch.newBatch()
        // Call the doHandleStateChanges method to perform the actual state changes
        val result = doHandleStateChanges(
          partitions,
          targetState,
          partitionLeaderElectionStrategyOpt
        )
        // Notify relevant Brokers of the state changes by sending requests from the Controller
        controllerBrokerRequestBatch.sendRequestsToBrokers(
          controllerContext.epoch)
        // Return the result of the state change handling
        result
      } catch {
        // If the Controller has moved, record the error log and re-throw the exception
        // The upper-level code catches this exception and executes the maybeResign method for resignation logic
        case e: ControllerMovedException =>
          error(s"Controller moved to another broker when moving some partitions to $targetState state", e)
          throw e
        // If it is another exception, record the error log and wrap it in a returned error
        case e: Throwable =>
          error(s"Error while moving some partitions to $targetState state", e)
          partitions.iterator.map(_ -> Left(e)).toMap
      }
    } else { // If partitions is empty, do nothing
      Map.empty
    }
  }

The entire method consists of two steps: The first step is calling the doHandleStateChanges method to execute the partition state transitions. The second step is the Controller notifying the relevant Brokers of the state changes. The Brokers to be notified and the requests to be sent to each Broker are determined in the first step.

Naturally, the focus of this method lies in the doHandleStateChanges method called in the first step.

doHandleStateChanges #

Let’s take a look at the implementation of this method:

private def doHandleStateChanges(
    partitions: Seq[TopicPartition],
    targetState: PartitionState,
    partitionLeaderElectionStrategyOpt: Option[PartitionLeaderElectionStrategy]
  ): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
    val stateChangeLog = stateChangeLogger.withControllerEpoch(controllerContext.epoch)
    val traceEnabled = stateChangeLog.isTraceEnabled
    // Initialize the state of new partitions to NonExistentPartition
    partitions.foreach(partition => controllerContext.putPartitionStateIfNotExists(partition, NonExistentPartition))
    // Find partitions with illegal state transitions and record error logs for these partitions
    val (validPartitions, invalidPartitions) = controllerContext.checkValidPartitionStateChange(partitions, targetState)
    invalidPartitions.foreach(partition => logInvalidTransition(partition, targetState))
    // Enter different case branches based on targetState
    targetState match {
    	......
    }
}

This method first performs state initialization. Specifically, when the method is called, the states of all partitions that are not present in the metadata cache are initialized as NonExistentPartition.

Next, it checks which partitions will have an illegal state transition and records the corresponding error logs for these partitions.

After that, the code enters the case branch based on targetState. Since there are only four partition states, the case branches are much simpler than those in ReplicaStateMachine. Furthermore, only the OnlinePartition branch has relatively complex logic, while the other three branches simply set the partition state to the target state.
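For completeness, the remaining branches are roughly shaped like the fragment below (a simplified sketch, not the verbatim source): they just log the change and record the new state in the Controller's metadata cache, returning no election results.

```scala
// Sketch of the simpler branches: no leader election, only a state update
case NewPartition | OfflinePartition | NonExistentPartition =>
  validPartitions.foreach { partition =>
    stateChangeLog.info(s"Changed partition $partition state from ${partitionState(partition)} to $targetState")
    controllerContext.putPartitionState(partition, targetState)
  }
  Map.empty
```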

Now let’s delve into the OnlinePartition branch, which is the branch when the target state is OnlinePartition:

case OnlinePartition =>
  // Get the list of uninitialized partitions, which are partitions in the NewPartition state
  val uninitializedPartitions = validPartitions.filter(
    partition => partitionState(partition) == NewPartition)
  // Get the list of partitions eligible for leader election
  // Only partitions in the OnlinePartition and OfflinePartition states can participate in leader election
  val partitionsToElectLeader = validPartitions.filter(
    partition => partitionState(partition) == OfflinePartition ||
     partitionState(partition) == OnlinePartition)
  // Initialize partitions in the NewPartition state and write Leader and ISR data to ZooKeeper
  if (uninitializedPartitions.nonEmpty) {
    val successfulInitializations = initializeLeaderAndIsrForPartitions(uninitializedPartitions)
    successfulInitializations.foreach { partition =>
      stateChangeLog.info(s"Changed partition $partition from ${partitionState(partition)} to $targetState with state " +
        s"${controllerContext.partitionLeadershipInfo(partition).leaderAndIsr}")
      controllerContext.putPartitionState(partition, OnlinePartition)
    }
  }
  // Elect leaders for partitions eligible for leader election
  if (partitionsToElectLeader.nonEmpty) {
    val electionResults = electLeaderForPartitions(
      partitionsToElectLeader,
      partitionLeaderElectionStrategyOpt.getOrElse(
        throw new IllegalArgumentException("Election strategy is a required field when the target state is OnlinePartition")
      )
    )
    electionResults.foreach {
      case (partition, Right(leaderAndIsr)) =>
        stateChangeLog.info(
          s"Changed partition $partition from ${partitionState(partition)} to $targetState with state $leaderAndIsr"
        )
        // Set the partition to OnlinePartition state after successfully electing a leader
        controllerContext.putPartitionState(
          partition, OnlinePartition)
      case (_, Left(_)) => // Ignore if leader election fails
    }
    // Return the leader election results
    electionResults
  } else {
    Map.empty
  }

Although the code is a bit long, there are only two main steps.

The first step is to initialize the partitions in the NewPartition state. Specifically, Leader and ISR data are created for them and written to ZooKeeper under the znode /brokers/topics/&lt;topic&gt;/partitions/&lt;partition&gt;/state; each state node stores the partition's Leader and ISR information. For the specific code, you can refer to the fifth-to-last line of the initializeLeaderAndIsrForPartitions method snippet below:

private def initializeLeaderAndIsrForPartitions(partitions: Seq[TopicPartition]): Seq[TopicPartition] = {
	......
    // Get the list of replicas for each partition
    val replicasPerPartition = partitions.map(partition => partition -> controllerContext.partitionReplicaAssignment(partition))
    // Get the list of live replicas for each partition
    val liveReplicasPerPartition = replicasPerPartition.map { case (partition, replicas) =>
        val liveReplicasForPartition = replicas.filter(replica => controllerContext.isReplicaOnline(replica, partition))
        partition -> liveReplicasForPartition
    }
    // Group partitions into those with live replicas and those without
    // Divide into two groups: partitions with live replicas, and partitions without any live replicas
    val (partitionsWithoutLiveReplicas, partitionsWithLiveReplicas) = liveReplicasPerPartition.partition { case (_, liveReplicas) => liveReplicas.isEmpty }
    ......
    // Determine the leader and ISR for partitions with live replicas
    // Leader is determined based on the first replica in the live replicas list
    // ISR is determined based on all the live replicas
    val leaderIsrAndControllerEpochs = partitionsWithLiveReplicas.map { case (partition, liveReplicas) =>
      val leaderAndIsr = LeaderAndIsr(liveReplicas.head, liveReplicas.toList)
      ......
    }.toMap
    ......
}
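For reference, the data written to such a state znode is a small piece of JSON that looks roughly like the example below (an illustrative payload with made-up values, in the format decoded by TopicPartitionStateZNode):

```
{"version":1,"leader":2,"leader_epoch":0,"controller_epoch":1,"isr":[2,3]}
```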

Step 2 is to elect a leader for the partitions eligible for leader election. The code calls the electLeaderForPartitions method to achieve this. This method continuously attempts to elect a leader for multiple partitions until all partitions have successfully elected a leader.
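The retry loop itself is roughly shaped like the sketch below (a simplified rendering of electLeaderForPartitions, not the verbatim source): partitions whose ZooKeeper update needs to be retried are fed back into the next round until none remain.

```scala
// Sketch: keep calling doElectLeaderForPartitions until no partition needs a retry
private def electLeaderForPartitions(
  partitions: Seq[TopicPartition],
  strategy: PartitionLeaderElectionStrategy
): Map[TopicPartition, Either[Throwable, LeaderAndIsr]] = {
  var remaining = partitions
  val finished = mutable.Map.empty[TopicPartition, Either[Throwable, LeaderAndIsr]]
  while (remaining.nonEmpty) {
    val (finishedElections, updatesToRetry) = doElectLeaderForPartitions(remaining, strategy)
    remaining = updatesToRetry
    finished ++= finishedElections
  }
  finished.toMap
}
```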

The core code for leader election is in the doElectLeaderForPartitions method, which consists of 3 steps.

The code is quite long, so let me first show you a diagram to illustrate the main steps, and then I will explain each step in detail to avoid getting lost in the lengthy source code.

It seems like a long diagram, but don’t worry, we’ll break it down step by step.

As mentioned earlier, this method can be divided into 3 steps. The first step is to fetch the leader and ISR information for the given partitions from ZooKeeper and store the results in a container called validLeaderAndIsrs. Here is the code:

// First part of the doElectLeaderForPartitions method
val getDataResponses = try {
  // Fetch the znode data for the given partitions from ZooKeeper in batches
  zkClient.getTopicPartitionStatesRaw(partitions)
} catch {
  case e: Exception =>
    return (partitions.iterator.map(_ -> Left(e)).toMap, Seq.empty)
}
// Create two containers to store the partitions eligible for leader election and the failed partitions
val failedElections = mutable.Map.empty[TopicPartition, Either[Exception, LeaderAndIsr]]
val validLeaderAndIsrs = mutable.Buffer.empty[(TopicPartition, LeaderAndIsr)]
// Iterate through the znode data for each partition
getDataResponses.foreach { getDataResponse =>
  val partition = getDataResponse.ctx.get.asInstanceOf[TopicPartition]
  val currState = partitionState(partition)
  // If znode data is successfully fetched
  if (getDataResponse.resultCode == Code.OK) {
    // Decode the znode data to obtain the leader and ISR information
    TopicPartitionStateZNode.decode(getDataResponse.data, getDataResponse.stat) match {
      // If the znode data contains leader and ISR information
      case Some(leaderIsrAndControllerEpoch) =>
        // If the Controller Epoch value in the znode data is greater than the current Controller Epoch value
        if (leaderIsrAndControllerEpoch.controllerEpoch > controllerContext.epoch) {
          val failMsg = s"Aborted leader election for partition $partition since the LeaderAndIsr path was " +
            s"already written by another controller. This probably means that the current controller $controllerId went through " +
            s"a soft failure and another controller was elected with epoch ${leaderIsrAndControllerEpoch.controllerEpoch}."
          // Add the partition to the failed elections list
          failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
        } else {
          // Add the partition to the partitions eligible for leader election list
          validLeaderAndIsrs += partition -> leaderIsrAndControllerEpoch.leaderAndIsr
        }
      // If the znode data does not contain leader and ISR information
      case None =>
        val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
        // Add the partition to the failed elections list
        failedElections.put(partition, Left(exception))
    }
  // If the znode data is not fetched, add the partition to the failed elections list
  } else if (getDataResponse.resultCode == Code.NONODE) {
    val exception = new StateChangeFailedException(s"LeaderAndIsr information doesn't exist for partition $partition in $currState state")
    failedElections.put(partition, Left(exception))
  } else {
    failedElections.put(partition, Left(getDataResponse.resultException.get))
  }
}

if (validLeaderAndIsrs.isEmpty) {
  return (failedElections.toMap, Seq.empty)
}

First, the code fetches the znode data for the given partitions from ZooKeeper in one batch, then creates two containers: one for partitions eligible for leader election and one for partitions whose election has failed. Next, it iterates over each partition's znode data. If the data is fetched successfully and contains Leader and ISR information, the Controller Epoch recorded in the znode is compared with the current Controller Epoch: if the znode's epoch is not greater than the current one, the partition is added to the list of partitions eligible for leader election; if it is greater, another Controller has already written the LeaderAndIsr path, so the election for that partition is aborted and it is added to the failed-elections list.

If the znode data contains no Leader and ISR information, or the znode could not be fetched at all, the partition is likewise added to the failed-elections list. Finally, the code checks whether the validLeaderAndIsrs container holds any partitions eligible for leader election; if it does not, the method returns the failed elections right away. At this point, the first major step of the doElectLeaderForPartitions method is complete.

Next, let’s look at the code for the second part of the method:

// doElectLeaderForPartitions method - Part 2
// Run leader election with the given strategy, then split the results into
// partitions that failed to get a leader and partitions that got one
val (partitionsWithoutLeaders, partitionsWithLeaders) = 
  partitionLeaderElectionStrategy match {
  case OfflinePartitionLeaderElectionStrategy(allowUnclean) =>
    val partitionsWithUncleanLeaderElectionState = collectUncleanLeaderElectionState(
      validLeaderAndIsrs,
      allowUnclean
    )
    // Elect leader for OfflinePartition partitions
    leaderForOffline(controllerContext, partitionsWithUncleanLeaderElectionState).partition(_.leaderAndIsr.isEmpty)
  case ReassignPartitionLeaderElectionStrategy =>
    // Elect leader for reassigned partitions
    leaderForReassign(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
  case PreferredReplicaPartitionLeaderElectionStrategy =>
    // Elect leader for PreferredReplica partitions
    leaderForPreferredReplica(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
  case ControlledShutdownPartitionLeaderElectionStrategy =>
    // Elect leader for partitions affected by a normal broker shutdown
    leaderForControlledShutdown(controllerContext, validLeaderAndIsrs).partition(_.leaderAndIsr.isEmpty)
}

In this step, depending on the given PartitionLeaderElectionStrategy, different methods of PartitionLeaderElectionAlgorithms are called to perform leader election, and the partitions with and without leaders are distinguished.

As mentioned earlier, each of these four strategies has its own dedicated election method. In essence, the rule for selecting the Leader is to pick the first replica in the replica list that is both alive and in the ISR.

Now let’s look at the last part of this method, which mainly updates the ZooKeeper node data and the metadata cache information on the controller side:

// doElectLeaderForPartitions method - Part 3
// Add all partitions that failed to elect a leader to the leader election failure partition list
partitionsWithoutLeaders.foreach { electionResult =>
  val partition = electionResult.topicPartition
  val failMsg = s"Failed to elect leader for partition $partition under strategy $partitionLeaderElectionStrategy"
  failedElections.put(partition, Left(new StateChangeFailedException(failMsg)))
}
val recipientsPerPartition = partitionsWithLeaders.map(result => result.topicPartition -> result.liveReplicas).toMap
val adjustedLeaderAndIsrs = partitionsWithLeaders.map(result => result.topicPartition -> result.leaderAndIsr.get).toMap
// Update the Znode node data of partitions on ZooKeeper with the new elected leader and ISR information
val UpdateLeaderAndIsrResult(finishedUpdates, updatesToRetry) = zkClient.updateLeaderAndIsr(
  adjustedLeaderAndIsrs, controllerContext.epoch, controllerContext.epochZkVersion)
// For partitions that have successfully updated the ZooKeeper Znode node data, encapsulate the corresponding leader and ISR information
// Build the LeaderAndIsr request and add it to the Controller's pending request batch
// Wait for unified sending in the future
finishedUpdates.foreach { case (partition, result) =>
  result.foreach { leaderAndIsr =>
    val replicaAssignment = controllerContext.partitionFullReplicaAssignment(partition)
    val leaderIsrAndControllerEpoch = LeaderIsrAndControllerEpoch(leaderAndIsr, controllerContext.epoch)
    controllerContext.partitionLeadershipInfo.put(partition, leaderIsrAndControllerEpoch)
    controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(recipientsPerPartition(partition), partition,
      leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)
  }
}
// Return the election results, including partitions that have been successfully elected and updated on ZooKeeper,
// failed partitions, and partitions with failed ZooKeeper node update
(finishedUpdates ++ failedElections, updatesToRetry)

First, all partitions that failed to elect a leader in the previous step are added to the leader election failure partition list.

Then, the newly elected Leader and ISR information is written to the partitions' ZooKeeper nodes. For the partitions whose ZooKeeper nodes were updated successfully, the code wraps the corresponding Leader and ISR information into LeaderAndIsr requests and adds them to the Controller's pending request batch, to be sent out together later.

Finally, the method returns the election results, including partitions that have been successfully elected and updated on ZooKeeper, failed partitions, and partitions with failed ZooKeeper node update.

By the way, do you still remember the second step of the handleStateChanges method, where the controller sends requests to the relevant brokers? Well, this step determines which brokers to send which requests to, and it is accomplished by this line of code:

controllerBrokerRequestBatch.addLeaderAndIsrRequestForBrokers(
  recipientsPerPartition(partition), partition,
  leaderIsrAndControllerEpoch, replicaAssignment, isNew = false)

Summary #

Today, we focused on the source code of the PartitionStateMachine.scala file, mainly studying the construction principle and working mechanism of the Kafka partition state machine.

Now, let's answer the interviewer's question again; it should not be difficult. We now know that Kafka currently provides four leader election strategies: leader election when a partition goes offline, leader election during replica reassignment, preferred-replica leader election, and leader election during a controlled (graceful) broker shutdown.

These four election strategies share similar logic when choosing a leader: pick the first replica in the ordered replica list (AR) that is both alive and in the ISR as the new leader. Of course, individual strategies differ in minor ways; you can combine what we learned today with the source code of each strategy to study them further.

Let’s review the key points of this lesson.

  • PartitionStateMachine is the partition state machine defined in the Kafka Controller. It is responsible for defining, maintaining, and managing valid partition state transitions.
  • Each broker instantiates a partition state machine object when it starts up, but only the broker where the Controller is located will start it.
  • There are four partition states in Kafka: NewPartition, OnlinePartition, OfflinePartition, and NonExistentPartition. OnlinePartition is the state in which a partition works normally; NewPartition is the uninitialized state, and partitions in that state are not eligible for leader election.
  • There are four scenarios for leader election: Offline, Reassign, Preferred Leader Election, and ControlledShutdown. Each scenario corresponds to a specific leader election strategy.
  • The handleStateChanges method is the main entry point; it calls the private doHandleStateChanges method to perform the actual state transitions and, when the target state is OnlinePartition, leader election.

In the next module, we will enter the world of Kafka's delayed-operation code. There, you will learn how Kafka handles a delayed request. In addition, a time wheel algorithm with O(1) insertion and deletion is also waiting for us there, and we will study it together!

After-class discussion #

There is a method called triggerOnlineStateChangeForPartitions in the source code. Please analyze what it does and when it is called.

Feel free to express your thoughts and discuss with me in the comment section. You are also welcome to share today’s content with your friends.