
10 KafkaApis: The Most Important Source Code Entry Point of Kafka #

Hello, I’m Hu Xi. Today, we will wrap up our study of the Kafka request processing module. There is one last topic we haven’t covered yet: the KafkaApis class.

In the previous lesson, I mentioned that the actual processing logic for requests is encapsulated in the KafkaApis class. You must be curious about what this class actually does.

In fact, I have always believed that KafkaApis is the most important entry point into the Kafka source code. Whenever we want to find the implementation of a particular Kafka feature, we almost always start from the KafkaApis.scala file and drill down layer by layer until we locate the code that implements it. For example, if you want to know how topic creation works, just look at the handleCreateTopicsRequest method in KafkaApis; if you want to understand how consumer offset commits are implemented, check the handleOffsetCommitRequest method.

In addition, through this repeated drilling down, we gradually learn the code paths and source layout behind each Kafka feature, and thus build a complete picture of the entire Kafka source code project.

If all this is still not enough to attract you to read this part of the source code, let me share a real case with you.

Previously, when using Kafka, I noticed that when a Producer sends a message to a topic that does not yet exist, the Producer side logs a warning after the topic is created:

Error while fetching metadata with correlation id 3 : {test-topic=LEADER_NOT_AVAILABLE} (org.apache.kafka.clients.NetworkClient)

I was always curious about where this LEADER_NOT_AVAILABLE error was thrown, until one day, while browsing the KafkaApis code, I suddenly discovered these two lines in the createTopic method:

private def createTopic(topic: String,
  numPartitions: Int, replicationFactor: Int,
  properties: util.Properties = new util.Properties()): MetadataResponseTopic = {
  try {
    adminZkClient.createTopic(topic, numPartitions, replicationFactor, properties, RackAwareMode.Safe)
    ......
    // Explicitly wrap a LEADER_NOT_AVAILABLE error into the Response
    metadataResponseTopic(Errors.LEADER_NOT_AVAILABLE, topic, isInternal(topic), util.Collections.emptyList())
  } catch {
    ......
  }
}

At that moment, everything became clear to me. It turns out that after the broker creates the topic, it explicitly returns a LEADER_NOT_AVAILABLE error to the clients. When the clients receive this error, they proactively update their metadata to obtain information about the newly created topic. You see, without personally examining the source code, we would not be able to explain this phenomenon.
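
To make the client side of this handshake concrete, here is a minimal sketch of the reaction just described. This is not the real client code; the error model and every name in it are simplified stand-ins (the real producer reacts inside its metadata-update logic):

object MetadataErrorSketch {
  sealed trait MetadataError
  case object LeaderNotAvailable extends MetadataError
  case object UnknownTopicOrPartition extends MetadataError

  // A retriable error triggers a metadata refresh instead of failing the send
  def handleTopicError(topic: String, error: MetadataError): Unit = error match {
    case LeaderNotAvailable =>
      // the real client would schedule a metadata update here and retry later
      println(s"$topic: leader not available yet, scheduling a metadata refresh")
    case UnknownTopicOrPartition =>
      println(s"$topic: topic or partition does not exist")
  }

  def main(args: Array[String]): Unit =
    handleTopicError("test-topic", LeaderNotAvailable)
}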

So, since KafkaApis is so important, let’s now take a look at this famous entry file. I will first introduce its definition and its most important method, handle, and then explain the other important methods. After finishing this lesson, you will have mastered the technique of starting from the KafkaApis class to find the exact code location of any given feature.

In fact, while in previous lessons I mainly shared knowledge with you, in this lesson I am sharing a method for acquiring knowledge.

Definition of KafkaApis class #

Now, let’s first take a look at the definition of the KafkaApis class. It lives in the source file KafkaApis.scala, which sits in the server package of the core project and is a giant file of nearly 3,000 lines. Fortunately, its implementation is not complex: most of the code handles the various types of Kafka requests in a uniform way, so the overall structure is very regular. You will surely appreciate this when we study the handle method later.

The definition code of the KafkaApis class is as follows:

class KafkaApis(
    val requestChannel: RequestChannel, // request channel
    val replicaManager: ReplicaManager, // replica manager
    val adminManager: AdminManager,     // manager for topics, partitions, configurations, etc.
    val groupCoordinator: GroupCoordinator, // consumer group coordinator component
    val txnCoordinator: TransactionCoordinator, // transaction coordinator component
    val controller: KafkaController,    // controller component
    val zkClient: KafkaZkClient,        // ZooKeeper client program, Kafka relies on this class to interact with ZooKeeper
    val brokerId: Int,                   // broker.id parameter value
    val config: KafkaConfig,            // Kafka configuration class
    val metadataCache: MetadataCache,   // metadata cache class
    val metrics: Metrics,            
    val authorizer: Option[Authorizer],
    val quotas: QuotaManagers,          // quota manager component
    val fetchManager: FetchManager,
    brokerTopicStats: BrokerTopicStats,
    val clusterId: String,
    time: Time,
    val tokenManager: DelegationTokenManager) extends Logging {
  type FetchResponseStats = Map[TopicPartition, RecordConversionStats]
  this.logIdent = "[KafkaApi-%d] ".format(brokerId)
  val adminZkClient = new AdminZkClient(zkClient)
  private val alterAclsPurgatory = new DelayedFuturePurgatory(purgatoryName = "AlterAcls", brokerId = config.brokerId)
  ......
}

I have added comments for some important fields. To help you understand better, I have also created a mind map that lists some of the important components:

From this mind map, you can see that KafkaApis is a collection of heavyweight components. Across the entire source code project, no class is wired to more “big-shot” components than KafkaApis! In it, you can find almost all of Kafka’s heavyweight components, such as ReplicaManager, which is responsible for replica management; GroupCoordinator, which maintains consumer groups; KafkaController, which implements the Controller component; and so on.

When processing different types of RPC requests, KafkaApis uses different components, so when creating a KafkaApis instance we must pass in every component it might use. This is why it gathers so many heavyweight components in one place.
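
To see why gathering every component in the constructor is convenient, consider this toy sketch of the same pattern. All the types here are made-up stand-ins, not the real Kafka classes; each handler simply delegates to whichever collaborator it needs:

object DispatcherSketch {
  // Stand-ins for two heavyweight components
  class ReplicaManagerStub   { def append(topic: String): Unit = println(s"append to $topic") }
  class GroupCoordinatorStub { def listGroups(): List[String] = List("g1", "g2") }

  // The dispatcher receives every collaborator up front...
  class MiniApis(replicaManager: ReplicaManagerStub,
                 groupCoordinator: GroupCoordinatorStub) {
    // ...and each handler delegates to the ones it needs
    def handleProduce(topic: String): Unit = replicaManager.append(topic)
    def handleListGroups(): List[String] = groupCoordinator.listGroups()
  }

  def main(args: Array[String]): Unit = {
    val apis = new MiniApis(new ReplicaManagerStub, new GroupCoordinatorStub)
    apis.handleProduce("test-topic")
    println(apis.handleListGroups())
  }
}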

This is another reason why I call KafkaApis an entry class: you can simply open the KafkaApis.scala file and, following its definition, study the implementation of these heavyweight components one by one. Once you have a clear understanding of their code, you might be the one to write the next source code lesson.

KafkaApis Entry Point #

So, as the entry class of the Kafka source code, what methods does it define?

If you open the code of the KafkaApis class, you will find that it contains many methods whose names start with handle. Each of these methods handles one type of request, and their common entry point is the handle method. In fact, starting from handle, you can jump straight to the code that actually processes any type of request. The following is the complete implementation of the handle method; let’s take a look:

def handle(request: RequestChannel.Request): Unit = {
  try {
    trace(s"Handling request:${request.requestDesc(true)} from connection ${request.context.connectionId};" +
 s"securityProtocol:${request.context.securityProtocol},principal:${request.context.principal}")
    // Determine the type of request based on the apiKey field in the request header
    // Then call the corresponding handle*** method
    // If there is a new RPC protocol type, then:
    // 1. Add a new apiKey to identify the new request type
    // 2. Add a new case branch
    // 3. Add the corresponding handle*** method
    request.header.apiKey match {
      case ApiKeys.PRODUCE => handleProduceRequest(request)
      case ApiKeys.FETCH => handleFetchRequest(request)
      case ApiKeys.LIST_OFFSETS => handleListOffsetRequest(request)
      case ApiKeys.METADATA => handleTopicMetadataRequest(request)
      case ApiKeys.LEADER_AND_ISR => handleLeaderAndIsrRequest(request)
      case ApiKeys.STOP_REPLICA => handleStopReplicaRequest(request)
      case ApiKeys.UPDATE_METADATA => handleUpdateMetadataRequest(request)
      case ApiKeys.CONTROLLED_SHUTDOWN => handleControlledShutdownRequest(request)
      case ApiKeys.OFFSET_COMMIT => handleOffsetCommitRequest(request)
      case ApiKeys.OFFSET_FETCH => handleOffsetFetchRequest(request)
      // more case branches...
    }
  } catch {
    // If it is a fatal error, throw an exception
    case e: FatalExitError => throw e
    // For any other exception, handleError logs it and sends back an error response
    case e: Throwable => handleError(request, e)
  } finally {
    // Record the request's local completion time, used to compute how long the Broker spent processing it
    if (request.apiLocalCompleteTimeNanos < 0)
      request.apiLocalCompleteTimeNanos = time.nanoseconds
  }
}

If you have been following this course, you may have noticed that I rarely post the complete code of a class or method, because it is not necessary and it would waste your time. However, this handle method is a bit special, so I’m showing you the complete code.

It uses Scala’s pattern matching syntax to list the processing logic for every type of request. Through this method, you can trace the source code path Kafka follows to handle any request. I strongly recommend picking a few important request types and, starting from the handle method, exploring how the code serves those requests; this will deepen your overall familiarity with the Broker-side code and be very helpful for further source reading and for solving practical problems.

From the code above, you can easily spot the pattern: each method that handles a specific type of request is named handle plus the request type, such as handleProduceRequest for PRODUCE requests and handleFetchRequest for FETCH requests.

If you click into ApiKeys, you will find that it is an enumeration type that encapsulates all the RPC protocols defined by Kafka. It is worth mentioning that the Kafka community maintains official documentation specifically recording these RPC protocols, including the request and response formats for each version.
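
If you want to poke at this enum yourself, a quick experiment along the following lines should work, assuming the kafka-clients jar is on the classpath (forId and the id field are public members of the ApiKeys enum):

import org.apache.kafka.common.protocol.ApiKeys

object ApiKeysDemo {
  def main(args: Array[String]): Unit = {
    // Each RPC protocol has a numeric id; PRODUCE is id 0
    println(ApiKeys.forId(0)) // PRODUCE
    // Enumerate every protocol the client library knows about
    ApiKeys.values.foreach(k => println(s"${k.id} -> $k"))
  }
}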

From this handle method, we can also conclude that when the community adds a new RPC protocol, the Broker side roughly needs to do three things:

  1. Update the ApiKeys enumeration by adding the new RPC ApiKey.
  2. Add the corresponding handle***Request method in KafkaApis to implement the processing logic for that RPC request.
  3. Update the handle method in KafkaApis by adding a case branch for the new RPC protocol.
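
To make these three steps concrete, here is a toy sketch built around a made-up PING protocol. The ApiKey, Request, and handle definitions below are simplified stand-ins for the real ones, not Kafka code:

object NewRpcSketch {
  // Step 1: a new entry in a (simplified) ApiKeys enumeration
  sealed abstract class ApiKey(val id: Short)
  case object Produce extends ApiKey(0)
  case object Ping extends ApiKey(999) // the hypothetical new RPC

  final case class Request(apiKey: ApiKey, body: String)

  // Step 2: the new handle*** method carrying the processing logic
  def handlePingRequest(request: Request): Unit =
    println(s"pong: ${request.body}")

  // Step 3: a new case branch in the handle dispatcher
  def handle(request: Request): Unit = request.apiKey match {
    case Produce => println("handling PRODUCE")
    case Ping    => handlePingRequest(request)
  }

  def main(args: Array[String]): Unit =
    handle(Request(Ping, "hello"))
}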

Other Important Methods #

Apart from the definition of KafkaApis and the handle method, there are several other important methods, such as the family of methods used for sending responses and the methods used for authorization. The former, in particular, is a mandatory step after any type of request is processed: it is not enough merely to process a request; Kafka also needs to send the result back to the request sender.

Firstly, let’s talk about the sendResponse series of methods.

Why do I call them a series? Because there are as many as seven such methods in the source code. Let me introduce them one by one.

  • sendResponse(RequestChannel.Response): The underlying method for sending responses. Essentially, it calls the sendResponse method of the RequestChannel component in the SocketServer. As I mentioned in the previous lesson, that method adds the response object to the response queue of the corresponding Processor thread, which is then responsible for sending the data over the network.
  • sendResponse(RequestChannel.Request, responseOpt: Option[AbstractResponse], onComplete: Option[Send => Unit]): This overload takes a Request object as input rather than a Response, so inside the method it first constructs a Response object and then calls the first sendResponse method.
  • sendNoOpResponseExemptThrottle: Sends a NoOpResponse type of response without being subject to request channel throttling. A NoOpResponse means that when the Processor thread retrieves a response of this type, it does not perform actual I/O sending operations.
  • sendErrorResponseExemptThrottle: Sends a response with an error message without being subject to throttling.
  • sendResponseExemptThrottle: Sends a normal response without being subject to throttling.
  • sendErrorResponseMaybeThrottle: Sends a response with an error message and can be subject to throttling constraints.
  • sendResponseMaybeThrottle: Sends a normal response and can be subject to throttling constraints.

The most important method in this set is the first sendResponse method. It is used to send the response for most types of requests after they have been processed. As for the other methods in this set, they all internally call the first sendResponse method. Of course, before calling it, these methods usually have some customized logic. For example, the sendResponseMaybeThrottle method will attempt to throttle the request channel before executing the sendResponse logic. Therefore, we need to focus on how the first sendResponse method sends the Response object.

As I mentioned earlier, KafkaApis actually puts the processed Response back into the response queue of the front-end Processor thread, and it is the Processor thread, not the KafkaRequestHandler thread that executes the KafkaApis logic, that returns the Response to the clients or other brokers.
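
The division of labor described above boils down to a short pattern: thin wrappers add their own logic (throttling, error wrapping) and then delegate to one low-level method that enqueues the Response. The following is a minimal sketch with illustrative names, not the real code; note how the throttled variant takes a callback that receives the computed throttle time, mirroring the requestThrottleMs => ... style used throughout KafkaApis:

import java.util.concurrent.LinkedBlockingQueue

object SendResponseSketch {
  final case class Response(payload: String, throttleTimeMs: Int)

  // Stand-in for the response queue of one Processor thread
  private val responseQueue = new LinkedBlockingQueue[Response]()

  // The single low-level method: enqueue and let the Processor do the network I/O
  def sendResponse(response: Response): Unit = responseQueue.put(response)

  // A throttled wrapper: compute a throttle time, let the caller build the
  // Response with it, then delegate to the low-level method
  def sendResponseMaybeThrottle(createResponse: Int => Response): Unit = {
    val throttleTimeMs = 0 // the real code asks the quota managers for this value
    sendResponse(createResponse(throttleTimeMs))
  }

  def main(args: Array[String]): Unit = {
    sendResponseMaybeThrottle(ms => Response("LIST_GROUPS result", ms))
    println(responseQueue.poll()) // Response(LIST_GROUPS result,0)
  }
}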

Another very important method is the authorize method. Let’s take a look at its code:

private[server] def authorize(requestContext: RequestContext,
  operation: AclOperation,
  resourceType: ResourceType,
  resourceName: String,
  logIfAllowed: Boolean = true,
  logIfDenied: Boolean = true,
  refCount: Int = 1): Boolean = {
  authorizer.forall { authZ =>
    val resource = new ResourcePattern(resourceType, resourceName, PatternType.LITERAL)
    val actions = Collections.singletonList(new Action(operation, resource, refCount, logIfAllowed, logIfDenied))
    authZ.authorize(requestContext, actions).asScala.head == AuthorizationResult.ALLOWED
  }
}

This method is used for authorization validation. Currently, all RPC requests in Kafka require the sender (whether it is a client or another broker) to have specific permissions.

Next, let me give an example using the code for creating topics to demonstrate the actual application of the authorize method. Here is a snippet of the handleCreateTopicsRequest method:

val hasClusterAuthorization = authorize(request, CREATE, CLUSTER, CLUSTER_NAME, logIfDenied = false)
val topics = createTopicsRequest.data.topics.asScala.map(_.name)
val authorizedTopics = if (hasClusterAuthorization) topics.toSet else filterAuthorized(request, CREATE, TOPIC, topics.toSeq)
val authorizedForDescribeConfigs = filterAuthorized(request, DESCRIBE_CONFIGS, TOPIC, topics.toSeq, logIfDenied = false)
  .map(name => name -> results.find(name)).toMap

results.asScala.foreach(topic => {
  if (results.findAll(topic.name).size > 1) {
    topic.setErrorCode(Errors.INVALID_REQUEST.code)
    topic.setErrorMessage("Found multiple entries for this topic.")
  } else if (!authorizedTopics.contains(topic.name)) {
    topic.setErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
    topic.setErrorMessage("Authorization failed.")
  }
  if (!authorizedForDescribeConfigs.contains(topic.name))
    topic.setTopicConfigErrorCode(Errors.TOPIC_AUTHORIZATION_FAILED.code)
})
......

This code calls the authorize method to check whether the client has permission to create topics. If not, it explicitly marks the topic with a TOPIC_AUTHORIZATION_FAILED error and notifies the client. Currently, all permission controls in Kafka happen in KafkaApis: before any request is processed, the authorize method is called to validate permissions, and only then is the request allowed to proceed.

Analysis of KafkaApis Request Processing Example #

After understanding the code structure of KafkaApis, I will provide an example of a real code snippet to illustrate the general execution flow of a protocol handling method in this class. This will help you gain a clearer understanding of the request processing logic.

It’s important to note that the request processing logic discussed here is different from the complete request processing flow covered earlier. Today we focus on the functional logic of handling a request, whereas the earlier flow covered the code path a request takes from being sent to the Broker until the Broker returns a Response. In other words, the processing flow is the same for all types of requests, but each type has its own functional logic, and that is what each handle***Request method in the KafkaApis class is responsible for.

Now, let’s take the handleListGroupsRequest method as an example. As the name suggests, this method handles ListGroupsRequest requests. The Response for this type of request should return information about consumer groups in the cluster. Let’s take a look at its implementation:

def handleListGroupsRequest(request: RequestChannel.Request): Unit = {
    val (error, groups) = groupCoordinator.handleListGroups() // Call handleListGroups method of GroupCoordinator to get all Group information
    // If the Clients have DESCRIBE permission for CLUSTER resources
    if (authorize(request, DESCRIBE, CLUSTER, CLUSTER_NAME))
      // Directly use the obtained Group data to construct a Response and then send it
      sendResponseMaybeThrottle(request, requestThrottleMs =>
        new ListGroupsResponse(new ListGroupsResponseData()
            .setErrorCode(error.code)
            .setGroups(groups.map { group => new ListGroupsResponseData.ListedGroup()
              .setGroupId(group.groupId)
              .setProtocolType(group.protocolType)}.asJava
            )
            .setThrottleTimeMs(requestThrottleMs)
        ))
    else {
      // Find out which Groups the Clients have DESCRIBE permission for GROUP resources, and return information for those Groups
      val filteredGroups = groups.filter(group => authorize(request, DESCRIBE, GROUP, group.groupId))
      sendResponseMaybeThrottle(request, requestThrottleMs =>
        new ListGroupsResponse(new ListGroupsResponseData()
          .setErrorCode(error.code)
          .setGroups(filteredGroups.map { group => new ListGroupsResponseData.ListedGroup()
            .setGroupId(group.groupId)
            .setProtocolType(group.protocolType)}.asJava
          )
          .setThrottleTimeMs(requestThrottleMs)
        ))
    }
  }

I will use a flowchart to illustrate the execution logic of this code:

Overall, the implementation logic of handleListGroupsRequest method is quite simple. After obtaining all consumer group information through the GroupCoordinator component, the code applies authorization checks to these Groups, and finally, based on the authorization result, decides which visible consumer groups to return to the Clients.
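
Stripped of the response-building code, this authorization pattern reduces to a few lines. Here is a hedged distillation, with names of my own choosing rather than Kafka's:

object VisibleGroupsSketch {
  // Cluster-level DESCRIBE sees everything; otherwise return only the groups
  // the caller is individually authorized to DESCRIBE
  def visibleGroups(allGroups: List[String],
                    hasClusterDescribe: Boolean,
                    mayDescribeGroup: String => Boolean): List[String] =
    if (hasClusterDescribe) allGroups
    else allGroups.filter(mayDescribeGroup)

  def main(args: Array[String]): Unit = {
    val groups = List("payments", "analytics", "audit")
    // The caller may only DESCRIBE the "payments" group
    println(visibleGroups(groups, hasClusterDescribe = false, mayDescribeGroup = g => g == "payments"))
  }
}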

Summary #

Alright, let’s summarize the key points of the KafkaApis class. As mentioned earlier, we focused on understanding the definition of the KafkaApis class and its important method, handle. Here are some key takeaways that I hope you have grasped:

  • KafkaApis is the entry point for all the functionalities on the broker side and is associated with numerous Kafka components. It is definitely the first entry point for studying the source code. When faced with a huge source code project, if you don’t know where to start, start with the KafkaApis.scala file.
  • The handle method encapsulates the dispatch logic for all RPC requests. Whenever the community adds a new RPC protocol, adding the corresponding handle***Request method and case branch is the first order of business.
  • The sendResponse series of methods are responsible for sending responses back to the request sender. The logic for sending a response is to place the response object in the response queue of the Processor thread and then let the Processor thread handle the network sending.
  • The authorize method implements the permission validation that precedes request processing. You can refer to the official documentation to learn which permissions currently exist, then compare them against the handler methods to identify which permissions each RPC protocol requires.


With this, we have completed the content about the Kafka request processing module. In this module, we started with the RequestChannel and explored the implementation principles of the request queue in Kafka. Then, I spent two lessons focusing on the SocketServer component, including the source code of its sub-components such as the Acceptor thread and Processor thread, as well as the entire process of request handling. Today, we focused on studying the KafkaApis class, which is the top-level entry point for request processing, completing the final piece of the puzzle in request handling. I hope you can review this module multiple times, carefully think about the key implementation points, and thoroughly understand the core mechanism of Kafka’s network communication.

Starting from the next lesson, we will dive into the well-known Controller component and study its source code in depth. I will spend 5 lessons taking you through all aspects of the Controller, so stay tuned.

Post-class Discussion #

Finally, please consider the following question: what permission does a Consumer need in order to commit offsets to a Broker? Can you point out which part of the code in KafkaApis enforces this permission requirement?

Feel free to share your thoughts and answers in the comments section and discuss with me. You are also welcome to share today’s content with your friends.