03 Log: A Thorough Understanding of Common Operations of Log Objects #

Hello, I’m Hu Xi. In the previous lesson, we learned about the process of loading log segments. Today, I’m going to continue to teach you about the Log source code and introduce the common operations of Log objects.

I usually divide the common operations of Log into four major parts:

  1. High Watermark Management Operations: The concept of the high watermark is crucial in Kafka, and managing it is one of the most important functions of Log.
  2. Log Segment Management: Log is a container for log segments. Efficiently organizing and managing all the log segment objects under it is the core problem that the source code needs to solve.
  3. Critical Offset Management: Log defines many important offset values, such as the Log Start Offset and the Log End Offset (LEO). Ensuring the correctness of these offset values is the foundation of building a consistent messaging engine.
  4. Read and Write Operations: The so-called log operations mainly refer to reading and writing logs. The significance of read and write operations is self-evident.

Next, I will introduce the common operations of Log objects to you in the order mentioned above, and I hope you pay special attention to the high watermark management part.

In fact, many improvements to the log code in the community are based on the high watermark mechanism, and some are even made to replace the high watermark mechanism. For example, Kafka’s KIP-101 proposal formally introduces the Leader Epoch mechanism, which is used to replace the high watermark in log truncation operations. Obviously, if you want to deeply understand the Leader Epoch, you need to at least understand the high watermark and clearly know its drawbacks.

Since high watermark management is so important, let’s start with it.

High Watermark Management Operations #

Before introducing high watermark management operations, let’s first understand the definition of high watermark.

Definition #

In the source code, there is only one line that defines the high watermark of the log object:

@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)

This line of code conveys two important facts:

  1. The high watermark value is volatile. Since multiple threads may read it at the same time, it needs to be set as volatile to ensure memory visibility. Additionally, because the high watermark value may be modified by multiple threads simultaneously, the source code uses Java Monitor locks to ensure thread safety for concurrent modifications.
  2. The initial value of the high watermark value is the Log Start Offset value. In the previous lesson, we mentioned that each Log object maintains a Log Start Offset value. When constructing the high watermark for the first time, it is assigned the value of the Log Start Offset.

You may be curious about what the LogOffsetMetadata object is. Since it is quite important, let’s take a look at the definition of this class together:

case class LogOffsetMetadata(messageOffset: Long,
                             segmentBaseOffset: Long = Log.UnknownOffset,
                             relativePositionInSegment: Int = LogOffsetMetadata.UnknownFilePosition)

Clearly, it is a POJO class that contains three important variables:

  1. messageOffset: The message offset value. This is the most important piece of information: when we talk about the high watermark value, we are actually referring to the value of this variable.
  2. segmentBaseOffset: The starting offset of the log segment that holds this offset. It is used to determine whether two offsets are in the same log segment. Computing the byte distance between two messages in a physical disk file only makes sense when both messages live in the same segment file; otherwise they sit in different physical files and the calculation would be meaningless.
  3. relativePositionInSegment: The physical byte position of the message within its log segment. This field is very useful when calculating the difference in physical positions between two offsets. When does Kafka need to compute the number of bytes between two positions? When reading the log: if each read may return at most 1MB of data, the source code has to check whether the bytes between two offsets exceed 1MB.

All the methods in the LogOffsetMetadata class are utility methods related to these three variables, and they are very easy to understand. I will provide a detailed explanation of one method, and you can apply the same logic to the others.

def onSameSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
        throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

    this.segmentBaseOffset == that.segmentBaseOffset
}

From the name, we can tell that this method determines whether two given LogOffsetMetadata objects are in the same log segment. The logic is simple: it compares the segmentBaseOffset values of the two LogOffsetMetadata objects.
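Another method in the same family is onOlderSegment, which you will see used by maybeIncrementHighWatermark later in this lesson. Its implementation looks roughly like this (based on the 2.x source; check your version for the exact form):

def onOlderSegment(that: LogOffsetMetadata): Boolean = {
    if (messageOffsetOnly)
        throw new KafkaException(s"$this cannot compare its segment info with $that since it only has message offset info")

    // An offset lives on an "older" segment if its segment's base offset is strictly smaller
    this.segmentBaseOffset < that.segmentBaseOffset
}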

Okay, let’s get back to the high watermark. You should pay close attention to the methods for getting and setting the high watermark value, updating the high watermark value, as well as reading the high watermark value.

Getting and Setting the High Watermark Value #

As for the method for getting the high watermark value, it is straightforward and I won’t elaborate on it. The method for setting the high watermark value, namely the setter method, is a bit more complex. To help you understand it, I will explain its purpose in the form of comments.

// getter method: get the offset value of the high watermark
def highWatermark: Long = highWatermarkMetadata.messageOffset

// setter method: set the high watermark value
private def updateHighWatermarkMetadata(newHighWatermark: LogOffsetMetadata): Unit = {
    if (newHighWatermark.messageOffset < 0) // The high watermark value cannot be negative
        throw new IllegalArgumentException("High watermark offset should be non-negative")

    lock synchronized { // Acquire the Log object's monitor lock to protect concurrent modifications
        highWatermarkMetadata = newHighWatermark // Assign the new high watermark value
        producerStateManager.onHighWatermarkUpdated(newHighWatermark.messageOffset) // Notify the producer state manager that the high watermark changed (transaction-related, you can ignore it for now)
        maybeIncrementFirstUnstableOffset() // The First Unstable Offset is part of the Kafka transaction mechanism, ignore it for now
    }
    }
    trace(s"Setting high watermark $newHighWatermark")
}

Updating the high watermark value #

In addition, the source code defines two methods for updating the high watermark value: updateHighWatermark and maybeIncrementHighWatermark. As the names suggest, the former always updates the high watermark value, while the latter may or may not update it.

Let’s take a look at their implementations.

// updateHighWatermark method
def updateHighWatermark(hw: Long): Long = {
    // The new high watermark value must be between [Log Start Offset, Log End Offset]
    val newHighWatermark = if (hw < logStartOffset)  
      logStartOffset
    else if (hw > logEndOffset)
      logEndOffset
    else
      hw
    // Call the setter method to update the high watermark value
    updateHighWatermarkMetadata(LogOffsetMetadata(newHighWatermark))
    newHighWatermark  // Finally return the new high watermark value
}
// maybeIncrementHighWatermark method
def maybeIncrementHighWatermark(newHighWatermark: LogOffsetMetadata): Option[LogOffsetMetadata] = {
    // The new high watermark value cannot exceed the Log End Offset
    if (newHighWatermark.messageOffset > logEndOffset)
        throw new IllegalArgumentException(s"High watermark $newHighWatermark update exceeds current " +
        s"log end offset $logEndOffsetMetadata")

    lock.synchronized {
        val oldHighWatermark = fetchHighWatermarkMetadata  // Get the old high watermark value

        // The new high watermark value should be greater than the old high watermark value to maintain monotonicity
        // Additionally, if the new high watermark value is on a new log segment, update the high watermark value
        if (oldHighWatermark.messageOffset < newHighWatermark.messageOffset ||
            (oldHighWatermark.messageOffset == newHighWatermark.messageOffset && oldHighWatermark.onOlderSegment(newHighWatermark))) {
            updateHighWatermarkMetadata(newHighWatermark)
            Some(oldHighWatermark) // Return the old high watermark value
        } else {
            None
        }
    }
}

You may wonder why there are two methods for updating the high watermark value.

In fact, these two methods serve different purposes. The updateHighWatermark method is mainly used to update the high watermark value when the follower replica obtains messages from the leader replica. Once new messages are obtained, it is necessary to update the high watermark value. On the other hand, the maybeIncrementHighWatermark method is used to update the high watermark value of the leader replica. It is important to note that the update of the high watermark value for the leader replica is conditional - in some cases, the high watermark value may be updated, while in other cases it may not be.

As I mentioned earlier, a follower replica must update its high watermark value after successfully fetching messages from the leader replica. However, when a producer writes messages to the leader replica, the partition's high watermark does not necessarily need to move, because Kafka may have to wait for other follower replicas to catch up. That is why the source code defines two different update methods, each for its own scenario, as the sketch below illustrates.
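To make the division of labor concrete, here is an illustrative sketch of the two call sites. It is not the actual Partition or ReplicaFetcherThread code, and the variable names (followerLog, leaderHighWatermarkFromFetchResponse, leaderLog, minLeoAcrossIsr) are hypothetical:

// Follower side: after appending records fetched from the leader,
// adopt the high watermark value the leader returned
// (updateHighWatermark clamps it to [Log Start Offset, LEO] internally)
followerLog.updateHighWatermark(leaderHighWatermarkFromFetchResponse)

// Leader side: after follower fetch positions advance, try to move the high watermark
// forward to the smallest LEO among the in-sync replicas; None means it did not move
leaderLog.maybeIncrementHighWatermark(LogOffsetMetadata(minLeoAcrossIsr)) match {
    case Some(oldHighWatermark) => // the high watermark advanced; delayed fetch/produce requests may now complete
    case None => // the high watermark stayed where it was
}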

Reading the high watermark value #

The final operation related to high watermark value management is the fetchHighWatermarkMetadata method. It not only retrieves the high watermark value, but also retrieves other metadata information about the high watermark, such as the log segment’s starting offset and physical location. Here is its implementation logic:

private def fetchHighWatermarkMetadata: LogOffsetMetadata = {
    checkIfMemoryMappedBufferClosed() // Make sure the log is not closed when reading

    val offsetMetadata = highWatermarkMetadata // Save the current high watermark value to a local variable to avoid interference from multiple threads
    if (offsetMetadata.messageOffsetOnly) { // If the complete high watermark metadata is not available
        lock.synchronized {
            val fullOffset = convertToOffsetMetadataOrThrow(highWatermark) // Retrieve the complete high watermark metadata information by reading the log file
            updateHighWatermarkMetadata(fullOffset) // Then update the high watermark object
            fullOffset
        }
    } else { // Otherwise, simply return it
        offsetMetadata
    }
}
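The convertToOffsetMetadataOrThrow method used above is worth a quick look. In the 2.x code it is essentially a thin wrapper around the read method you will meet at the end of this lesson, roughly:

private def convertToOffsetMetadataOrThrow(offset: Long): LogOffsetMetadata = {
    // Read a single entry at the given offset just to obtain its full offset metadata
    // (segment base offset and physical position); throws if the offset is out of range
    val fetchDataInfo = read(offset,
        maxLength = 1,
        isolation = FetchLogEnd,
        minOneMessage = false)
    fetchDataInfo.fetchOffsetMetadata
}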

Log Segment Management #

Earlier, I mentioned that a log is a container for log segments. So how does it actually serve as a container?

private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]

As you can see, the source code uses the ConcurrentSkipListMap class in Java to store all the log segment objects. ConcurrentSkipListMap has two obvious advantages:

  • It is thread-safe, so Kafka source code does not need to ensure thread safety during log segment operations.
  • It is a key-value sorted map. Kafka uses the starting offset value of each log segment as the key. This allows us to easily sort and compare all log segments based on their starting offset values and quickly find the previous and next log segments that are close to a given offset value.

Log segment management mainly involves adding, deleting, modifying, and querying. Let’s look at each of these aspects one by one.

1. Adding

The Log object defines a method for adding log segment objects: addSegment.

def addSegment(segment: LogSegment): LogSegment = this.segments.put(segment.baseOffset, segment)

It’s very simple. It calls the put method of the map to add the given log segment object to segments.

2. Deleting

The delete operation is relatively complex. We know that Kafka has many retention policies, including ones based on time, space, and Log Start Offset. So what is a retention policy? Essentially, it is a rule that determines which log segments can be deleted.

From the source code perspective, the overall entry point for controlling deletion operations in Log is the deleteOldSegments method without any parameters:

def deleteOldSegments(): Int = {
  if (config.delete) {
    deleteRetentionMsBreachedSegments() + deleteRetentionSizeBreachedSegments() + deleteLogStartOffsetBreachedSegments()
  } else {
    deleteLogStartOffsetBreachedSegments()
  }
}

The methods deleteRetentionMsBreachedSegments, deleteRetentionSizeBreachedSegments, and deleteLogStartOffsetBreachedSegments correspond to the three retention strategies mentioned above, respectively. For a concrete sense of what one of them looks like, see the sketch below.
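As an example, the time-based strategy looks roughly like this (simplified from the 2.x code): if retention.ms is not negative, any segment whose largest timestamp is older than the configured retention time becomes a deletion candidate.

private def deleteRetentionMsBreachedSegments(): Int = {
    if (config.retentionMs < 0) return 0 // retention.ms < 0 means "retain forever"
    val startMs = time.milliseconds
    // A segment is deletable once its newest message is older than retention.ms
    deleteOldSegments((segment, _) => startMs - segment.largestTimestamp > config.retentionMs,
        reason = s"retention time ${config.retentionMs}ms breach")
}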

The following image shows Kafka’s current three log retention strategies and all the methods involved in log segment deletion:

From the image, we can see that the three retention strategy methods mentioned earlier all call the version of deleteOldSegments with parameters, and this method subsequently calls the deletableSegments and deleteSegments methods. Let’s now take a closer look at the code for these three methods.

First, the version of deleteOldSegments method with parameters:

private def deleteOldSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean, reason: String): Int = {
  lock synchronized {
    val deletable = deletableSegments(predicate)
    if (deletable.nonEmpty)
      info(s"Found deletable segments with base offsets [${deletable.map(_.baseOffset).mkString(",")}] due to $reason")
    deleteSegments(deletable)
  }
}

This method has only two steps:

  1. It uses the provided function to calculate which log segment objects can be deleted.
  2. It calls the deleteSegments method to delete these log segments.

Next is the deletableSegments method. I will explain the meaning of the main code using comments:

private def deletableSegments(predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment] = {
if (segments.isEmpty) {
  Seq.empty
} else {
  val deletable = ArrayBuffer.empty[LogSegment]
  var segmentEntry = segments.firstEntry

  // Start traversing from the LogSegment object with the minimum starting offset value until one of the following conditions is met:
  // 1. The condition function 'predicate' returns false
  // 2. The Log object high watermark value is reached
  // 3. The latest LogSegment object doesn't contain any messages
  // The latest LogSegment object is the LogSegment with the largest Key value in the 'segments' map, which is also known as the Active Segment. The Active Segment, which is completely empty, is not allowed to be deleted as it will need to be rebuilt later on.
  // During the traversal, all LogSegment objects that do not meet any of the above 3 conditions can be deleted!

  while (segmentEntry != null) {
    val segment = segmentEntry.getValue
    val nextSegmentEntry = segments.higherEntry(segmentEntry.getKey)
    val (nextSegment, upperBoundOffset, isLastSegmentAndEmpty) =
      if (nextSegmentEntry != null)
        (nextSegmentEntry.getValue, nextSegmentEntry.getValue.baseOffset, false)
      else
        (null, logEndOffset, segment.size == 0)

    if (highWatermark >= upperBoundOffset && predicate(segment, Option(nextSegment)) && !isLastSegmentAndEmpty) {
      deletable += segment
      segmentEntry = nextSegmentEntry
    } else {
      segmentEntry = null
    }
  }
  deletable
}  // end of the else branch
}  // end of deletableSegments

In plain words, deletableSegments walks the segments from the smallest base offset, collects every segment that satisfies the predicate and whose messages all lie below the high watermark, and stops at the first segment that fails either check; a completely empty active segment is never collected, because it would have to be recreated right away.

Finally, the deleteSegments method performs the actual log segment deletion.

private def deleteSegments(deletable: Iterable[LogSegment]): Int = {
  maybeHandleIOException(s"Error while deleting segments for $topicPartition in dir ${dir.getParent}") {
    val numToDelete = deletable.size
    if (numToDelete > 0) {
      // A Log must always contain at least one segment. If every segment would be deleted, roll a new active segment first, then remove the old ones.
      if (segments.size == numToDelete)
        roll()
      lock synchronized {
        checkIfMemoryMappedBufferClosed() // Ensure that the Log object is not closed
        // Remove and delete the given log segment objects including the underlying physical files
        removeAndDeleteSegments(deletable, asyncDelete = true)
        // Attempt to update the Log Start Offset value
        maybeIncrementLogStartOffset(
          segments.firstEntry.getValue.baseOffset)
      }
    }
    numToDelete
  }
}

Here, I will briefly explain why we need to update the Log Start Offset value after deleting log segment objects. The Log Start Offset value is the minimum offset value of the visible messages in the entire Log object. If we delete log segment objects, it is highly likely that the range of visible messages has changed, so we need to update the Log Start Offset value. That’s why the deleteSegments method updates the Log Start Offset value after deleting the log segment objects.

3. Modification

Finally, let’s talk about how to modify log segment objects.

In fact, the source code does not modify log segment objects in place. The so-called modification or update simply replaces an old log segment object with a new one. For example, the statement segments.put(1L, newSegment) adds a log segment when there is no segment with Key=1; otherwise, it replaces the existing segment with the new one. A small illustration of this put-as-replace semantics follows.
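Here is a minimal illustration of the put-as-replace semantics (newSegment is a hypothetical LogSegment instance, not something defined in the source):

// ConcurrentSkipListMap.put returns the value previously mapped to the key, or null if none existed
val previous: LogSegment = segments.put(1L, newSegment)
if (previous != null) {
    // Key 1 already existed: the old log segment object has been replaced by newSegment
} else {
    // Key 1 did not exist: this was a plain addition
}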

4. Querying

Finally, let’s discuss how to query log segment objects. There are many places in the source code that need to look up log segments, and they all rely on the lookup methods provided by ConcurrentSkipListMap (a short lookup sketch follows the list):

  • segments.firstEntry: Get the first log segment object.
  • segments.lastEntry: Get the last log segment object, which is the Active Segment.
  • segments.higherEntry: Get the first log segment object whose starting offset is strictly greater than the given Key value.
  • segments.floorEntry: Get the last log segment object with a starting offset value less than or equal to the given Key value.
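To see how these lookups are typically combined, here is a short, hypothetical sketch (not actual Log code; targetOffset is an arbitrary offset) that locates the segment expected to contain a given offset and the segment after it:

val targetOffset: Long = 1024L // hypothetical offset we want to locate

// floorEntry: the segment with the greatest base offset <= targetOffset, i.e. the one
// that should contain the offset; null means the offset is below the first segment
val entry = segments.floorEntry(targetOffset)
val segmentOpt: Option[LogSegment] = Option(entry).map(_.getValue)

// higherEntry: the segment whose base offset is strictly greater than the located one
val nextEntry = if (entry != null) segments.higherEntry(entry.getKey) else null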

Managing Key Offset Values #

The Log object maintains some key offset values, such as Log Start Offset and LEO (Log End Offset). Actually, the high watermark value is also considered a key offset value, but it is so important that I separate it as an independent part to explain.

Do you remember the diagram I showed you in the last class that illustrates LEO and Log Start Offset? Let me use this diagram to explain the differences between these key offset values:

Please pay attention to the dashed box representing the offset value 15 in the diagram. It reveals an important fact: The LEO in the Log object always points to the next message to be inserted, which means there are no messages above the LEO value! The code for defining LEO in the source code is straightforward:

@volatile private var nextOffsetMetadata: LogOffsetMetadata = _

The nextOffsetMetadata here is what we call LEO, and it is an object of type LogOffsetMetadata. When the Log object initializes, the source code loads all log segment objects and calculates the next message offset value for the current Log. Then, the Log object assigns this offset value to LEO. Here is an excerpt of the code:

locally {
  val startMs = time.milliseconds
  // Create log directory and save Log object disk files
  Files.createDirectories(dir.toPath)
  // Initialize leader epoch cache
  initializeLeaderEpochCache()
  // Load all log segment objects and return the next message offset value for this Log object
  val nextOffset = loadSegments()
  // Initialize LEO metadata object, with the LEO value obtained from the previous step as the offset, the starting offset as the base offset of the active segment, and the segment size as the size of the active segment
  nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)

  // Update the leader epoch cache by removing all invalid cache entries above the LEO value
  leaderEpochCache.foreach(
    _.truncateFromEnd(nextOffsetMetadata.messageOffset))
  ......
}

Of course, the source code separately defines the updateLogEndOffset method to update LEO:

private def updateLogEndOffset(offset: Long): Unit = {
  nextOffsetMetadata = LogOffsetMetadata(offset, activeSegment.baseOffset, activeSegment.size)
  if (highWatermark >= offset) {
    updateHighWatermarkMetadata(nextOffsetMetadata)
  }
  if (this.recoveryPoint > offset) {
    this.recoveryPoint = offset
  }
}

Based on the source code above, you can see that the update process is simple, so I won’t expand on it. However, note one thing: if the new LEO value is not larger than the current high watermark value, Kafka must pull the high watermark down as well, because for the same Log object the high watermark value can never exceed the LEO value. Remember this important point!

Speaking of LEO, I will now tell you about Log Start Offset. In terms of operation flow and principles, managing Log Start Offset in the source code is simpler than LEO because Log Start Offset is just a long integer value, not an object. The code defines a dedicated updateLogStartOffset method to update it. This method is simple, so I won’t go into details. You can study its implementation yourself.
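For reference, its body roughly mirrors the updateLogEndOffset method shown above, just with the comparisons inverted. Treat the following as an approximation of the 2.x code rather than the exact source:

private def updateLogStartOffset(offset: Long): Unit = {
    logStartOffset = offset
    // The high watermark must never fall below the Log Start Offset
    if (highWatermark < offset) {
        updateHighWatermark(offset)
    }
    // Neither should the recovery point
    if (this.recoveryPoint < offset) {
        this.recoveryPoint = offset
    }
}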

Now, let’s think about when Kafka needs to update Log Start Offset. Let’s take a look one by one:

  1. When the Log object initializes: Similar to LEO, Log Start Offset needs to be assigned a value when the Log object initializes. Usually, the starting offset value of the first log segment is assigned to it.
  2. When log truncation occurs: Similarly, if part of the messages in the log is deleted, it may cause a change in Log Start Offset, so it is necessary to update its value.
  3. When syncing with the Follower replica: Once the Log Start Offset value of the leader replica’s Log object changes, the Follower replica also needs to attempt to update this value to maintain consistency with the leader replica.
  4. When log segments are deleted: This is similar to log truncation. Any operation that involves deleting messages may cause a change in Log Start Offset.
  5. When messages are deleted: Strictly speaking, this update timing seems a bit counterintuitive. In Kafka, deleting messages is achieved by increasing the Log Start Offset value. Therefore, when deleting messages, it is necessary to update this value.

You can check the call sites of the updateLogStartOffset method in the code and verify whether they match the timings I listed above. Here is also a small hint: when reading source code, bring your own thinking to it instead of simply accepting what is written; you may come away with different insights.

Read and Write Operations #

Finally, I will focus on the read and write operations for the Log object.

1. Write Operations

In the Log class, there are three methods involving write operations: appendAsLeader, appendAsFollower, and append. The call hierarchy is shown in the following diagram:

appendAsLeader is used to write data to the leader replica, appendAsFollower is used to synchronize data with the follower replica, and both methods ultimately call the append method.
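Their bodies are short. Roughly (based on the 2.x code, with default-argument details possibly differing across versions), the essential difference is the assignOffsets flag: the leader assigns offsets to incoming messages, while the follower keeps the offsets already assigned by the leader:

def appendAsLeader(records: MemoryRecords, leaderEpoch: Int,
                   origin: AppendOrigin = AppendOrigin.Client,
                   interBrokerProtocolVersion: ApiVersion = ApiVersion.latestVersion): LogAppendInfo = {
    // The leader assigns new offsets starting from the current LEO
    append(records, origin, interBrokerProtocolVersion, assignOffsets = true, leaderEpoch)
}

def appendAsFollower(records: MemoryRecords): LogAppendInfo = {
    // The follower keeps the offsets assigned by the leader, hence assignOffsets = false
    append(records,
        origin = AppendOrigin.Replication,
        interBrokerProtocolVersion = ApiVersion.latestVersion,
        assignOffsets = false,
        leaderEpoch = -1)
}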

Let’s focus on the append method. The execution flow of the append method is shown in the following diagram:

Looking at this diagram, you might think, “Wow, there are 12 steps involved in the execution? That’s a lot!” Don’t worry, I will explain the implementation of each step using code comments.

private def append(records: MemoryRecords,
                   origin: AppendOrigin,
                   interBrokerProtocolVersion: ApiVersion,
                   assignOffsets: Boolean,
                   leaderEpoch: Int): LogAppendInfo = {
    maybeHandleIOException(s"Error while appending records to $topicPartition in dir ${dir.getParent}") {
        // Step 1: Analyze and validate the records to be appended, and return the validation result
        val appendInfo = analyzeAndValidateRecords(records, origin)

        // If there are no valid records to be written, return directly
        if (appendInfo.shallowCount == 0)
            return appendInfo

        // Step 2: Format the records by removing invalid format or bytes
        var validRecords = trimInvalidBytes(records, appendInfo)

        lock synchronized {
            checkIfMemoryMappedBufferClosed() // Make sure the Log object is not closed
            if (assignOffsets) { // Need to assign offsets
                // Step 3: Use the current LEO (Log End Offset) value as the offset for the first record in the records to be written
                val offset = new LongRef(nextOffsetMetadata.messageOffset)
                appendInfo.firstOffset = Some(offset.value)
                val now = time.milliseconds
                val validateAndOffsetAssignResult = try {
                    LogValidator.validateMessagesAndAssignOffsets(validRecords,
                        topicPartition,
                        offset,
                        time,
                        now,
                        appendInfo.sourceCodec,
                        appendInfo.targetCodec,
                        config.compact,
                        config.messageFormatVersion.recordVersion.value,
                        config.messageTimestampType,
                        config.messageTimestampDifferenceMaxMs,
                        leaderEpoch,
                        origin,
                        interBrokerProtocolVersion,
                        brokerTopicStats)
                } catch {
                    case e: IOException =>
                        throw new KafkaException(s"Error validating messages while appending to log $name", e)
                }
                // Update the validation result object, LogAppendInfo
                validRecords = validateAndOffsetAssignResult.validatedRecords
                appendInfo.maxTimestamp = validateAndOffsetAssignResult.maxTimestamp
                appendInfo.offsetOfMaxTimestamp = validateAndOffsetAssignResult.shallowOffsetOfMaxTimestamp
                appendInfo.lastOffset = offset.value - 1
                appendInfo.recordConversionStats = validateAndOffsetAssignResult.recordConversionStats
                if (config.messageTimestampType == TimestampType.LOG_APPEND_TIME)
                    appendInfo.logAppendTime = now

                // Step 4: Validate the messages to ensure their sizes do not exceed the maximum limit
                if (validateAndOffsetAssignResult.messageSizeMaybeChanged) {
                    for (batch <- validRecords.batches.asScala) {
                        if (batch.sizeInBytes > config.maxMessageSize) {
                            // We record the original size of the message set instead of the trimmed size
                            // to be consistent with the pre-compression bytesRejectedRate recording
                            brokerTopicStats.topicStats(topicPartition.topic).bytesRejectedRate.mark(records.sizeInBytes)
                            brokerTopicStats.allTopicsStats.bytesRejectedRate.mark(records.sizeInBytes)
                            throw new RecordTooLargeException(s"Message batch size is ${batch.sizeInBytes} bytes in append to" +
                                s" partition $topicPartition which exceeds the maximum configured size of ${config.maxMessageSize}.")
                        }
                    }
                }
            } else { // Use the given offset directly, no need to assign offsets
                if (!appendInfo.offsetsMonotonic) // Ensure the offsets are monotonic
                    throw new OffsetsOutOfOrderException(s"Out of order offsets found in append to $topicPartition: " +
                                                        records.records.asScala.map(_.offset))

                if (appendInfo.firstOrLastOffsetOfFirstBatch < nextOffsetMetadata.messageOffset) {
                    val firstOffset = appendInfo.firstOffset match {
                        case Some(offset) => offset
                        case None => records.batches.asScala.head.baseOffset()
                    }

                    val firstOrLast = if (appendInfo.firstOffset.isDefined) "First offset" else "Last offset of the first batch"
                    throw new UnexpectedAppendOffsetException(
                        s"Unexpected offset in append to $topicPartition. $firstOrLast " +
                        s"${appendInfo.firstOrLastOffsetOfFirstBatch} is less than the next offset ${nextOffsetMetadata.messageOffset}. " +
                        s"First 10 offsets in append: ${records.records.asScala.take(10).map(_.offset)}, last offset in" +
                        s" append: ${appendInfo.lastOffset}. Log start offset = $logStartOffset",
                        firstOffset, appendInfo.lastOffset)
                }
            }
    
            // Step 5: Update Leader Epoch Cache
            validRecords.batches.asScala.foreach { batch =>
              if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
                maybeAssignEpochStartOffset(batch.partitionLeaderEpoch, batch.baseOffset)
              } else {
                leaderEpochCache.filter(_.nonEmpty).foreach { cache =>
                  warn(s"Clearing leader epoch cache after unexpected append with message format v${batch.magic}")
                  cache.clearAndFlush()
                }
              }
            }
    
            // Step 6: Ensure Message Size does not exceed limit
            if (validRecords.sizeInBytes > config.segmentSize) {
              throw new RecordBatchTooLargeException(s"Message batch size is ${validRecords.sizeInBytes} bytes in append " +
                s"to partition $topicPartition, which exceeds the maximum configured segment size of ${config.segmentSize}.")
            }
    
            // Step 7: Perform Log Rolling. The current log segment may not have enough capacity to hold the new message set,
            // so it is necessary to create a new log segment to store all the messages to be written
            val segment = maybeRoll(validRecords.sizeInBytes, appendInfo)
    
            val logOffsetMetadata = LogOffsetMetadata(
              messageOffset = appendInfo.firstOrLastOffsetOfFirstBatch,
              segmentBaseOffset = segment.baseOffset,
              relativePositionInSegment = segment.size)
    
            // Step 8: Validate Transaction State
            val (updatedProducers, completedTxns, maybeDuplicate) = analyzeAndValidateProducerState(
              logOffsetMetadata, validRecords, origin)
    
            maybeDuplicate.foreach { duplicate =>
              appendInfo.firstOffset = Some(duplicate.firstOffset)
              appendInfo.lastOffset = duplicate.lastOffset
              appendInfo.logAppendTime = duplicate.timestamp
              appendInfo.logStartOffset = logStartOffset
              return appendInfo
            }
    
            // Step 9: Perform Actual Message Write, mainly calls the append method of the log segment object
            segment.append(largestOffset = appendInfo.lastOffset,
              largestTimestamp = appendInfo.maxTimestamp,
              shallowOffsetOfMaxTimestamp = appendInfo.offsetOfMaxTimestamp,
              records = validRecords)
    
            // Step 10: Update Log End Offset, where LEO value is the offset value of the last message in the message set + 1
            // As mentioned earlier, the LEO value always points to the next non-existent message
            updateLogEndOffset(appendInfo.lastOffset + 1)
    
            // Step 11: Update Transaction State
            for (producerAppendInfo <- updatedProducers.values) {
              producerStateManager.update(producerAppendInfo)
            }
    
            for (completedTxn <- completedTxns) {
              val lastStableOffset = producerStateManager.lastStableOffset(completedTxn)
              segment.updateTxnIndex(completedTxn, lastStableOffset)
              producerStateManager.completeTxn(completedTxn)
            }
    
            producerStateManager.updateMapEndOffset(appendInfo.lastOffset + 1)
           maybeIncrementFirstUnstableOffset()
    
            trace(s"Appended message set with last offset: ${appendInfo.lastOffset}, " +
              s"first offset: ${appendInfo.firstOffset}, " +
              s"next offset: ${nextOffsetMetadata.messageOffset}, " +
              s"and messages: $validRecords")
    
            // Step 12: Flush manually if needed. In general, we do not need to set a Broker-side parameter log.flush.interval.messages
            // as the flush operation is handled by the operating system. But in some cases, this parameter can be set to ensure high reliability
            if (unflushedMessages >= config.flushInterval)
              flush()
    
            // Step 13: Return the write result
            appendInfo
          }
        }
      }

Is there anything in these steps that you need to pay special attention to? I would like you to focus on the first step, which is how Kafka verifies messages, especially how it does verification for different message format versions.
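To give you a feel for what step 1 checks, here is a heavily simplified, hypothetical sketch (sketchValidateBatches is my own name; the real analyzeAndValidateRecords also builds the LogAppendInfo result and treats client writes differently from replication traffic). It assumes it lives inside Log.scala alongside the real method, so the surrounding imports are available:

private def sketchValidateBatches(records: MemoryRecords, maxMessageSize: Int): Unit = {
    var lastOffset = -1L
    for (batch <- records.batches.asScala) {
        // Reject any single batch larger than max.message.bytes
        if (batch.sizeInBytes > maxMessageSize)
            throw new RecordTooLargeException(
                s"Batch size ${batch.sizeInBytes} exceeds the maximum allowed size of $maxMessageSize")
        // Verify the CRC stored in the batch header against the actual bytes
        batch.ensureValid()
        // Offsets must increase monotonically across the batches being appended
        if (lastOffset >= 0 && batch.lastOffset <= lastOffset)
            throw new OffsetsOutOfOrderException(s"Out of order offset ${batch.lastOffset} in append")
        lastOffset = batch.lastOffset
    }
}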

Speaking of message verification, do you remember the LogAppendInfo class we mentioned in the previous lesson? It is a simple POJO class that holds almost all the metadata about the message set to be written, such as the first and last offsets, the maximum timestamp, and whether the offsets are monotonically increasing; you can see the append method above filling in these fields as it goes.

2. Read Operations

Now let’s turn to the other half of the story: how the Log object reads messages.

The read method has a relatively simple process. Let’s start by looking at its method signature:

def read(startOffset: Long,
          maxLength: Int,
          isolation: FetchIsolation,
          minOneMessage: Boolean): FetchDataInfo = {
          ......
}

It takes 4 parameters with the following meanings:

  • startOffset: the offset value from which to start reading messages from the Log object.
  • maxLength: the maximum number of bytes that can be read.
  • isolation: sets the fetch isolation level, which mainly controls the maximum offset value that can be read and is commonly used for Kafka transactions.
  • minOneMessage: whether to allow at least one message to be read. If a message is very large and exceeds maxLength, the read method normally wouldn’t return any messages. However, if this parameter is set to true, the read method will ensure that at least one message is returned.

The return value of the read method is an instance of the FetchDataInfo class, which is a POJO class. The most important data inside it is the collection of read messages, and it also includes other metadata such as offsets.
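For reference, FetchDataInfo is defined roughly as follows in the 2.x code (field names may differ slightly across versions):

case class FetchDataInfo(fetchOffsetMetadata: LogOffsetMetadata, // offset metadata of the fetch position
                         records: Records,                       // the messages that were read
                         firstEntryIncomplete: Boolean = false,  // true if the first message could not be returned in full within maxLength
                         abortedTransactions: Option[List[AbortedTransaction]] = None) // for READ_COMMITTED consumers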

Now let’s examine the flow of the read method:

def read(startOffset: Long,
          maxLength: Int,
          isolation: FetchIsolation,
          minOneMessage: Boolean): FetchDataInfo = {
    maybeHandleIOException(s"Exception while reading from $topicPartition in dir ${dir.getParent}") {
      trace(s"Reading $maxLength bytes from offset $startOffset of length $size bytes")

      val includeAbortedTxns = isolation == FetchTxnCommitted

      // To read messages, no Monitor lock synchronization mechanism is used. Therefore, here we use a
      // local variable to save the LEO object, avoiding contention (race condition).
      val endOffsetMetadata = nextOffsetMetadata
      val endOffset = nextOffsetMetadata.messageOffset
      if (startOffset == endOffset) // If reading from LEO, no data will be returned, so return an empty collection of messages.
        return emptyFetchDataInfo(endOffsetMetadata, includeAbortedTxns)

      // Find the log segment object where the startOffset value is located. Note that the floorEntry method should be used.
      var segmentEntry = segments.floorEntry(startOffset)

      // Return an error if attempting to read beyond the log end offset or below log start offset
      // Any of the following conditions are considered out of bounds: 
      // 1. The offset of the message to be read exceeds the LEO value.
      // 2. No corresponding log segment object is found.
      // 3. The message to be read is below the Log Start Offset, which is also invisible externally.
      if (startOffset > endOffset || segmentEntry == null || startOffset < logStartOffset)
        throw new OffsetOutOfRangeException(s"Received request for offset $startOffset for partition $topicPartition, " +
          s"but we only have log segments in the range $logStartOffset to $endOffset.")

      // Check the fetch isolation level setting.
      // Regular consumer can see messages between [Log Start Offset, High Watermark).
      // Transactional consumer can only see messages between [Log Start Offset, Last Stable Offset]. The Last Stable Offset (LSO) is an offset no larger than the LEO and is used by Kafka transactions.
      // Follower replica consumer can see messages between [Log Start Offset, LEO).
      val maxOffsetMetadata = isolation match {
        case FetchLogEnd => nextOffsetMetadata
        case FetchHighWatermark => fetchHighWatermarkMetadata
        case FetchTxnCommitted => fetchLastStableOffsetMetadata
      }

      // If the startOffset to be read exceeds the maximum position that can be read, return an empty collection of messages because no messages can be read.
      if (startOffset > maxOffsetMetadata.messageOffset) {
        val startOffsetMetadata = convertToOffsetMetadataOrThrow(startOffset)
        return emptyFetchDataInfo(startOffsetMetadata, includeAbortedTxns)
      }

      // Start traversing log segment objects until something is read or the end of the log is reached.
      while (segmentEntry != null) {
        val segment = segmentEntry.getValue

        val maxPosition = {
          if (maxOffsetMetadata.segmentBaseOffset == segment.baseOffset) {
            maxOffsetMetadata.relativePositionInSegment
          } else {
            segment.size
          }
        }

        // Call the read method of the log segment object to perform the actual message reading operation.
        val fetchInfo = segment.read(startOffset, maxLength, maxPosition, minOneMessage)
        if (fetchInfo == null) { // If no messages are returned, try the next log segment object.
          segmentEntry = segments.higherEntry(segmentEntry.getKey)
        } else { // Otherwise, return the fetchInfo.
          return if (includeAbortedTxns)
            addAbortedTransactions(startOffset, segmentEntry, fetchInfo)
          else
            fetchInfo
        }
      }

      // If the end of the log is reached and no data is returned, only an empty collection of messages can be returned.
      FetchDataInfo(nextOffsetMetadata, MemoryRecords.EMPTY)
    }
  }

Summary #

Today, I focused on Kafka’s Log object and its common operations. Let’s review:

  1. High Watermark Management: The Log object defines the high watermark object and various operations to manage it, including updating and reading.
  2. Log Segment Management: As the container for log segments, the Log object holds many log segment objects. You need to understand how these log segment objects are organized and how the Kafka Log object manages them.
  3. Key Offset Value Management: This mainly involves managing the Log Start Offset and the Log End Offset (LEO). These two offset values are crucial fields in the Log object. Advanced features like replica management and state machine management rely on them.
  4. Read/Write Operations: Log reading and writing are the cornerstone of implementing the basic functionality of the Kafka message engine. While you don’t need to understand the meaning of every line of code, you should at least have a general understanding of the operation flow.

With that, I have introduced the Kafka Log source code. I recommend paying special attention to the code for high watermark management and read/write operations (especially the latter), and analyze the implementation principles of these two parts based on what I discussed today. Finally, I have provided a mind map to help you understand and remember these common operations in the Log source code:

After-class Discussion #

Can you add a method to the Log object that counts the total number of messages between the high watermark and the LEO value?

Feel free to express yourself in the comments section and discuss with me. You are also welcome to share today’s content with your friends.