
24 ReplicaManager: How the Replica Manager Reads and Writes Replicas #

Hello, I’m Hu Xi. In the previous lesson, we learned about the definition and important fields of the ReplicaManager class. Today, we will continue with the part of this class’s source code that reads and writes replica objects. Both replica reads and replica writes go through the underlying Partition objects, all of which are stored in the allPartitions field we covered in the previous lesson. Understanding the purpose of these fields is an important prerequisite for exploring the functionality of the ReplicaManager class.

Now, let’s learn about the replica read and write functionality. The entire synchronization mechanism of Kafka essentially consists of replica reading and replica writing. Once you understand these two functionalities, you will know how Follower replicas synchronize data from Leader replicas.

appendRecords: Writing to Replicas #

Writing to a replica means appending messages to the replica’s underlying log. In the ReplicaManager class, the method that implements this is appendRecords.

In the Kafka source code, there are four scenarios that require writing to replicas.

  • Scenario 1: The producer writes messages to the leader replica.
  • Scenario 2: The follower replica pulls messages and writes them to the replica.
  • Scenario 3: The consumer group writes group information.
  • Scenario 4: The transaction manager writes transaction information, including transaction markers and transaction metadata.

Except for the second scenario, which directly calls methods of the Partition object, the other three scenarios all call appendRecords to complete the writing process.

This method writes a set of partition messages to the corresponding leader replica and selectively waits for other replicas to finish writing, depending on the “acks” setting in the PRODUCE request. Then, it calls the specified callback logic.
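To make the connection to the client side concrete, here is a small, hedged illustration (the broker address and topic name are placeholders): the acks setting configured on the producer travels inside the PRODUCE request and ultimately shows up on the broker as the requiredAcks argument of appendRecords.

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}

// Client-side sketch: acks="all" (i.e. -1) asks the broker to wait for the full ISR,
// "1" waits for the leader only, and "0" does not wait at all.
val props = new Properties()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.ACKS_CONFIG, "all") // becomes requiredAcks = -1 on the broker side

val producer = new KafkaProducer[String, String](props)
producer.send(new ProducerRecord[String, String]("test-topic", "key", "value"))
producer.close()

Choosing acks=all is exactly what later pushes appendRecords down the delayed-produce path we will look at shortly.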

Let’s first take a look at its method signature:

def appendRecords(
  timeout: Long,  // Request processing timeout
  requiredAcks: Short,  // Request "acks" setting
  internalTopicsAllowed: Boolean,  // Whether to allow writing to internal topics
  origin: AppendOrigin,  // The origin of the write request
  entriesPerPartition: Map[TopicPartition, MemoryRecords], // Messages to be written
  responseCallback: Map[TopicPartition, PartitionResponse] => Unit,
  delayedProduceLock: Option[Lock] = None,
  recordConversionStatsCallback: 
    Map[TopicPartition, RecordConversionStats] => Unit = _ => ())
  : Unit = {
  ......
}

There are many input parameters, and they are all important. I’ll go through them one by one.

  • timeout: The timeout for request processing. For the producer, it is the value of the request.timeout.ms parameter.
  • requiredAcks: Whether to wait for other replicas to finish writing. For the producer, it is the value of the acks parameter. In other scenarios, Kafka defaults to -1, which means waiting for all other replicas to finish writing before returning.
  • internalTopicsAllowed: Whether to allow writing messages to internal topics. For regular producers, this field is set to False, indicating that writing to internal topics is not allowed. For coordinator components, especially the GroupCoordinator component that manages consumer groups, one of its responsibilities is to write messages to the internal offset topic, so the value of this field is True in this case.
  • origin: An AppendOrigin value representing the origin of the write request. Currently, three origins are defined: Replication, Coordinator, and Client. Replication indicates that the write is issued by a follower replica, which needs to write the messages it fetched from the leader replica to its underlying message log. Coordinator indicates that the write is initiated by a coordinator, which can be the GroupCoordinator that manages consumer groups or the TransactionCoordinator that manages transactions. Client indicates that the write is initiated by a client. As mentioned earlier, the appendRecords method is not called on the follower-replica synchronization path, so the value of origin here can only be Coordinator or Client (see the sketch after this list).
  • entriesPerPartition: Messages grouped by partition to be written.
  • responseCallback: The callback logic function to be called after a successful write.
  • delayedProduceLock: A lock object used to ensure thread safety for consumer group operations. Not used in other scenarios.
  • recordConversionStatsCallback: The callback statistical logic for message format conversion operations, mainly used to track some data metrics during the message format conversion process, such as the total number of converted messages and the time spent.
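Since origin drives several later checks, here is a minimal stand-in for the AppendOrigin type, close to (but not copied verbatim from) its definition in the Kafka source, together with a helper that highlights which origins actually reach appendRecords:

// A minimal stand-in for Kafka's AppendOrigin (the real definition lives in the kafka.log package);
// it only illustrates the three origins and how code can branch on them.
sealed trait AppendOrigin
object AppendOrigin {
  case object Replication extends AppendOrigin // follower writing data it fetched from the leader
  case object Coordinator extends AppendOrigin // GroupCoordinator / TransactionCoordinator writes
  case object Client      extends AppendOrigin // ordinary producer writes
}

// Only the latter two flow through appendRecords; replication writes go through Partition directly.
def reachesAppendRecords(origin: AppendOrigin): Boolean = origin match {
  case AppendOrigin.Replication                        => false
  case AppendOrigin.Coordinator | AppendOrigin.Client  => true
}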

Next, let’s see how appendRecords writes messages to the replica log using these input parameters. I’ll paste the complete code here and provide comments for important steps:

// The value of `requiredAcks` can only be -1, 0, or 1; otherwise, it is considered illegal.
if (isValidRequiredAcks(requiredAcks)) {
  val sTime = time.milliseconds
  // Call the appendToLocalLog method to write the set of messages to the local log.
  val localProduceResults = appendToLocalLog(
    internalTopicsAllowed = internalTopicsAllowed,
    origin, entriesPerPartition, requiredAcks)
  debug("Produce to local log in %d ms".format(time.milliseconds - sTime))
  val produceStatus = localProduceResults.map { case (topicPartition, result) =>
    topicPartition ->
      ProducePartitionStatus(
        result.info.lastOffset + 1, // Set the offset value for the next message to be written
        // Build a PartitionResponse object to encapsulate the write result
        new PartitionResponse(result.error, result.info.firstOffset.getOrElse(-1), result.info.logAppendTime,
          result.info.logStartOffset, result.info.recordErrors.asJava, result.info.errorMessage))
  }
  // Try updating the statistics of message format conversion.
  recordConversionStatsCallback(localProduceResults.map { case (k, v) => k -> v.info.recordConversionStats })
  // Wait for other replicas to finish writing.
  if (delayedProduceRequestRequired(
    requiredAcks, entriesPerPartition, localProduceResults)) {
    val produceMetadata = ProduceMetadata(requiredAcks, produceStatus)
    // Create a DelayedProduce object for delayed requests.
    val delayedProduce = new DelayedProduce(timeout, produceMetadata, this, responseCallback, delayedProduceLock)
    val producerRequestKeys = entriesPerPartition.keys.map(TopicPartitionOperationKey(_)).toSeq
    // Try to complete the delayed request again.
    // If it cannot be completed temporarily, put the object in the corresponding Purgatory for later processing.
    delayedProducePurgatory.tryCompleteElseWatch(delayedProduce, producerRequestKeys)
  } else { // No need to wait for other replicas to finish writing; we can immediately send the response.
    val produceResponseStatus = produceStatus.map { case (k, status) => k -> status.responseStatus }
    // Call the callback logic and then return.
    responseCallback(produceResponseStatus)
  }
} else { // If the value of requiredAcks is invalid
  val responseStatus = entriesPerPartition.map { case (topicPartition, _) =>
    topicPartition -> new PartitionResponse(Errors.INVALID_REQUIRED_ACKS,
      LogAppendInfo.UnknownLogAppendInfo.firstOffset.getOrElse(-1), RecordBatch.NO_TIMESTAMP, LogAppendInfo.UnknownLogAppendInfo.logStartOffset)
  }
  // Construct the INVALID_REQUIRED_ACKS exception and wrap it in the callback function
  responseCallback(responseStatus)
}

To help you better understand, let me explain the complete process of the appendRecords method using another diagram.

Let me further explain the execution flow:

First, it checks whether the value of requiredAcks is within the valid range, that is, whether it is one of -1, 0, or 1. If the value is not valid, the code enters the else branch, constructs a response carrying the INVALID_REQUIRED_ACKS error, passes it to the callback function, and returns. Otherwise, the code enters the if branch.
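The validity check itself is a one-liner; inside ReplicaManager it presumably looks something like the following sketch (not a verbatim copy of the source):

// Sketch of the acks validation used above (a private member of ReplicaManager in the real source).
def isValidRequiredAcks(requiredAcks: Short): Boolean =
  requiredAcks == -1 || requiredAcks == 1 || requiredAcks == 0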

Inside the if branch, the code calls the appendToLocalLog method to write the messages to the replica’s local log. It then builds a PartitionResponse object to encapsulate the write result and some important metadata, such as the error message (if any), the offset of the next message to be written, and the offset of the first message in the batch. After that, the code tries to update the metrics for message format conversion. At this point, the code calls the delayedProduceRequestRequired method to determine whether it needs to wait for other replicas to finish writing.

If other replicas still need to catch up with this write, the code cannot return immediately. It creates a DelayedProduce request object and hands it over to the Purgatory for management. DelayedProduce is a delayed produce request, and the Purgatory here is the delayedProducePurgatory field referenced in the ReplicaManager constructor. “Management by the Purgatory” mainly means calling the tryCompleteElseWatch method to attempt to complete the delayed produce request right away; if it cannot be completed yet, the object is placed in the Purgatory and waits to be processed later.

If other replicas do not need to sync the message write, the appendRecords method constructs a response and calls the callback function. At this point, the method finishes.
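To make the Purgatory hand-off easier to picture, here is a deliberately tiny, self-contained model of the “try to complete, else watch” pattern. All names here are hypothetical; Kafka’s real DelayedOperationPurgatory adds per-key watcher lists, an expiration timer, and thread safety on top of this idea.

import scala.collection.mutable

// A toy model of "try to complete, else watch" (hypothetical types, not Kafka's classes).
trait DelayedOp {
  def tryComplete(): Boolean // true if the operation can finish right now
}

class TinyPurgatory {
  private val watched = mutable.Buffer.empty[DelayedOp]

  // Mirrors the spirit of tryCompleteElseWatch: finish immediately if possible,
  // otherwise park the operation until some later event re-triggers a check.
  def tryCompleteElseWatch(op: DelayedOp): Boolean =
    if (op.tryComplete()) true
    else { watched += op; false }

  // Called when the watched state changes (e.g. a follower advances its fetch offset).
  def checkAndComplete(): Unit = {
    val done = watched.filter(_.tryComplete())
    watched --= done
  }
}

For a DelayedProduce, the event that re-triggers this kind of check is a follower advancing its fetch position, which can push the partition’s high watermark past the freshly produced offsets.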

From the analysis above, we know that the appendToLocalLog method is responsible for writing the messages to the local log, and the delayedProduceRequestRequired method is used to determine whether it is necessary to wait for the other replicas to complete the write. Now let’s take a closer look at the code for these two methods.

First, let’s take a look at appendToLocalLog. From its name, it is used to write to the local log of the replica. Let’s take a look at the main code snippet of this method.

private def appendToLocalLog(
  internalTopicsAllowed: Boolean,
  origin: AppendOrigin,
  entriesPerPartition: Map[TopicPartition, MemoryRecords],
  requiredAcks: Short): Map[TopicPartition, LogAppendResult] = {
  ......
  entriesPerPartition.map { case (topicPartition, records) =>
    brokerTopicStats.topicStats(topicPartition.topic)
      .totalProduceRequestRate.mark()
    brokerTopicStats.allTopicsStats.totalProduceRequestRate.mark()
    // If the topic to be written is an internal topic and internalTopicsAllowed is false, return an error
    if (Topic.isInternal(topicPartition.topic) 
      && !internalTopicsAllowed) {
      (topicPartition, LogAppendResult(
        LogAppendInfo.UnknownLogAppendInfo,
        Some(new InvalidTopicException(s"Cannot append to internal topic ${topicPartition.topic}"))))
    } else {
      try {
        // Get the partition object
        val partition = getPartitionOrException(topicPartition, expectLeader = true)
        // Write the batch of messages to the partition object
        val info = partition.appendRecordsToLeader(records, origin, requiredAcks)
        ......
        // Return the write result
        (topicPartition, LogAppendResult(info))
      } catch {
        ......
      }
    }
  }
}

I skipped many lines of logging and error handling code. As you can see, this method mainly uses the appendRecordsToLeader method of the Partition class to write the batch of messages. The appendRecordsToLeader method uses the appendAsLeader method we learned in Lesson 3 to write to the local log. In general, the logic of appendToLocalLog is not complicated, and you should be able to understand it easily.
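If it helps, here is the write-path call chain at a glance (method names as they appear in the source; signatures omitted and details simplified):

// ReplicaManager.appendRecords
//   -> ReplicaManager.appendToLocalLog
//        -> Partition.appendRecordsToLeader   // leader-side checks, e.g. current ISR size vs. min.insync.replicas
//             -> Log.appendAsLeader           // assigns offsets and appends to the active log segment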

Now let’s look at the source code for the delayedProduceRequestRequired method. It is used to determine whether it is necessary to wait for the other replicas to complete the write after the message batch has been written to the log. Here is the code:

private def delayedProduceRequestRequired(
  requiredAcks: Short,
  entriesPerPartition: Map[TopicPartition, MemoryRecords],
  localProduceResults: Map[TopicPartition, LogAppendResult]): Boolean = {
  requiredAcks == -1 && entriesPerPartition.nonEmpty && 
    localProduceResults.values.count(_.exception.isDefined) < entriesPerPartition.size
}

This method returns a Boolean value. True indicates that waiting for the other replicas to finish writing is required, and False indicates that it is not. The code above shows that waiting for other replicas’ writes is required only when all three of the following conditions are met:

  1. requiredAcks is equal to -1.
  2. The request carries data to be written, i.e., entriesPerPartition is not empty.
  3. At least one partition has successfully written its messages to the local log.

Actually, you can see conditions 2 and 3 as a combined condition. If none of the partitions’ writes are successful, it may indicate a severe error, and in that case, it would be wise to not wait and instead return an error to the sender. On the other hand, if some partitions have successfully written while others have failed, it may indicate a sporadic transient error. In this case, putting the current write request into the Purgatory and giving it another chance for retry might be a good idea.
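The same predicate is easy to experiment with once it is restated with plain types. In this hedged sketch, a partition name mapped to an optional error stands in for Kafka’s LogAppendResult:

// Self-contained restatement of the predicate (plain types instead of Kafka's).
def needDelayedProduce(requiredAcks: Short,
                       entries: Map[String, Int],              // partition -> number of records to write
                       errors: Map[String, Option[Throwable]]  // partition -> error from the local write, if any
                      ): Boolean =
  requiredAcks == -1 &&
    entries.nonEmpty &&
    errors.values.count(_.isDefined) < entries.size

// Every partition failed locally -> false: respond with the errors right away.
needDelayedProduce(-1,
  Map("t-0" -> 10, "t-1" -> 10),
  Map("t-0" -> Some(new RuntimeException), "t-1" -> Some(new RuntimeException)))

// One partition succeeded -> true: park the request and wait for the followers.
needDelayedProduce(-1,
  Map("t-0" -> 10, "t-1" -> 10),
  Map("t-0" -> None, "t-1" -> Some(new RuntimeException)))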

Replica Read: fetchMessages #

Alright, after discussing replica writing, let’s move on to the source code study of replica reading.

In the ReplicaManager class, the method responsible for reading replica data is fetchMessages. Whether it is the Java consumer API or a Follower replica, the main way to fetch messages is to send a FETCH request to the Broker. When the Broker receives this request, it calls the fetchMessages method to fetch messages from the underlying Leader replica.

Similar to the appendRecords method, the fetchMessages method may also delay processing FETCH requests because the Broker needs to accumulate enough data before returning a response to the requester.
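The client-side knobs behind that accumulation are fetch.min.bytes and fetch.max.wait.ms. A small, hedged consumer snippet (the broker address and group id are placeholders) shows the configuration that makes the broker park a FETCH request instead of answering right away:

import java.util.Properties
import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, "1048576") // hold the FETCH until ~1 MB is available...
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, "500")   // ...but never hold it longer than 500 ms

val consumer = new KafkaConsumer[String, String](props)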

Let’s take a look at the flowchart below, which shows the main logic of the fetchMessages method.

Let’s examine the signature of this method:

def fetchMessages(timeout: Long,
                  replicaId: Int,
                  fetchMinBytes: Int,
                  fetchMaxBytes: Int,
                  hardMaxBytesLimit: Boolean,
                  fetchInfos: Seq[(TopicPartition, PartitionData)],
                  quota: ReplicaQuota,
                  responseCallback: Seq[(TopicPartition, FetchPartitionData)] => Unit,
                  isolationLevel: IsolationLevel,
                  clientMetadata: Option[ClientMetadata]): Unit = {
  ......
}

These input parameters are the basis for understanding the important methods below, so let’s analyze them one by one.

  • timeout: The request processing timeout. For consumers, this value is equal to the request.timeout.ms parameter value; for Follower replicas, this value is the value of the replica.fetch.wait.max.ms Broker-side parameter.
  • replicaId: The replica ID. For consumers, this parameter value is -1; for Follower replicas, this value is the ID of the Broker where the Follower replica is located.
  • fetchMinBytes & fetchMaxBytes: The minimum and maximum number of bytes that can be fetched. For consumers, they correspond to the fetch.min.bytes and fetch.max.bytes Consumer-side parameters, respectively; for Follower replicas, they correspond to the replica.fetch.min.bytes and replica.fetch.max.bytes Broker-side parameters, respectively.
  • hardMaxBytesLimit: Whether the maximum byte limit is a hard limit. If hardMaxBytesLimit is True, the data returned by the read request must never exceed the maximum number of bytes.
  • fetchInfos: Specifies the information about the partitions to be read, such as which partitions to read from, at which offset to start reading, and how many bytes can be read, etc.
  • quota: This is a quota control class, mainly used to determine whether to perform rate limiting control during the read process.
  • responseCallback: The Response callback logic function. When the request is processed, this method is called to perform the final logic.

With this groundwork, we can proceed to study the code of the method. To facilitate learning, I divided the code of the entire method into two parts: the first part reads from the local log, and the second part determines the Response based on the read result.

Let’s first look at the source code of the first part:

// Determine whether the read request comes from a Follower replica or Consumer
val isFromFollower = Request.isValidBrokerId(replicaId)
val isFromConsumer = !(isFromFollower || replicaId == Request.FutureLocalReplicaId)
// Determine the acceptable read range based on the requester
// If the request is from a regular consumer, it can read up to the high watermark value
// If the request is from a consumer with READ_COMMITTED isolation level, it can read up to the Log Stable Offset value
// If the request is from a Follower replica, it can read up to the LEO value
val fetchIsolation = if (!isFromConsumer)
  FetchLogEnd
else if (isolationLevel == IsolationLevel.READ_COMMITTED)
  FetchTxnCommitted
else
  FetchHighWatermark
val fetchOnlyFromLeader = isFromFollower || (isFromConsumer && clientMetadata.isEmpty)
// Define the readFromLog method to read messages from the underlying log
def readFromLog(): Seq[(TopicPartition, LogReadResult)] = {
  val result = readFromLocalLog(
    replicaId = replicaId,
    fetchOnlyFromLeader = fetchOnlyFromLeader,
    fetchIsolation = fetchIsolation,
    fetchMaxBytes = fetchMaxBytes,
    hardMaxBytesLimit = hardMaxBytesLimit,
    readPartitionInfo = fetchInfos,
    quota = quota,
    clientMetadata = clientMetadata)
  if (isFromFollower) updateFollowerFetchState(replicaId, result)
  else result
}
// Read the message and return the result of log reading
val logReadResults = readFromLog()

In this section of the code, it first determines whether the requester is a Follower replica or a regular Consumer. The determination is based on whether the replicaId is a valid Broker ID, that is, whether it is non-negative. A Consumer’s replicaId is -1, while a Follower replica carries the ID of the Broker it runs on. Once the requester is determined, the code can work out the range of data it is allowed to read.
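The broker-id test is essentially a non-negativity check; Request.isValidBrokerId in the source is roughly the following one-liner, with -1 acting as the consumer’s sentinel value:

// Sketch of the check (the real helper lives in kafka.server); -1 marks an ordinary consumer.
def isValidBrokerId(brokerId: Int): Boolean = brokerId >= 0

isValidBrokerId(-1) // false: the request comes from a consumer
isValidBrokerId(1)  // true: the request comes from the follower running on broker 1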

The fetchIsolation here refers to the read isolation level. For a Follower replica, it can read all messages up to the Leader replica’s LEO value; for a regular Consumer, it can only “see” the messages below the Leader replica’s high watermark value (and, under the READ_COMMITTED isolation level, only those below the Log Stable Offset).

After determining the range that can be read, the fetchMessages method will call its internal method readFromLog to read the message data from the local log and assign the result to the logReadResults variable. The main implementation of the readFromLog method is to call the read method of the log object on the partition to perform the actual message reading.

The second part of the fetchMessages method creates the corresponding Response based on the previous read result. Let’s take a look at the specific implementation:

var bytesReadable: Long = 0
var errorReadingData = false
val logReadResultMap = new mutable.HashMap[TopicPartition, LogReadResult]
// Calculate the total number of readable bytes
logReadResults.foreach { case (topicPartition, logReadResult) =>
  brokerTopicStats.topicStats(topicPartition.topic).totalFetchRequestRate.mark()
  brokerTopicStats.allTopicsStats.totalFetchRequestRate.mark()
  if (logReadResult.error != Errors.NONE)
    errorReadingData = true
  bytesReadable = bytesReadable + logReadResult.info.records.sizeInBytes
  logReadResultMap.put(topicPartition, logReadResult)
}
// Determine whether to immediately return the Response. If any of the following four conditions is met:
// 1. The request does not have a timeout, indicating that the requester wants the request to be processed and returned immediately
// 2. The request does not require any data
// 3. Enough data has been accumulated
// 4. An error occurred during the reading process
if (timeout <= 0 || fetchInfos.isEmpty || bytesReadable >= fetchMinBytes || errorReadingData) {
  // Build the response
  val fetchPartitionData = logReadResults.map { case (tp, result) =>
    tp -> FetchPartitionData(result.error, result.highWatermark, result.leaderLogStartOffset, result.info.records,
      result.lastStableOffset, result.info.abortedTransactions, result.preferredReadReplica, isFromFollower && isAddingReplica(tp, replicaId))
  }
  // Call the callback function
  responseCallback(fetchPartitionData)
} else { // If the request cannot be completed immediately
  val fetchPartitionStatus = new mutable.ArrayBuffer[(TopicPartition, FetchPartitionStatus)]
  fetchInfos.foreach { case (topicPartition, partitionData) =>
    logReadResultMap.get(topicPartition).foreach(logReadResult => {
      val logOffsetMetadata = logReadResult.info.fetchOffsetMetadata
      fetchPartitionStatus += (topicPartition -> FetchPartitionStatus(logOffsetMetadata, partitionData))
    })
  }
  val fetchMetadata: SFetchMetadata = SFetchMetadata(fetchMinBytes, fetchMaxBytes, hardMaxBytesLimit,
    fetchOnlyFromLeader, fetchIsolation, isFromFollower, replicaId, fetchPartitionStatus)
  // Build the DelayedFetch object
  val delayedFetch = new DelayedFetch(timeout, fetchMetadata, this, quota, clientMetadata,
    responseCallback)
  val delayedFetchKeys = fetchPartitionStatus.map { case (tp, _) => TopicPartitionOperationKey(tp) }
  // Try to complete the request once again. If it is still unable to complete, hand it over to the Purgatory for further processing
  delayedFetchPurgatory.tryCompleteElseWatch(delayedFetch, delayedFetchKeys)
}

In this part of the code, the total number of readable bytes is first calculated from the previous read results. Then the code decides whether the Response can be returned immediately. So, when can the Response be returned immediately? As long as any one of the following four conditions is met:

  1. The request does not set a timeout, indicating that the requester wants the request processed and returned immediately.
  2. The request does not require any data, i.e., fetchInfos is empty.
  3. Enough data has been accumulated, i.e., at least fetchMinBytes bytes are readable.
  4. An error occurred during the reading process.

If none of these four conditions is met, delayed processing is required. Specifically, the code builds a DelayedFetch object and hands it over to delayedFetchPurgatory for subsequent automatic processing.

At this point, we have completed the write and read operations of the replica manager. Essentially, these two methods respectively call the append and read methods of the log object at the lower level to perform local log read and write operations. After the read and write operations are completed, these two methods also define conditions for delayed processing. Once it is determined that the conditions for delayed processing are met, it is handed over to the corresponding Purgatory for processing.

So far, we have covered the write and read operations of both the replica manager and the log object in the source code. By first understanding the independent functionality of each component and then tracing, top-down, how they fit together, we can outline the complete call path for operating on partition replica log objects in Kafka. Reading the source code from both of these angles — the individual components and the end-to-end path — helps us understand its principles more quickly and in more depth.

Summary #

Today, we learned how the Kafka replica manager class ReplicaManager handles reading and writing replicas, with a focus on its two important methods: appendRecords and fetchMessages. Let’s review briefly:

  • appendRecords: The method for writing messages to replicas. It relies on the append method of the log to write to the local log, and on the Purgatory mechanism to wait, when acks requires it, for follower replicas to finish fetching the newly written messages from the leader.
  • fetchMessages: The method used to read messages from replicas, used by regular consumers and follower replicas. When they send FETCH requests to the broker, the replica manager on the broker calls this method to retrieve the specified messages from the local log.

In the next lesson, we will shift our focus to the replica manager’s management of replica and partition objects. This is another core functionality of the replica manager, besides reading and writing replicas. You won’t want to miss it!

Post-discussion #

In the parameter list of appendRecords, there is a parameter called origin. I would like you to think about the role of this parameter in the process of writing to local logs. Can you identify the specific source code location where the origin parameter is ultimately used?

Feel free to share your thoughts and answers in the comments section to exchange ideas with me. You are also welcome to share today’s content with your friends.