15 Understanding the Role of the Controller in a Kafka Cluster #

Hello, I am Hu Xi.

In the previous class, we studied the source code of Controller election: the scenarios that trigger an election and how the Controller is elected. Once elected, the Controller takes on its responsibilities as the cluster's controller, including managing cluster members, maintaining topics, operating on metadata, and so on.

When I was learning Kafka, I was always curious about how a newly started Broker joins the cluster. The official documentation explains: “Adding servers to a Kafka cluster is easy, just assign them a unique broker id and start up Kafka on your new servers.” In other words, simply starting the Broker process is enough to expand the cluster, including synchronizing the cluster metadata.

However, have you ever thought about how all this is done? In fact, this is an important function provided by the source code of the Controller component: managing new cluster members.

Of course, as a core component, the Controller provides many functions. Besides cluster member management, topic management is another extremely important one. Today, I will take you through the implementation code of these two functions. They are arguably the two most core functions of the Controller, and they touch almost all of the important data in the cluster metadata. Once you grasp them, exploring the rest of the Controller code later will be much easier.

Cluster Member Management #

First, let’s take a look at the code for member management in the Controller. Member management includes two aspects:

  1. Management of the number of members, mainly reflected in adding new members and removing existing members.
  2. Management of individual members, such as changing the data of a single Broker.

Managing the Number of Members #

When each Broker starts up, it creates a temporary node under the /brokers/ids node in ZooKeeper with a name equal to the value of the broker.id parameter.

For example, if a Broker's broker.id parameter is set to 1001, then when that Broker starts up, you will see a child node named 1001 under /brokers/ids in ZooKeeper. The content of this node includes the Broker's hostname, port number, and configured listeners (note: these are the Broker's network listeners, not the ZooKeeper listeners we discuss in this lesson).

When the Broker shuts down normally or crashes unexpectedly, the corresponding temporary node in ZooKeeper will automatically disappear.
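
If you want to see the ephemeral-node mechanism in isolation, here is a minimal sketch that uses the raw Apache ZooKeeper client rather than Kafka's own KafkaZkClient; the connection string and node path are purely illustrative. The node lives only as long as the session that created it, which is exactly why a crashed Broker's registration disappears on its own.

import org.apache.zookeeper.{CreateMode, WatchedEvent, Watcher, ZooDefs, ZooKeeper}

object EphemeralNodeDemo extends App {
  // Illustrative connection string; a no-op watcher is enough for this demo
  val zk = new ZooKeeper("localhost:2181", 30000, new Watcher {
    override def process(event: WatchedEvent): Unit = ()
  })

  // Create an ephemeral node; ZooKeeper removes it automatically when this session ends,
  // just like a Broker's /brokers/ids/<id> node disappears when the Broker goes away
  val path = zk.create(
    "/demo-ephemeral",
    """{"host":"broker1","port":9092}""".getBytes("UTF-8"),
    ZooDefs.Ids.OPEN_ACL_UNSAFE,
    CreateMode.EPHEMERAL)

  println(s"Created ephemeral node: $path")
  Thread.sleep(10000) // the node is visible while the session is alive
  zk.close()          // the session closes and ZooKeeper deletes the ephemeral node
}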

Based on this mechanism of temporary nodes, the Controller defines a BrokerChangeHandler listener, which is responsible for monitoring the number of child nodes under /brokers/ids.

Once a new Broker is added or removed, the number of child nodes under /brokers/ids will definitely change. This will be detected by the Controller, which will then trigger the handleChildChange method of the BrokerChangeHandler.

Below is the code for the BrokerChangeHandler. As you can see, it defines the handleChildChange method:

class BrokerChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  // Broker ZooKeeper ZNode: /brokers/ids 
  override val path: String = BrokerIdsZNode.path
  override def handleChildChange(): Unit = {
    eventManager.put(BrokerChange) // Just write a BrokerChange event to the event queue
  }
}

The purpose of this method is to write a BrokerChange event to the Controller’s event queue. In fact, the processing logic of all the Handlers defined on the Controller side is just to write the corresponding ControllerEvent to the event queue. The actual event processing logic is located in the process method of the KafkaController class.
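
This "enqueue only, process elsewhere" pattern is worth internalizing: ZooKeeper callbacks do nothing but put an event on a queue, and one dedicated thread drains the queue and dispatches on the event type. The following plain Scala sketch illustrates the idea; the names mirror the source, but it is a simplified illustration, not Kafka's actual ControllerEventManager.

import java.util.concurrent.LinkedBlockingQueue

sealed trait ControllerEvent
case object BrokerChange extends ControllerEvent
case object TopicChange extends ControllerEvent

class SimpleEventManager {
  private val queue = new LinkedBlockingQueue[ControllerEvent]()

  // Handlers call this from ZooKeeper callback threads; it only enqueues, no business logic here
  def put(event: ControllerEvent): Unit = queue.put(event)

  // A single dedicated thread serializes all event processing
  private val thread = new Thread(new Runnable {
    override def run(): Unit = {
      while (true) {
        queue.take() match {
          case BrokerChange => println("processBrokerChange()")
          case TopicChange  => println("processTopicChange()")
        }
      }
    }
  }, "controller-event-thread")
  thread.setDaemon(true)
  thread.start()
}

Because everything funnels through a single thread, event processing is naturally serialized, which is part of why the real Controller can manipulate its metadata cache with relatively simple, largely lock-free code.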

Next, let’s take a look at the process method. You will find that the method for processing the BrokerChange event is actually processBrokerChange, as shown in the following code:

private def processBrokerChange(): Unit = {
  // If this Broker is not the Controller, it naturally does not have the right to process it, and returns directly
  if (!isActive) return
  // Step 1: Get the cluster Broker list from ZooKeeper
  val curBrokerAndEpochs = zkClient.getAllBrokerAndEpochsInCluster
  val curBrokerIdAndEpochs = curBrokerAndEpochs map { case (broker, epoch) => (broker.id, epoch) }
  val curBrokerIds = curBrokerIdAndEpochs.keySet
  // Step 2: Get the Broker list currently saved by the Controller
  val liveOrShuttingDownBrokerIds = controllerContext.liveOrShuttingDownBrokerIds
  // Step 3: Compare the two lists to obtain the lists of newly added Brokers, Brokers to be removed,
  // bounced (restarted) Brokers, and currently running Brokers
  val newBrokerIds = curBrokerIds.diff(liveOrShuttingDownBrokerIds)
  val deadBrokerIds = liveOrShuttingDownBrokerIds.diff(curBrokerIds)
  val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)
    .filter(brokerId => curBrokerIdAndEpochs(brokerId) > controllerContext.liveBrokerIdAndEpochs(brokerId))
  val newBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => newBrokerIds.contains(broker.id) }
  val bouncedBrokerAndEpochs = curBrokerAndEpochs.filter { case (broker, _) => bouncedBrokerIds.contains(broker.id) }
  val newBrokerIdsSorted = newBrokerIds.toSeq.sorted
  val deadBrokerIdsSorted = deadBrokerIds.toSeq.sorted
  val liveBrokerIdsSorted = curBrokerIds.toSeq.sorted
  val bouncedBrokerIdsSorted = bouncedBrokerIds.toSeq.sorted
  info(s"Newly added brokers: ${newBrokerIdsSorted.mkString(",")}, " +
    s"deleted brokers: ${deadBrokerIdsSorted.mkString(",")}, " +
    s"bounced brokers: ${bouncedBrokerIdsSorted.mkString(",")}, " +
    s"all live brokers: ${liveBrokerIdsSorted.mkString(",")}")
  // Step 4: Create channel managers and underlying request send threads (RequestSendThread) for each newly added Broker
  newBrokerAndEpochs.keySet.foreach(
    controllerChannelManager.addBroker)
  // Step 5: Remove the existing resources (channel manager, RequestSendThread, etc.) for each bounced Broker and add them again
  bouncedBrokerIds.foreach(controllerChannelManager.removeBroker)
  bouncedBrokerAndEpochs.keySet.foreach(
    controllerChannelManager.addBroker)
  // Step 6: Remove the corresponding resources for each Broker to be removed
  deadBrokerIds.foreach(controllerChannelManager.removeBroker)
  // Step 7: Update the Controller metadata and perform Broker startup logic for each newly added Broker
  if (newBrokerIds.nonEmpty) {
    controllerContext.addLiveBrokers(newBrokerAndEpochs)
    onBrokerStartup(newBrokerIdsSorted)
  }
  // Step 8: Perform re-addition logic for each bounced Broker, including updating ControllerContext
  // and performing Broker restart logic
  if (bouncedBrokerIds.nonEmpty) {
    controllerContext.removeLiveBrokers(bouncedBrokerIds)
    onBrokerFailure(bouncedBrokerIdsSorted)
    controllerContext.addLiveBrokers(bouncedBrokerAndEpochs)
    onBrokerStartup(bouncedBrokerIdsSorted)
  }
  // Step 9: Perform removal logic for each Broker to be removed in ControllerContext and Broker termination logic
  if (deadBrokerIds.nonEmpty) {
    controllerContext.removeLiveBrokers(deadBrokerIds)
    onBrokerFailure(deadBrokerIdsSorted)
  }
  if (newBrokerIds.nonEmpty || deadBrokerIds.nonEmpty ||
   bouncedBrokerIds.nonEmpty) {
    info(s"Updated broker epochs cache: ${controllerContext.liveBrokerIdAndEpochs}")
  }
}

The code is a bit long, but I have added comments to the important steps to help you understand what this method does.

This method consists of 9 steps in total.

Steps 1-3:

The first two steps are to get the Broker lists from ZooKeeper and ControllerContext respectively. The third step is to get four lists of Brokers: the list of newly added Brokers, the list of Brokers to be removed, the list of bounced Brokers, and the list of currently running Brokers.

Assuming the Broker lists obtained in the first two steps are represented by A and B respectively, since Kafka takes the data in ZooKeeper as authoritative, A is the most up-to-date list of running Brokers. “A-B” represents the newly added Brokers, and “B-A” represents the Brokers to be removed.

The logic for determining the bounced Brokers is a bit more complex: it looks at the Brokers in A ∩ B whose Epoch values have changed. Roughly speaking, you can think of the Epoch value as a version number, or the number of times a Broker has been restarted; if a Broker's Epoch value has increased, that Broker must have been restarted.
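
To make the set arithmetic concrete, here is a tiny self-contained sketch with made-up Broker ids and Epochs: broker 3 is new, broker 4 has disappeared, and broker 2 counts as bounced because its Epoch grew.

object BrokerDiffDemo extends App {
  // Broker id -> Epoch as read from ZooKeeper (set A, the authoritative view)
  val curBrokerIdAndEpochs = Map(1 -> 10L, 2 -> 25L, 3 -> 7L)
  // Broker id -> Epoch as cached in ControllerContext (set B)
  val liveBrokerIdAndEpochs = Map(1 -> 10L, 2 -> 20L, 4 -> 3L)

  val curBrokerIds = curBrokerIdAndEpochs.keySet
  val liveOrShuttingDownBrokerIds = liveBrokerIdAndEpochs.keySet

  val newBrokerIds = curBrokerIds.diff(liveOrShuttingDownBrokerIds)    // A - B = {3}
  val deadBrokerIds = liveOrShuttingDownBrokerIds.diff(curBrokerIds)   // B - A = {4}
  val bouncedBrokerIds = (curBrokerIds & liveOrShuttingDownBrokerIds)  // A ∩ B, keep only bumped Epochs = {2}
    .filter(id => curBrokerIdAndEpochs(id) > liveBrokerIdAndEpochs(id))

  println(s"new=$newBrokerIds dead=$deadBrokerIds bounced=$bouncedBrokerIds")
}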

Steps 4-9:

After obtaining these sets, the Controller will perform corresponding operations for these four sets of Brokers, which are the tasks to be done in steps 4-9 of this method. In general, these operations can be categorized into three types.

  • Execute metadata update operations: Call the methods of the ControllerContext class to update different cluster metadata information. For example, add the newly added Brokers to the cluster metadata and remove the Brokers to be removed from the metadata, etc.
  • Execute Broker termination operations: Call the onBrokerFailure method for the Brokers to be removed and the bounced Brokers.
  • Execute Broker startup operations: Call the onBrokerStartup method for the bounced Brokers and the newly added Brokers.

Let’s take a closer look at the logic of the onBrokerFailure and onBrokerStartup methods. Compared with the other operations, these two have more complex logic and do more work, so we will focus on them.

First, let’s look at the onBrokerFailure method, which handles the logic of broker termination. The code is as follows:

private def onBrokerFailure(deadBrokers: Seq[Int]): Unit = {
  info(s"Broker failure callback for ${deadBrokers.mkString(",")}")

  // Step 1: Remove related entries from metadata object for each broker to be removed
  deadBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)

  // Step 2: Remove brokers that were shutting down from the list of shutting down brokers
  val deadBrokersThatWereShuttingDown =
    deadBrokers.filter(id => controllerContext.shuttingDownBrokerIds.remove(id))
  if (deadBrokersThatWereShuttingDown.nonEmpty)
    info(s"Removed ${deadBrokersThatWereShuttingDown.mkString(",")} from list of shutting down brokers.")

  // Step 3: Find all replica objects on the dead brokers and perform corresponding operations to mark them as "offline"
  val allReplicasOnDeadBrokers = controllerContext.replicasOnBrokers(deadBrokers.toSet)
  onReplicasBecomeOffline(allReplicasOnDeadBrokers)

  // Step 4: Unregister the registered BrokerModificationsHandler listeners
  unregisterBrokerModificationsHandler(deadBrokers)
}

Broker failure means that we must remove all entries related to it from the Controller metadata cache and handle the replicas stored on these brokers. Finally, we also need to unregister the BrokerModificationsHandler listener that was registered for this broker.

The main logic is in the onReplicasBecomeOffline method, which heavily relies on the functionalities of the Kafka replica manager and partition manager. We will study these two managers separately later, so I won’t go into detail here.

Now, let’s look at the onBrokerStartup method. It is the method used by the Controller to handle the startup of a new broker, which means a new broker joining the cluster. Similarly, I will first provide the complete method code with annotations:

private def onBrokerStartup(newBrokers: Seq[Int]): Unit = {
  info(s"New broker startup callback for ${newBrokers.mkString(",")}")

  // Step 1: Remove replica collections corresponding to the new brokers from the metadata
  newBrokers.foreach(controllerContext.replicasOnOfflineDirs.remove)

  val newBrokersSet = newBrokers.toSet
  val existingBrokers = controllerContext.liveOrShuttingDownBrokerIds.diff(newBrokersSet)

  // Step 2: Send metadata update requests to existing brokers to make them aware of the arrival of the new broker
  sendUpdateMetadataRequest(existingBrokers.toSeq, Set.empty)

  // Step 3: Send metadata update requests to new brokers to synchronize all partition data of the cluster
  sendUpdateMetadataRequest(newBrokers, controllerContext.partitionLeadershipInfo.keySet)

  val allReplicasOnNewBrokers = controllerContext.replicasOnBrokers(newBrokersSet)

  // Step 4: Set all replicas on the new brokers to the Online state, i.e., the available state
  replicaStateMachine.handleStateChanges(
    allReplicasOnNewBrokers.toSeq, OnlineReplica)

  partitionStateMachine.triggerOnlinePartitionStateChange()

  // Step 5: Resume previously paused replica reassignment operations
  maybeResumeReassignments { (_, assignment) =>
    assignment.targetReplicas.exists(newBrokersSet.contains)
  }

  val replicasForTopicsToBeDeleted = allReplicasOnNewBrokers.filter(p => topicDeletionManager.isTopicQueuedUpForDeletion(p.topic))

  // Step 6: Resume previously paused topic deletion operations
  if (replicasForTopicsToBeDeleted.nonEmpty) {
    info(s"Some replicas ${replicasForTopicsToBeDeleted.mkString(",")} for topics scheduled for deletion " +
      s"${controllerContext.topicsToBeDeleted.mkString(",")} are on the newly restarted brokers " +
      s"${newBrokers.mkString(",")}. Signaling restart of topic deletion for these topics")
    topicDeletionManager.resumeDeletionForTopics(
      replicasForTopicsToBeDeleted.map(_.topic))
  }

  // Step 7: Register BrokerModificationsHandler listeners for the new brokers
  registerBrokerModificationsHandler(newBrokers)
}

As shown in the code, the first step is to remove the newly added Brokers' entries from the metadata cache. You may ask, “Aren’t these Brokers all new? How could their data already be in the metadata cache?” In fact, newBrokers here only means newly started Brokers; they are not necessarily brand-new machines, so clearing any stale entries for them from the metadata cache first is perfectly safe.

Steps 2 and 3: Send update metadata requests to the existing Brokers and the newly added Brokers in the cluster, respectively. This way, each Broker in the cluster can be aware of each other, and eventually all Brokers will have the same partition data.

Step 4: Set the replica state on the newly added Brokers to Online. The Online state means that these replicas are providing normal services, with the Leader replica providing read-write services externally and the Follower replica automatically synchronizing messages with the Leader replica.

Steps 5 and 6: Resume the replica reassignment and topic deletion operations that become executable again now that the new Brokers have started.

Step 7: Register the BrokerModificationsHandler listener for all newly added Brokers, allowing the Controller to monitor the data changes of their nodes in ZooKeeper.

Member Information Management #

After understanding how the Controller manages the number of cluster members, let’s look at how the Controller listens for changes to an individual Broker’s information and what it does in response.

Similar to managing cluster members, the Controller also responds to Broker changes through ZooKeeper listeners. This listener is the BrokerModificationsHandler. Once the Broker’s information changes, the handleDataChange method of this listener will be called to write the BrokerModifications event to the event queue.
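
The handler itself is not shown in this lesson, but it has exactly the same shape as the BrokerChangeHandler above, except that it watches the data of one Broker's znode instead of the children of /brokers/ids. A sketch consistent with that pattern (the names follow the source, though details may vary slightly across Kafka versions):

class BrokerModificationsHandler(eventManager: ControllerEventManager, brokerId: Int) extends ZNodeChangeHandler {
  // Watches the data of a single Broker's znode, e.g. /brokers/ids/1001
  override val path: String = BrokerIdZNode.path(brokerId)

  // Just write a BrokerModifications event (carrying the Broker id) to the event queue
  override def handleDataChange(): Unit = eventManager.put(BrokerModifications(brokerId))
}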

The KafkaController class’s processBrokerModification method is responsible for handling such events, as shown in the following code:

private def processBrokerModification(brokerId: Int): Unit = {
  if (!isActive) return
  // Step 1: Get detailed data of the target Broker, including hostname, port number, and the security protocol used by each listener configuration.
  val newMetadataOpt = zkClient.getBroker(brokerId)
  // Step 2: Get the detailed data of the target Broker from the metadata cache.
  val oldMetadataOpt = controllerContext.liveOrShuttingDownBroker(brokerId)
  if (newMetadataOpt.nonEmpty && oldMetadataOpt.nonEmpty) {
    val oldMetadata = oldMetadataOpt.get
    val newMetadata = newMetadataOpt.get
    // Step 3: If the two are not equal, it means that the Broker data has changed.
    // In that case, update the metadata cache and execute the onBrokerUpdate method to handle the Broker update logic.
    if (newMetadata.endPoints != oldMetadata.endPoints) {
      info(s"Updated broker metadata: $oldMetadata -> $newMetadata")
      controllerContext.updateBrokerMetadata(oldMetadata, newMetadata)
      onBrokerUpdate(brokerId)
    }
  }
}

This method first obtains the most authoritative Broker data on ZooKeeper and compares it with the data in the metadata cache. If they are found to be inconsistent, the metadata cache is updated and the onBrokerUpdate method is called to perform the update logic.

So, how is the onBrokerUpdate method implemented? Let’s take a look at the code first:

private def onBrokerUpdate(updatedBrokerId: Int): Unit = {
  info(s"Broker info update callback for $updatedBrokerId")
  // Send UpdateMetadataRequest to all Brokers in the cluster to update their metadata.
  sendUpdateMetadataRequest(
    controllerContext.liveOrShuttingDownBrokerIds.toSeq, Set.empty)
}

As you can see, onBrokerUpdate simply sends an update metadata request to all Brokers in the cluster to broadcast the change information.

So, the method to handle Broker information changes is relatively simple, right?

Topic Management #

In addition to maintaining cluster members, the Controller also has an important task of managing all topics, including creating, modifying, and deleting topics.

After mastering the methods for managing cluster members in the previous section, learning the following content will be much easier. This is because their implementation mechanisms are similar and there are almost no differences.

Topic Creation/Modification #

Let’s focus on how topics are created. In fact, topic modification is similar to topic creation, so the source code uses a set of listeners to handle these two cases.

You’ve probably used the kafka-topics script or the AdminClient to create topics, right? Actually, these tools simply write the corresponding data under the appropriate node in ZooKeeper. So how does the Controller, and with it the Kafka cluster, become aware of a newly created topic?
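
For reference, here is a minimal sketch of creating a topic with the AdminClient from Scala; the bootstrap address, partition count, and replication factor are illustrative. In a ZooKeeper-based cluster, the Broker that serves this request ends up writing the new topic's data under /brokers/topics, which is what the Controller's listener reacts to.

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig, NewTopic}

object CreateTopicDemo extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // illustrative address

  val admin = AdminClient.create(props)
  try {
    // Purely illustrative settings: 3 partitions, replication factor 1
    val newTopic = new NewTopic("test-topic", 3, 1.toShort)
    admin.createTopics(Collections.singleton(newTopic)).all().get()
    println("Topic created")
  } finally {
    admin.close()
  }
}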

This is thanks to the ZooKeeper listener that listens to the topic path: TopicChangeHandler. Here is the code:

class TopicChangeHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  // ZooKeeper node: /brokers/topics
  override val path: String = TopicsZNode.path
  // Writes TopicChange event to the event queue
  override def handleChildChange(): Unit = eventManager.put(TopicChange)
}

In the code, TopicsZNode.path refers to the /brokers/topics node in ZooKeeper. Once there is new topic information under this node, the handleChildChange method of this listener will be triggered, and the Controller will write a TopicChange event to the event queue through the ControllerEventManager object.

When the process method of KafkaController receives this event, it calls the processTopicChange method to perform topic creation. Here is the code:

private def processTopicChange(): Unit = {
  if (!isActive) return
  // Step 1: Get all topics from ZooKeeper
  val topics = zkClient.getAllTopicsInCluster(true)
  // Step 2: Compare with the metadata cache to find the list of newly added topics and deleted topics
  val newTopics = topics -- controllerContext.allTopics
  val deletedTopics = controllerContext.allTopics.diff(topics)
  // Step 3: Update the metadata cache with the topic list from ZooKeeper
  controllerContext.setAllTopics(topics)
  // Step 4: Register partition modification handlers for the newly added topics
  // Partition modification handlers listen for partition changes in topics
  registerPartitionModificationsHandlers(newTopics.toSeq)
  // Step 5: Get replica assignments for the newly added topics from ZooKeeper
  val addedPartitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(newTopics)
  // Step 6: Remove cache items for deleted topics from the metadata cache
  deletedTopics.foreach(controllerContext.removeTopic)
  // Step 7: Update the replica assignment entries in the metadata cache for the newly added topics
  addedPartitionReplicaAssignment.foreach {
    case (topicAndPartition, newReplicaAssignment) => controllerContext.updatePartitionFullReplicaAssignment(topicAndPartition, newReplicaAssignment)
  }
  info(s"New topics: [$newTopics], deleted topics: [$deletedTopics], new partition replica assignment " +
    s"[$addedPartitionReplicaAssignment]")
  // Step 8: Set the state of all partitions and replicas for the newly added topics to "Online"
  if (addedPartitionReplicaAssignment.nonEmpty)
    onNewPartitionCreation(addedPartitionReplicaAssignment.keySet)
}

Although there are a total of 8 steps, most of the logic is related to updating the metadata cache, so the overall processing logic is relatively simple. It is worth noting that in step 8, the partition manager and replica manager are used to adjust the state of partitions and replicas. We will discuss this in detail later. For now, you only need to know that when a partition and its replicas are in the “Online” state, it means they are functioning properly.

Topic Deletion #

Similar to topic creation or modification, deleting topics also relies on ZooKeeper listeners.

The Controller defines TopicDeletionHandler to implement the listening for topic deletion. Here is the code:


class TopicDeletionHandler(eventManager: ControllerEventManager) extends ZNodeChildChangeHandler {
  // ZooKeeper node: /admin/delete_topics
  override val path: String = DeleteTopicsZNode.path
  // Write TopicDeletion event to the event queue
  override def handleChildChange(): Unit = eventManager.put(TopicDeletion)
}

The DeleteTopicsZNode.path here refers to the node /admin/delete_topics. Currently, both the kafka-topics script and the AdminClient create a child node with the name of the topic to be deleted under /admin/delete_topics when a topic is deleted.

For example, if I want to delete the topic test-topic, Kafka’s deletion command simply creates the node /admin/delete_topics/test-topic in ZooKeeper. Once this node is created, the watch fires, the handleChildChange method of TopicDeletionHandler is triggered, and the Controller writes a TopicDeletion event to the event queue.
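
If you prefer not to touch ZooKeeper directly, the AdminClient achieves the same effect; in a ZooKeeper-based cluster, its deleteTopics call ultimately results in that /admin/delete_topics/test-topic node being created. A minimal sketch with an illustrative bootstrap address:

import java.util.{Collections, Properties}
import org.apache.kafka.clients.admin.{AdminClient, AdminClientConfig}

object DeleteTopicDemo extends App {
  val props = new Properties()
  props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // illustrative address

  val admin = AdminClient.create(props)
  try {
    // Triggers the deletion flow described above; the Controller picks it up via TopicDeletionHandler
    admin.deleteTopics(Collections.singleton("test-topic")).all().get()
    println("Deletion requested")
  } finally {
    admin.close()
  }
}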

The method that handles the TopicDeletion event is processTopicDeletion, and the code is as follows:

private def processTopicDeletion(): Unit = {
  if (!isActive) return
  // Get the list of topics to be deleted from ZooKeeper
  var topicsToBeDeleted = zkClient.getTopicDeletions.toSet
  debug(s"Delete topics listener fired for topics ${topicsToBeDeleted.mkString(",")} to be deleted")
  // Find the list of non-existent topics
  val nonExistentTopics = topicsToBeDeleted -- controllerContext.allTopics
  if (nonExistentTopics.nonEmpty) {
    warn(s"Ignoring request to delete non-existing topics ${nonExistentTopics.mkString(",")}")
    zkClient.deleteTopicDeletions(nonExistentTopics.toSeq, controllerContext.epochZkVersion)
  }
  topicsToBeDeleted --= nonExistentTopics
  // If delete.topic.enable parameter is set to true
  if (config.deleteTopicEnable) {
    if (topicsToBeDeleted.nonEmpty) {
      info(s"Starting topic deletion for topics ${topicsToBeDeleted.mkString(",")}")
      topicsToBeDeleted.foreach { topic =>
        val partitionReassignmentInProgress = controllerContext.partitionsBeingReassigned.map(_.topic).contains(topic)
        if (partitionReassignmentInProgress)
          topicDeletionManager.markTopicIneligibleForDeletion(
            Set(topic), reason = "topic reassignment in progress")
      }
      // Insert the topics to be deleted into the deletion wait list for processing by TopicDeletionManager
      topicDeletionManager.enqueueTopicsForDeletion(topicsToBeDeleted)
    }
  } else { // Delete topic is not allowed
    info(s"Removing $topicsToBeDeleted since delete topic is disabled")
    // Clear the child nodes under /admin/delete_topics in ZooKeeper
    zkClient.deleteTopicDeletions(topicsToBeDeleted.toSeq, controllerContext.epochZkVersion)
  }
}

To help you understand more clearly, let's walk through this logic step by step.

First, the code retrieves the list of child nodes under /admin/delete_topics from ZooKeeper, which corresponds to the list of topics to be deleted.

Then, it compares it with the list of topics in the metadata cache to determine the list of non-existent topics. If there are non-existent topics, the corresponding child nodes under /admin/delete_topics are deleted. At the same time, the code updates the list of topics to be deleted by removing these non-existent topics.

Next, the code checks the value of the broker-side parameter delete.topic.enable. If this parameter is set to false, meaning topic deletion is not allowed, the code simply clears the child nodes under /admin/delete_topics in ZooKeeper and no further operations are performed. Otherwise, the code iterates through the list of topics to be deleted and temporarily marks the topics that are currently undergoing partition reassignment as “ineligible for deletion”.

Finally, the remaining topics that can be deleted are passed to the TopicDeletionManager to perform the actual deletion logic.

The TopicDeletionManager is a Kafka manager specifically responsible for deleting topics. I will explain its code implementation in detail in the next lesson.

Summary #

Today, we learned about the two main functions of the Controller: managing the cluster’s Broker members and topics. These two functions are important services provided by the Controller. I suggest you carefully review the source code of these two parts to understand how the Controller manages the important resources in the cluster.

Based on these contents, I have summarized a few key points that I hope will help you better understand and remember them.

  • Cluster member management: The Controller is responsible for effectively managing all members of the cluster, including automatically discovering new Brokers, automatically handling offline Brokers, and promptly responding to changes in Broker data.
  • Topic management: The Controller is responsible for efficiently managing all topics on the cluster, including creating topics, modifying topics, and deleting topics, among others. For deleting topics, the actual deletion operation is completed by the underlying TopicDeletionManager.

Next, we will move on to the state machine module, where we will learn about the three major state machines, or managers, provided by Kafka. The Controller relies heavily on these state machines to manage all of the Kafka objects under its jurisdiction. I will take you deep into the underlying state transitions of partitions and replicas; you definitely don’t want to miss it.

Post-class Discussion #

If we want to use a script command to add a partition to a topic, do you know which method in the KafkaController class should be used?

Feel free to express your thoughts and discuss with me in the comments area. You are also welcome to share today’s content with your friends.