16 TopicDeletionManager How Topics are Deleted #

Hello, I’m Hu Xi. Today we begin the fourth major module, “State Machines”.

In the Kafka source code, there are many state machines and managers, such as the Controller Channel Manager we studied earlier and the Controller Event Manager that handles controller events. Most of them are closely tied to their “host” components and vary in size and function. For example, these two Controller managers must be tightly coupled with the Controller component to do their jobs.

However, some state machines and managers in Kafka have relatively independent functional frameworks and do not depend heavily on the components that use them. These include the Topic Deletion Manager, Replica State Machine, and Partition State Machine that I have selected for you in this module.

  • Topic Deletion Manager: Responsible for performing deletion operations on specified Kafka topics and clearing all kinds of “traces” of the topics to be deleted in the cluster.
  • Replica State Machine: Responsible for defining Kafka replica states, legal state transitions, and managing transitions between states.
  • Partition State Machine: Responsible for defining Kafka partition states, legal state transitions, and managing transitions between states.

Topics, partitions, and replicas all typically pass through multiple states during their lifecycle in Kafka, and these three components are used to manage those states. How to manage them correctly and efficiently is the core problem the source code needs to solve.

Today, let’s start by learning about the Topic Deletion Manager and see how Kafka deletes a topic.

Pre-reading #

When I first started learning Kafka, I had a very shallow understanding of deleting Kafka topics. Previously, I thought that running the kafka-topics.sh --delete command would delete the topic. I believe many people may have had this misunderstanding.

One consequence of this incorrect understanding is that we often find that the topics are not completely deleted. As a result, there is a set of “ultimate martial arts secrets” circulating online: manually deleting log files on the disk and manually deleting various nodes related to the topic under ZooKeeper.

Personally, I always recommend against using this set of “secrets” for two reasons:

  • It is not complete. In fact, unless you restart the broker, this set of “secrets” cannot clean up the relevant entries for the topics to be deleted in the metadata cache on the controller side and each broker.
  • It is not officially certified, which means the consequences are your own responsibility. To some extent, it will bring about bad results that you cannot control.

As the saying goes, being skilled at handling trouble is no substitute for never running into it. Instead of pondering how to save yourself after failing to delete a topic, it is better to study how Kafka performs this operation at the underlying level. Once you understand the principles, you can use the “secrets” in a targeted manner when you really need them. Don’t you agree?

Overview of TopicDeletionManager #

Alright, let’s officially start learning about TopicDeletionManager.

This manager is located in the kafka.controller package, with the file name TopicDeletionManager.scala. In this source code of less than 400 lines, it defines three class structures and over twenty methods. Overall, it is relatively easy to learn.

To give you a preliminary understanding, I have created a UML diagram of the TopicDeletionManager.scala code:

This source file, TopicDeletionManager.scala, consists of three parts.

  • DeletionClient interface: Responsible for implementing topic deletion and subsequent actions, such as updating metadata. This interface defines four methods: deleteTopic, deleteTopicDeletions, mutePartitionModifications, and sendMetadataUpdate. We will study their code in more detail later.
  • ControllerDeletionClient class: Implements the DeletionClient interface and implements the four methods mentioned above.
  • TopicDeletionManager class: The manager class for topic deletion, which defines several methods to maintain the correctness of the cluster state before and after a topic deletion. For example, when a topic can be deleted, when a topic cannot be deleted, what operations to avoid during the topic deletion process, and so on.

DeletionClient interface and its implementation #

Next, let’s discuss these 3 parts one by one. First is the DeletionClient interface and its implementation class.

As mentioned earlier, the methods defined in the DeletionClient interface are used to delete topics and synchronize the deletion with other Brokers.

Currently, there is only one implementation class for the DeletionClient interface, which is ControllerDeletionClient. Let’s take a look at the code for this implementation class:

class ControllerDeletionClient(controller: KafkaController, zkClient: KafkaZkClient) extends DeletionClient {
  // Delete the given topic
  override def deleteTopic(topic: String, epochZkVersion: Int): Unit = {
    // Delete the /brokers/topics/<topic> node
    zkClient.deleteTopicZNode(topic, epochZkVersion)
    // Delete the /config/topics/<topic> node
    zkClient.deleteTopicConfigs(Seq(topic), epochZkVersion)
    // Delete the /admin/delete_topics/<topic> node
    zkClient.deleteTopicDeletions(Seq(topic), epochZkVersion)
  }
  // Delete the given topic child node under /admin/delete_topics
  override def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit = {
    zkClient.deleteTopicDeletions(topics, epochZkVersion)
  }
  // Unregister the listener for data changes on /brokers/topics/<topic>
  override def mutePartitionModifications(topic: String): Unit = {
    controller.unregisterPartitionModificationsHandlers(Seq(topic))
  }
  // Send metadata update request for the specified partitions to the cluster Brokers
  override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = {
    controller.sendUpdateMetadataRequest(
      controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
  }
}

The constructor of this class takes two arguments. And since it is an implementation class of the DeletionClient interface, it implements the four methods defined in the DeletionClient interface.
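
The four overrides above imply the shape of the interface itself. The following is a minimal sketch of that shape, together with a recording stub of the kind a unit test might use. Note the assumptions: TopicPartition here is a local stand-in for Kafka’s class so that the snippet compiles on its own, and RecordingDeletionClient is a hypothetical name that does not exist in Kafka.

```scala
import scala.collection.mutable.ArrayBuffer

// Local stand-in for org.apache.kafka.common.TopicPartition (assumption),
// so the sketch compiles without Kafka on the classpath
final case class TopicPartition(topic: String, partition: Int)

// Trait shape inferred from the four overrides in ControllerDeletionClient
trait DeletionClient {
  def deleteTopic(topic: String, epochZkVersion: Int): Unit
  def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit
  def mutePartitionModifications(topic: String): Unit
  def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit
}

// Hypothetical stub: records each call instead of touching ZooKeeper or Brokers
class RecordingDeletionClient extends DeletionClient {
  val calls: ArrayBuffer[String] = ArrayBuffer.empty[String]

  override def deleteTopic(topic: String, epochZkVersion: Int): Unit =
    calls += s"deleteTopic($topic, $epochZkVersion)"

  override def deleteTopicDeletions(topics: Seq[String], epochZkVersion: Int): Unit =
    calls += s"deleteTopicDeletions(${topics.mkString(",")}, $epochZkVersion)"

  override def mutePartitionModifications(topic: String): Unit =
    calls += s"mutePartitionModifications($topic)"

  override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit =
    calls += s"sendMetadataUpdate(${partitions.size} partitions)"
}
```

Because TopicDeletionManager only depends on the interface, a stub like this could be passed in place of ControllerDeletionClient to observe which operations the manager requests and in what order.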

First, let’s talk about the two fields in the constructor: KafkaController instance and KafkaZkClient instance. We are already familiar with the KafkaController instance, which is the Controller component object. The KafkaZkClient instance is the client object for Kafka’s interaction with ZooKeeper.

Next, let’s walk through how ControllerDeletionClient implements the four methods defined in the DeletionClient interface. I will briefly explain what each of them does.

1. deleteTopic

It is used to delete all traces of a topic in ZooKeeper. It does so by calling three methods of KafkaZkClient, which delete the /brokers/topics/<topic> node, the /config/topics/<topic> node, and the /admin/delete_topics/<topic> node respectively.

2. deleteTopicDeletions

It is used to delete the marker nodes of the topics to be deleted under ZooKeeper. It does so by calling KafkaZkClient’s deleteTopicDeletions method to delete the child nodes of a group of topics under /admin/delete_topics. Note that the “Deletions” at the end of the method name refers to the child nodes under /admin/delete_topics. So, deleteTopic deletes the topic itself, while deleteTopicDeletions deletes the corresponding child nodes under /admin/delete_topics.

Note also that both of these methods take a parameter called epochZkVersion, which represents the Controller Epoch version number that the caller expects. If a method is executed with an old version number, ZooKeeper rejects the operation because the number does not match the one it has saved. A Controller whose Epoch value is smaller than the one saved in ZooKeeper is most likely an expired Controller, also known as a Zombie Controller. The purpose of the epochZkVersion parameter is to fence off operations sent by Zombie Controllers.
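
The fencing idea can be illustrated with a toy model. The sketch below is not the KafkaZkClient API: VersionedStore and all of its methods are hypothetical, and it only models the rule that a delete carrying a stale expected version is rejected while one carrying the current version goes through.

```scala
import scala.collection.mutable

// Hypothetical toy model of version-checked deletes; not the KafkaZkClient API
class VersionedStore {
  private var epochZkVersion: Int = 0
  private val nodes = mutable.Set.empty[String]

  def create(path: String): Unit = nodes += path

  def exists(path: String): Boolean = nodes.contains(path)

  // Each controller election bumps the stored version and hands it to the winner
  def electController(): Int = {
    epochZkVersion += 1
    epochZkVersion
  }

  // The delete is applied only if the caller's expected version matches the stored one
  def delete(path: String, expectedEpochZkVersion: Int): Either[String, Unit] =
    if (expectedEpochZkVersion != epochZkVersion)
      Left(s"stale controller: expected $expectedEpochZkVersion, stored $epochZkVersion")
    else {
      nodes -= path
      Right(())
    }
}
```

In this model, a Controller that was elected first and then superseded still holds the old version number, so its delete returns a Left and the node survives. Only the most recently elected Controller can remove it.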

3. mutePartitionModifications

Its purpose is to mute the listener for changes in topic partition data. It works by canceling the listener for data changes on the /brokers/topics/<topic> node. After that, when the partition data of the topic changes, the corresponding processing logic in the Controller is no longer triggered, because the ZooKeeper listener has been canceled.

Why cancel this listener? Actually, the main reason is to avoid mutual interference between operations. Imagine that User A initiates a topic deletion, while User B adds partitions to this topic at the same time. In this case, these two operations will conflict with each other. If the Controller is allowed to process both operations simultaneously, it will cause logical confusion and inconsistent state. To deal with this situation, before removing the replica and partition objects of a topic, the code needs to execute this method first to ensure that it no longer responds to other operations on that topic initiated by users.

The implementation of the mutePartitionModifications method is very simple. It calls the unregisterPartitionModificationsHandlers method of KafkaController, which in turn calls the unregisterZNodeChangeHandler method of KafkaZkClient to cancel the ZooKeeper listener for data changes on the partition node of the given topic.
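
To make the effect of muting concrete, here is a small sketch of a handler registry. It is a simplified model under stated assumptions: ChangeHandlerRegistry and its methods are hypothetical names, and the real handler mechanism in KafkaZkClient is richer than this.

```scala
import scala.collection.mutable

// Hypothetical toy registry; the real ZNode change handler mechanism is richer
class ChangeHandlerRegistry {
  private val handlers = mutable.Map.empty[String, () => Unit]

  def register(path: String)(handler: () => Unit): Unit =
    handlers(path) = handler

  def unregister(path: String): Unit =
    handlers -= path

  // Simulates ZooKeeper reporting a data change on the given path;
  // nothing happens if no handler is registered for it
  def fireDataChange(path: String): Unit =
    handlers.get(path).foreach(handler => handler())
}
```

Once unregister has been called for /brokers/topics/orders, a later fireDataChange on that path finds no handler and does nothing. This is the behavior the deletion flow relies on: a concurrent partition change on a topic being deleted is simply not acted upon by the Controller.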

4. sendMetadataUpdate

It calls the sendUpdateMetadataRequest method of KafkaController to send an update metadata request to all Brokers in the cluster, telling them to stop serving partitions of the deleted topics. The code is as follows:

override def sendMetadataUpdate(partitions: Set[TopicPartition]): Unit = {
  // Send an UpdateMetadataRequest to all Brokers in the cluster
  // to notify them of the state changes for the given partitions
  controller.sendUpdateMetadataRequest(
    controller.controllerContext.liveOrShuttingDownBrokerIds.toSeq, partitions)
}

Note that the target list is controllerContext.liveOrShuttingDownBrokerIds, so the request goes to every Broker that is currently live or in the process of shutting down, informing them of the state changes for the given partitions.

Definition and Initialization of TopicDeletionManager #

With this background, let’s take a look at the main entry point of the topic deletion manager: the TopicDeletionManager class. The definition code for this class is as follows:

class TopicDeletionManager(
  // KafkaConfig class, saves broker-side parameters
  config: KafkaConfig,
  // Cluster metadata
  controllerContext: ControllerContext,
  // Replica state machine, used to set replica state
  replicaStateMachine: ReplicaStateMachine,
  // Partition state machine, used to set partition state
  partitionStateMachine: PartitionStateMachine,
  // DeletionClient interface, implements topic deletion
  client: DeletionClient) extends Logging {
  this.logIdent = s"[Topic Deletion Manager ${config.brokerId}] "
  // Whether topic deletion is enabled
  val isDeleteTopicEnabled: Boolean = config.deleteTopicEnable
  ......
}

This class has six main attributes. Let’s take a look at each of them:

  • config: An instance of the KafkaConfig class, which can be used to get the value of the delete.topic.enable parameter on the broker side. This parameter controls whether topic deletion is allowed, and its default value is true, which means Kafka allows users to delete topics.
  • controllerContext: Metadata information stored on the controller side. Deleting a topic requires updating cluster metadata, so the TopicDeletionManager needs to use methods from controllerContext to update the data it stores.
  • replicaStateMachine and partitionStateMachine: The replica state machine and the partition state machine. They are responsible for the state transitions of replicas and partitions, respectively, in order to maintain the consistent state of replica objects and partition objects in the cluster. These two state machines are important knowledge points in the subsequent lessons.
  • client: The DeletionClient interface mentioned earlier. The TopicDeletionManager uses this interface to perform the corresponding updates of nodes on ZooKeeper.
  • isDeleteTopicEnabled: Indicates whether topics are allowed to be deleted. It holds the value of the delete.topic.enable broker-side parameter described above. This field is heavily used in the source code to determine whether topics can be deleted. The main purpose of the config parameter is to set the value of this field. In the code shown above, its only other use is to supply the broker ID for the log prefix.

Now that we know the meanings of these fields, let’s take a look at how an instance of the TopicDeletionManager class is created.

In fact, this instance is created during the initialization of the KafkaController class. In the source code of the KafkaController class, you can easily find this line of code:

val topicDeletionManager = new TopicDeletionManager(config, controllerContext, replicaStateMachine,
  partitionStateMachine, new ControllerDeletionClient(this, zkClient))

As you can see, the code instantiates a completely new ControllerDeletionClient object and then uses this object instance, along with replicaStateMachine and partitionStateMachine, to create an instance of TopicDeletionManager.

To help you understand, I’ll draw a flowchart for you:

Important Methods in TopicDeletionManager #

In addition to class definition and initialization, the TopicDeletionManager class also defines 16 methods. Among these methods, the most important is the resumeDeletions method. It is the method that restarts the topic deletion process.

Sometimes, a topic cannot be deleted immediately due to certain events, such as replica reassignment in progress for the topic partition. Once these events are completed, the topic becomes eligible for deletion again. At this point, the code needs to call the resumeDeletions method to restart the deletion operation.

This method is important because it also links many other methods in the TopicDeletionManager class, such as completeDeleteTopic and onTopicDeletion. Therefore, you can start learning about the code from the resumeDeletions method and gradually dive into other methods.

Now let’s learn about the implementation code of resumeDeletions.

private def resumeDeletions(): Unit = {
  // Get the list of topics to be deleted from the metadata cache
  val topicsQueuedForDeletion = Set.empty[String] ++ controllerContext.topicsToBeDeleted
  // List of topics eligible for retry
  val topicsEligibleForRetry = mutable.Set.empty[String]
  // List of topics eligible for deletion
  val topicsEligibleForDeletion = mutable.Set.empty[String]
  
  if (topicsQueuedForDeletion.nonEmpty)
    info(s"Handling deletion for topics ${topicsQueuedForDeletion.mkString(",")}")
  
  // Iterate over each topic queued for deletion
  topicsQueuedForDeletion.foreach { topic =>
    // If all replicas of the topic are in ReplicaDeletionSuccessful state
    // i.e., the topic has been deleted
    if (controllerContext.areAllReplicasInState(topic, ReplicaDeletionSuccessful)) {
      // Call completeDeleteTopic method to complete the subsequent operations
      completeDeleteTopic(topic)
      info(s"Deletion of topic $topic successfully completed")
    // Otherwise, if no replica of the topic is in ReplicaDeletionStarted state,
    // deletion has either not begun yet or has failed for some replica
    } else if (!controllerContext.isAnyReplicaInState(topic, ReplicaDeletionStarted)) {
      // If any replica is in ReplicaDeletionIneligible state, deletion failed for it
      if (controllerContext.isAnyReplicaInState(topic, ReplicaDeletionIneligible)) {
        // Add the topic to the set of topics eligible for retry
        topicsEligibleForRetry += topic
      }
    }
    
    // If the topic is eligible for deletion
    if (isTopicEligibleForDeletion(topic)) {
      info(s"Deletion of topic $topic (re)started")
      topicsEligibleForDeletion += topic
    }
  }
  
  // Retry deletion operation for topics in the list of topics eligible for retry
  if (topicsEligibleForRetry.nonEmpty) {
    retryDeletionForIneligibleReplicas(topicsEligibleForRetry)
  }
  
  // Call onTopicDeletion method to perform deletion operation for topics in the list of topics eligible for deletion
  if (topicsEligibleForDeletion.nonEmpty) {
    onTopicDeletion(topicsEligibleForDeletion)
  }
}

From the code, we can see that this method first fetches the set of topics to be deleted from the metadata cache. Then it defines two empty sets to hold the topics eligible for retry and the topics eligible for deletion.

Next, the code iterates over each topic queued for deletion and checks the state of all replicas of the topic. If all replicas are in the ReplicaDeletionSuccessful state, the topic has been successfully deleted, and the completeDeleteTopic method is called to complete the subsequent operations. Otherwise, if no replica is in the ReplicaDeletionStarted state but at least one is in the ReplicaDeletionIneligible state, the topic is added to the set of topics eligible for retry. Independently of these checks, any topic for which isTopicEligibleForDeletion returns true is added to the set of topics eligible for deletion.

Finally, the method calls the retryDeletionForIneligibleReplicas method to retry the deletion operation for topics in the list of topics eligible for retry. It also calls the onTopicDeletion method to perform the deletion operation for topics in the list of topics eligible for deletion.

It’s worth mentioning that the retryDeletionForIneligibleReplicas method is used to retry topic deletion. This is done by changing the status of the corresponding replica of the topic from ReplicaDeletionIneligible to OfflineReplica. This way, when resumeDeletions is called again, it will attempt to delete the topic again.
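
The branching on replica states can be sketched in isolation. The following is a simplified model rather than Kafka’s code: the state names match those used above, but ResumeDeletionsSketch, the Action type, and the helper names are hypothetical, and the separate isTopicEligibleForDeletion check is deliberately left out.

```scala
object ResumeDeletionsSketch {
  // Replica states referenced in this lesson, modeled locally
  sealed trait ReplicaState
  case object OfflineReplica extends ReplicaState
  case object ReplicaDeletionStarted extends ReplicaState
  case object ReplicaDeletionSuccessful extends ReplicaState
  case object ReplicaDeletionIneligible extends ReplicaState

  // Hypothetical labels for the outcome of the first if / else-if chain
  sealed trait Action
  case object Complete extends Action // every replica deleted: finish the topic
  case object Retry extends Action    // nothing in flight and some replica failed
  case object NoAction extends Action // neither branch applies

  def firstBranch(replicas: Seq[ReplicaState]): Action =
    if (replicas.forall(_ == ReplicaDeletionSuccessful)) Complete
    else if (!replicas.contains(ReplicaDeletionStarted) &&
             replicas.contains(ReplicaDeletionIneligible)) Retry
    else NoAction

  // Mirrors the retry idea: failed replicas are moved back to OfflineReplica
  def retry(replicas: Seq[ReplicaState]): Seq[ReplicaState] =
    replicas.map {
      case ReplicaDeletionIneligible => OfflineReplica
      case other                     => other
    }
}
```

For example, a topic with one successfully deleted replica and one ineligible replica falls into the Retry case, whereas a topic that still has a replica in ReplicaDeletionStarted is left alone until that deletion finishes.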

From this code, we can once again see that the method naming in Kafka is really standardized. Thanks to this, many times we can understand what a method roughly does without diving deep into its internal implementation. For example:

  • The topicsQueuedForDeletion variable holds the set of topics waiting to be deleted.
  • The controllerContext.isAnyReplicaInState method checks whether any replica of a topic is in a given state.
  • The onTopicDeletion method performs the deletion operation for topics.

If you now read the source code behind these three names again, you will find that what they do indeed matches what they are called. This once again shows that the Kafka source code is of very good quality. Therefore, whether you are a Kafka user or not, you can consider Kafka’s source code as a good choice for reading open-source framework code and enhancing your competitive edge.

Next, let me explain the execution flow of the resumeDeletions method with the following diagram:

With this, I have finished explaining the logic of the resumeDeletions method, which indeed connects many methods defined in TopicDeletionManager. Among them, two important operations are completeDeleteTopic and onTopicDeletion. Let’s look at them one by one.

Let’s start with the code of the completeDeleteTopic method, and I have added annotations to each line of code.

private def completeDeleteTopic(topic: String): Unit = {
  // Step 1: Unregister the partition change listener to prevent
  // the listener from being triggered during the deletion process,
  // which could cause inconsistency in the state
  client.mutePartitionModifications(topic)
  // Step 2: Get all replica objects in ReplicaDeletionSuccessful state
  // for this topic, i.e., all replicas that have been successfully deleted
  val replicasForDeletedTopic = controllerContext.replicasInState(topic, ReplicaDeletionSuccessful)
  // Step 3: Transform these replica objects into NonExistentReplica state
  // using the replica state machine. This is equivalent to deleting
  // these replicas in the state machine
  replicaStateMachine.handleStateChanges(
    replicasForDeletedTopic.toSeq, NonExistentReplica)
  // Step 4: Update the metadata cache by removing the deleted topic
  // from the list of topics to be deleted and topics with deletion started
  // since the topic has been successfully deleted, it does not need to
  // appear in these two lists anymore
  controllerContext.topicsToBeDeleted -= topic
  controllerContext.topicsWithDeletionStarted -= topic
  // Step 5: Remove all "traces" of the topic from ZooKeeper
  client.deleteTopic(topic, controllerContext.epochZkVersion)
  // Step 6: Remove all "traces" of the topic from the metadata cache
  controllerContext.removeTopic(topic)
}

The whole process is straightforward and easy to understand, so I won’t explain it further.

Now let’s look at the code of the onTopicDeletion method:

private def onTopicDeletion(topics: Set[String]): Unit = {
  // Find out the topics in the given set that have not yet started
  // the deletion operation
  val unseenTopicsForDeletion = topics.diff(controllerContext.topicsWithDeletionStarted)
  if (unseenTopicsForDeletion.nonEmpty) {
    // Get all partition objects for these topics
    val unseenPartitionsForDeletion = unseenTopicsForDeletion.flatMap(controllerContext.partitionsForTopic)
    // Change the state of these partitions to OfflinePartition and NonExistentPartition
    // using the partition state machine. This is equivalent to deleting
    // these partitions from the state machine
    partitionStateMachine.handleStateChanges(
      unseenPartitionsForDeletion.toSeq, OfflinePartition)
    partitionStateMachine.handleStateChanges(
      unseenPartitionsForDeletion.toSeq, NonExistentPartition)
    // Add these topics to the list of topics with deletion started
    controllerContext.beginTopicDeletion(unseenTopicsForDeletion)
  }
  // Send metadata update requests to all brokers in the cluster,
  // telling them not to process data for these topics anymore
  client.sendMetadataUpdate(
    topics.flatMap(controllerContext.partitionsForTopic))
  // The partition deletion operation will execute the physical
  // disk file deletion action
  onPartitionDeletion(topics)
}

I have explained the logic of the onTopicDeletion method in the code comments. You can see that this method has a linear flow and is easy to understand. Let me summarize the key points:

The onTopicDeletion method uses the partition state machine multiple times to adjust the partition state of the topics to be deleted. In the upcoming lessons on partition state machine and replica state machine, I will explain them in detail, including the defined states and the rules for state transitions, etc.

The last line of the onTopicDeletion method calls the onPartitionDeletion method to perform the actual physical disk file deletion. In fact, this is done through state transitions in the replica state machine. We will discuss this in detail in the next lesson.
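
One detail of onTopicDeletion that is easy to miss is the role of the set difference. The sketch below isolates it. OnTopicDeletionSketch and plan are hypothetical names, and the function only computes which topics would go through each step.

```scala
object OnTopicDeletionSketch {
  // Returns a pair:
  //   1. topics that still need their partitions moved to OfflinePartition
  //      and NonExistentPartition (deletion not started yet)
  //   2. topics whose partitions are included in the metadata update (all of them)
  def plan(requested: Set[String],
           deletionStarted: Set[String]): (Set[String], Set[String]) = {
    val unseenTopicsForDeletion = requested.diff(deletionStarted)
    (unseenTopicsForDeletion, requested)
  }
}
```

With this shape, a topic whose deletion is being retried does not repeat the partition state transitions, but Brokers are still told again to stop serving it, and the partition deletion step still runs for it.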

When learning the TopicDeletionManager methods, it is important to understand the context of topic deletion. The same principle applies to other parts of the source code. Once you have grasped the overall flow, reading the code of those detailed methods will not be difficult. Following this method, it is not difficult to understand the Kafka source code!

Summary #

Today, we mainly studied the code related to topic deletion in TopicDeletionManager.scala. There are a few key points that you need to remember.

  • During topic deletion, Kafka adjusts data in three places in the cluster: ZooKeeper, the metadata cache, and the log files on disk. All ZNodes related to the topic in ZooKeeper must be cleared; the relevant entries in the controller-side metadata cache must be removed and the change synchronized to the other brokers in the cluster; and the log files on disk are the primary target of the cleanup. These three places must be handled together, much like what we often call an atomic operation. Now, recalling the “secrets” mentioned at the beginning, you will find that they miss a very important piece: they cannot clear the entries in the controller-side metadata cache. Therefore, you should do your best to avoid using them.
  • The role of the DeletionClient interface is mainly to operate ZooKeeper and perform operations such as deleting ZooKeeper nodes.
  • The TopicDeletionManager is initialized during the KafkaController creation process and mainly interacts with the metadata cache to update various data.

In the code we saw today, replicaStateMachine and partitionStateMachine appeared many times. They are the replica state machine and the partition state machine that I have mentioned repeatedly. In the next two lectures, I will gradually take you into the world of their code, allowing you to appreciate Kafka’s elegance in managing replicas and partitions through state machine mechanisms.

Class Discussion #

In the last class, while studying the code of the processTopicDeletion method, we came across a method called markTopicIneligibleForDeletion, which is also related to topic deletion. Now, can you tell me what it is used for and how it is implemented?

Feel free to express your thoughts and discuss with me in the comments section. You are also welcome to share today’s content with your friends.