01 How Is the Object That Saves Log Segment Messages Implemented

Hello, I am Hu Xi.

Today, we will start studying the first module of Kafka source code analysis: Log, LogSegment, and Index source code.

LogSegment and its related code are among the most important components in the Kafka server source code. You may be very interested in how messages are saved and organized in Kafka. After all, no matter which message engine you are learning, understanding how it models messages is the first problem to solve. Therefore, it is worth studying the source code of this important submodule, LogSegment.

In addition, understanding LogSegments has a lot of practical significance. For example, you must be curious about the naming of the underlying log file 00000000000000012345.log in Kafka. After studying LogSegments, I believe this question will be easily answered.

Today, I will take you through the source code of the LogSegment section in detail. But before that, you need to understand the log structure of Kafka.

Overview of Kafka Log Structure #

The organizational structure of Kafka logs on disk is shown in the following diagram:

The log is an important component of the Kafka server code, and many other core components are built on top of it, such as the state management machine and the replica manager that we will discuss later.

In general, a Kafka log object consists of multiple log segment objects, and each log segment object creates a set of files on disk: the message log file (.log), the offset index file (.index), the timestamp index file (.timeindex), and the aborted transaction index file (.txnindex). If you are not using Kafka transactions, the aborted transaction index file is not created. The string of digits in each file name (all zeros in the diagram) is the base offset of the log segment, which is the offset of the first message stored in that segment.

In most cases, a Kafka topic has multiple partitions, and each partition corresponds to a Log object, which in turn corresponds to a subdirectory on the physical disk. For example, if you create a topic called “test-topic” with two partitions, Kafka will create two subdirectories on the disk: test-topic-0 and test-topic-1. In the server code, these are represented as two Log objects. Each subdirectory contains multiple sets of log segments, which are combinations of .log, .index, and .timeindex files. The file names are different because each log segment has a different base offset.
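To make the naming rule concrete, here is a minimal sketch of how a base offset maps to the file names of a segment. The object and method names (SegmentFileNames, prefix, segmentFiles, baseOffsetOf) are hypothetical and are not Kafka's own helpers. The only assumption taken from Kafka is that the base offset is zero-padded to 20 digits, as in 00000000000000012345.log.

```scala
object SegmentFileNames {
  // Zero-pad the base offset to 20 digits, matching names such as
  // 00000000000000012345.log.
  def prefix(baseOffset: Long): String = {
    require(baseOffset >= 0, "base offset must be non-negative")
    f"$baseOffset%020d"
  }

  // The files of one segment; the .txnindex file is left out here because
  // it only exists when transactions are used.
  def segmentFiles(baseOffset: Long): Seq[String] =
    Seq(".log", ".index", ".timeindex").map(suffix => prefix(baseOffset) + suffix)

  // Recover the base offset from a segment file name.
  def baseOffsetOf(fileName: String): Long =
    fileName.substring(0, fileName.indexOf('.')).toLong
}
```

Reading it the other way, a file named 00000000000000012345.log tells you that the first message in that segment has offset 12345.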

Log Segment Code Analysis #

It is necessary to read the source code of the log segment because the log segment is the smallest unit in Kafka that stores messages. In other words, messages are stored in log segments. However, the official documentation provides very limited information about log segments, so many people have little knowledge about this important concept.

If we are not familiar with log segments, though, we cannot quickly find solutions to problems that arise in production environments. Let me share a real case with you.

Our company once hit a problem where a large number of log segments were rolled (split) at the same time, which saturated the disk I/O bandwidth. Nobody knew what to do, so we turned to the log segment source code for help.

Finally, we found a solution in the shouldRoll method of LogSegment: setting the broker-side parameter log.roll.jitter.ms to a value greater than 0 adds a random perturbation to the time at which each log segment rolls. This keeps a large number of log segments from rolling at the same moment, which significantly reduces the peak disk I/O.

Later, during the retrospective, we unanimously agreed that reading the LogSegment source code was the correct decision. Otherwise, by simply reading the official documentation on this parameter, we might not have been able to understand its true purpose. So, what is the specific purpose of the log.roll.jitter.ms parameter? I will explain it in detail when we talk about log segments later.

Without further ado, let’s take a look at the log segment source code. I will focus on explaining the LogSegment class declaration, the append method, the read method, and the recover method.

First of all, you need to know that the log segment source code is located in the core project of Kafka, specifically the core/src/main/scala/kafka/log/LogSegment.scala file. In fact, all the source code related to the log structure is located under the kafka.log package in the core project.

There are three Scala objects defined in this file:

  • LogSegment class
  • LogSegment object
  • LogFlushStats object. The Stats in LogFlushStats is used for statistics and is mainly responsible for timing the log flush to disk.

We are mainly concerned with the LogSegment class and object. In Scala, when a class and an object with the same name are defined in the same source file, they are called companions. The class is the companion class and behaves like an ordinary Java class. The object is the companion object, a singleton that holds what Java would declare as static variables and static methods. In Java, we would have to write two classes to achieve the same effect, such as a LogSegment class and a LogSegmentUtils class. In Scala, we can use the companion object directly.
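If you have not used companions before, the following minimal sketch may help. Segment, LogSuffix, and fileNameFor are hypothetical names chosen for illustration and do not appear in the Kafka source.

```scala
// Companion class: instance state and behaviour, like an ordinary Java class.
class Segment private (val baseOffset: Long) {
  def fileName: String = Segment.fileNameFor(baseOffset)
}

// Companion object: a singleton holding what Java would declare as static members.
object Segment {
  val LogSuffix: String = ".log"

  def fileNameFor(baseOffset: Long): String = f"$baseOffset%020d$LogSuffix"

  // Factory method. Companions can see each other's private members,
  // so the private constructor is accessible here.
  def apply(baseOffset: Long): Segment = new Segment(baseOffset)
}
```

Calling Segment(12345L) goes through the companion object's apply method, while fileName is an instance method on the companion class.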

By the way, it is worth mentioning that the source code comments in Kafka are very detailed. I don’t intend to include the comments here, but I highly recommend that you read the comments in the source code. For example, the beginning of the log segment file has a long and fascinating comment. Let me show you a fragment to give you a taste:

“A segment of the log. Each segment has two components: a log and an index. The log is a FileRecords containing the actual messages. The index is an OffsetIndex that maps from logical offsets to physical file positions. Each segment has a base offset which is an offset <= the least offset of any message in this segment and > any offset in any previous segment.”

This paragraph clearly explains that each log segment consists of two core components: the log and the index. Here, the index refers to index files in a broad sense. In addition, this comment states an important fact: each log segment has a base offset, which is an offset that is less than or equal to the offset of any message in this segment, and greater than any offset in any previous segments. After reading this comment, we can quickly understand the role of the base offset in the log segment.

LogSegment Class Declaration #

Next, I will provide and explain key code snippets in batches. First, let’s look at the definition of the LogSegment:

class LogSegment private[log] (val log: FileRecords,
                               val lazyOffsetIndex: LazyIndex[OffsetIndex],
                               val lazyTimeIndex: LazyIndex[TimeIndex],
                               val txnIndex: TransactionIndex,
                               val baseOffset: Long,
                               val indexIntervalBytes: Int,
                               val rollJitterMs: Long,
                               val time: Time) extends Logging { … }

As I mentioned earlier, a log segment includes objects such as the message log file, offset index file, timestamp index file, and transaction index file. Here, FileRecords is the object that actually stores Kafka messages. I will discuss how Kafka stores messages, specifically FileRecords and its implementation, later in this column. At the same time, I will also introduce how the community has evolved in message persistence, so be sure not to miss that part.

The lazyOffsetIndex, lazyTimeIndex, and txnIndex correspond to the three index files mentioned earlier. In terms of implementation, the first two use lazy initialization, which reduces the initialization cost. We will discuss the indexes in detail later.

Each log segment object saves its own base offset value, which is a very important attribute! In fact, the value you see on the disk as the filename is the value of the base offset. Once a LogSegment object instance is created, its base offset is fixed and cannot be changed.

The indexIntervalBytes value is the broker-side parameter log.index.interval.bytes, which controls how often index entries are added for the log segment. By default, a log segment adds an index entry only after at least 4KB of message data has been written since the previous entry. The rollJitterMs value is the “jitter” applied to the timer that triggers rolling a new log segment. Because this timer is currently configured globally on the broker, many log segments may reach their roll time at the same moment and be created together, which greatly increases the physical disk I/O pressure. With the rollJitterMs perturbation, each log segment rolls at a slightly different time, which spreads out the I/O load on the physical disk.
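The effect of rollJitterMs can be illustrated with a small sketch. It assumes that the jitter is subtracted from the roll interval when deciding whether a segment is due for a time-based roll. RollJitter, randomJitterMs, and reachedRollMs are hypothetical names, and the way the random value is drawn here is my own simplification rather than Kafka's exact code.

```scala
import scala.util.Random

object RollJitter {
  // Pick a per-segment jitter in [0, maxJitterMs); a non-positive maximum disables jitter.
  def randomJitterMs(maxJitterMs: Long, rng: Random): Long =
    if (maxJitterMs <= 0) 0L else math.abs(rng.nextLong() % maxJitterMs)

  // A segment is due for a time-based roll once it has waited longer than
  // the roll interval minus its own jitter.
  def reachedRollMs(waitedMs: Long, maxSegmentMs: Long, rollJitterMs: Long): Boolean =
    waitedMs > maxSegmentMs - rollJitterMs
}
```

Two segments created at the same moment with different jitter values cross the threshold at different times, so their rolls no longer hit the disk together.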

As for the last time parameter, it is an implementation class used for timing statistics, which appears widely in Kafka source code. I won’t go into detail about it.

Now let’s move on to some important methods.

For a log segment, the most important methods are writing messages and reading messages, which correspond to the append method and the read method in the source code, respectively. In addition, the recover method is also crucial as it is responsible for recovering the log segment after a broker restart.

append Method #

Let’s start with the append method to understand how messages are written.

def read(offset: Long,
           maxLength: Int,
           maxOffset: Option[Long] = None): FetchDataInfo = {
    trace(s"Reading log at offset $offset, length $maxLength, maxOffset $maxOffset")
    // locate the log segment and position read should start
    val fetchDataInfo = FetchDataInfo(OffsetAndEpoch.InvalidOffsetAndEpoch, MemoryRecords.EMPTY)
    val logOffsetMetadata = validateOffsetMetadata(offset, fetchMaxBytes)

    Option(logOffsetMetadata) match {
      case Some(earliestLogOffsetMetadata) =>
        // support fuzzy offset fetch only in log cleaner case (i.e. "metadata.broker.list" is not configured)
        fetcher match {
          case Some(fetcher) =>
            readEarliestIncompleteTxnRecords(offset, maxOffset, maxLength, maxOffsetMetadata = fetcher.lastFetchEpochData.pollLastOffsetAndMetadata(), lastReadOffsetMetadata = logOffsetMetadata)
          case _ =>
            fetchDataInfo.fill(records = log.read(offset, maxLength), lastStableOffset = earliestLogOffsetMetadata)
        }
      case None =>
        if (log.logEndOffset == offset) {
          fetchDataInfo
        } else {
          val exception = new OffsetOutOfRangeException(s"Cannot fetch offset ${offset} log end offset ${log.logEndOffset} in ${log.dir} please ensure the offset is valid before retrying")
          error(exception.getMessage, exception)
          fetchDataInfo
        }
    }
  }

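The read method reads the contents of the log segment starting from a specified offset. It takes three parameters: offset is the offset to start reading from, maxLength is the maximum number of bytes to read, and maxOffset is the maximum offset to read (optional, None by default).

The read method first calls validateOffsetMetadata, which performs some validation based on the given offset and the maximum number of bytes to read, and returns the log segment metadata, logOffsetMetadata.

The result is then handled according to logOffsetMetadata:

  • If logOffsetMetadata is not None, validation passed and processing continues.
    • If fetcher is set, readEarliestIncompleteTxnRecords is called to read the records of incomplete transactions.
    • If fetcher is not set, the log's read method is called directly to read the records after the specified offset, and the result is filled into fetchDataInfo.
  • If logOffsetMetadata is None, validation failed, and the method reports an OffsetOutOfRangeException.

Finally, the method returns a FetchDataInfo object containing the records read from the log segment and the last stable offset.

That completes the analysis of the read method. Next, we will work through the principles and implementation of the other methods step by step.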

read Method #

def read(startOffset: Long,
           maxSize: Int,
           maxPosition: Long = size,
           minOneMessage: Boolean = false): FetchDataInfo = {
    if (maxSize < 0)
      throw new IllegalArgumentException(s"Invalid max size $maxSize for log read from segment $log")

    val startOffsetAndSize = translateOffset(startOffset)

    // if the start position is already off the end of the log, return null
    if (startOffsetAndSize == null)
      return null

    val startPosition = startOffsetAndSize.position
    val offsetMetadata = LogOffsetMetadata(startOffset, this.baseOffset, startPosition)

    val adjustedMaxSize =
      if (minOneMessage) math.max(maxSize, startOffsetAndSize.size)
      else maxSize

    // return a log segment but with zero size in the case below
    if (adjustedMaxSize == 0)
      return FetchDataInfo(offsetMetadata, MemoryRecords.EMPTY)

    // calculate the length of the message set to read based on whether or not they gave us a maxOffset
    val fetchSize: Int = min((maxPosition - startPosition).toInt, adjustedMaxSize)

    FetchDataInfo(offsetMetadata, log.slice(startPosition, fetchSize),
      firstEntryIncomplete = adjustedMaxSize < startOffsetAndSize.size)
}

The read method takes 4 parameters:

  • startOffset: the offset of the first message to read.
  • maxSize: the maximum number of bytes that can be read.
  • maxPosition: the maximum file position that can be read.
  • minOneMessage: whether to return at least one message even if its size exceeds maxSize.

The meanings of the first 3 parameters are straightforward. Let me explain the 4th parameter. When this parameter is set to true, even if the size of a message exceeds maxSize, the read method is still able to return at least one message. This parameter is mainly introduced to prevent consumer starvation.

The following diagram shows the complete execution logic of the read method:

[Figure: execution logic of the read method]

The logic is quite simple. Let’s go through it step by step.

The first step is to call the translateOffset method to locate the starting file position to read (startPosition). The startOffset input parameter is just an offset value, and Kafka needs to find the corresponding physical file position based on the index information to start reading messages.
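The idea behind this lookup can be sketched as follows. This is a simplified illustration, not the translateOffset implementation: it assumes the index entries are held in memory as a list sorted by offset, and the names SparseIndexLookup, Entry, and floorPosition are hypothetical. Because the index is sparse, the lookup only finds the closest entry at or before the target offset. The reader then scans forward in the .log file from that position.

```scala
object SparseIndexLookup {
  // One index entry: a message offset and the byte position of its batch in the .log file.
  final case class Entry(offset: Long, position: Int)

  // Binary search for the last entry whose offset is <= target.
  // Falls back to position 0 (the start of the file) when no such entry exists.
  def floorPosition(entries: IndexedSeq[Entry], target: Long): Int = {
    var lo = 0
    var hi = entries.length - 1
    var best = 0
    while (lo <= hi) {
      val mid = lo + (hi - lo) / 2
      if (entries(mid).offset <= target) {
        best = entries(mid).position
        lo = mid + 1
      } else {
        hi = mid - 1
      }
    }
    best
  }
}
```

For example, with entries at offsets 120, 150, and 190, a lookup for offset 160 lands on the entry for 150, and the scan for offset 160 starts from that entry's file position.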

Once the starting position to read is determined, the log segment code needs to calculate the total number of bytes to read based on this information as well as the maxSize and maxPosition parameters. For example, if maxSize=100, maxPosition=300, and startPosition=250, then the read method can only read 50 bytes, because maxPosition - startPosition = 50. We compare it with the maxSize parameter, and the minimum value between them is the total number of bytes that can be read.

The final step is to call the slice method of FileRecords to read the message set of the specified size from the specified position.
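The byte-count calculation from the second step can be isolated into a small function. This sketch mirrors the arithmetic in the read method shown above. The object name FetchSize and the parameter firstBatchSize (standing in for startOffsetAndSize.size) are hypothetical.

```scala
object FetchSize {
  def fetchSize(startPosition: Int,
                maxSize: Int,
                maxPosition: Long,
                firstBatchSize: Int,
                minOneMessage: Boolean): Int = {
    // With minOneMessage, allow at least the first batch even if it exceeds maxSize.
    val adjustedMaxSize =
      if (minOneMessage) math.max(maxSize, firstBatchSize) else maxSize
    // Never read past maxPosition, and never read more than the adjusted limit.
    math.min((maxPosition - startPosition).toInt, adjustedMaxSize)
  }
}
```

With maxSize=100, maxPosition=300, and startPosition=250, the function returns 50, matching the example above. With minOneMessage set to true and a first batch of 64 bytes, a maxSize of 10 is raised to 64 so that the consumer still makes progress.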

recover Method #

In addition to the append and read methods, LogSegment has another important method, recover, which is used to recover a log segment.

What does it mean to recover a log segment? When the broker starts, it loads all the log segment information from disk into memory and creates the corresponding LogSegment objects. During this process, it needs to perform a series of operations, which the recover method implements. Here is its source code:

def recover(producerStateManager: ProducerStateManager, leaderEpochCache: Option[LeaderEpochFileCache] = None): Int = {
    offsetIndex.reset()
    timeIndex.reset()
    txnIndex.reset()
    var validBytes = 0
    var lastIndexEntry = 0
    maxTimestampSoFar = RecordBatch.NO_TIMESTAMP
    try {
      for (batch <- log.batches.asScala) {
        batch.ensureValid()
        ensureOffsetInRange(batch.lastOffset)

        // The max timestamp is exposed at the batch level, so no need to iterate the records
        if (batch.maxTimestamp > maxTimestampSoFar) {
          maxTimestampSoFar = batch.maxTimestamp
          offsetOfMaxTimestampSoFar = batch.lastOffset
        }

        // Build offset index
        if (validBytes - lastIndexEntry > indexIntervalBytes) {
          offsetIndex.append(batch.lastOffset, validBytes)
          timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar)
          lastIndexEntry = validBytes
        }
        validBytes += batch.sizeInBytes()

        if (batch.magic >= RecordBatch.MAGIC_VALUE_V2) {
          leaderEpochCache.foreach { cache =>
            if (batch.partitionLeaderEpoch > 0 && cache.latestEpoch.forall(batch.partitionLeaderEpoch > _))
              cache.assign(batch.partitionLeaderEpoch, batch.baseOffset)
          }
          updateProducerState(producerStateManager, batch)
        }
      }
    } catch {
      case e@ (_: CorruptRecordException | _: InvalidRecordException) =>
        warn("Found invalid messages in log segment %s at byte offset %d: %s. %s"
          .format(log.file.getAbsolutePath, validBytes, e.getMessage, e.getCause))
    }
    val truncated = log.sizeInBytes - validBytes
    if (truncated > 0)
      debug(s"Truncated $truncated invalid bytes at the end of segment ${log.file.getAbsoluteFile} during recovery")

    log.truncateTo(validBytes)
    offsetIndex.trimToValidSize()
    // A normally closed segment always appends the biggest timestamp ever seen into log segment, we do this as well.
    timeIndex.maybeAppend(maxTimestampSoFar, offsetOfMaxTimestampSoFar, skipFullCheck = true)
    timeIndex.trimToValidSize()
    truncated
}

I will use a diagram to illustrate the processing logic of the recover method:

[Figure: processing logic of the recover method]

When recover starts, the code calls the reset method of the index objects to clear all index files. Then it begins to iterate through all message sets or batch messages (RecordBatch) in the log segment. For each message set read, the log segment must ensure that they are valid, which mainly involves two aspects:

  1. The messages in the set must conform to the binary format defined by Kafka.
  2. The offset of the last message in the set must not go out of range, i.e., its difference from the base offset of the log segment must be a non-negative value that fits in a 32-bit integer.
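The range check in the second item can be sketched like this. It is a minimal illustration that assumes offsets are stored in the index as 4-byte values relative to the base offset. RelativeOffsets and toRelative are hypothetical names here, not a copy of the Kafka code.

```scala
object RelativeOffsets {
  // The difference from the base offset must be non-negative and fit in an Int.
  // Returns None when the offset cannot be represented relative to this segment.
  def toRelative(offset: Long, baseOffset: Long): Option[Int] = {
    val relative = offset - baseOffset
    if (relative < 0 || relative > Int.MaxValue) None
    else Some(relative.toInt)
  }
}
```

For a segment with base offset 12345, offset 12350 maps to the relative value 5, while offset 12344 or any offset more than Int.MaxValue beyond the base offset is rejected.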

After validating the message set, the code updates the maximum timestamp observed during iteration as well as the offset of the message it belongs to. Similarly, these two values are used to build index entries. Next, the code accumulates the current number of bytes read and conditionally writes the index entry based on that value. Finally, it updates the state of the transactional producer and the Leader Epoch cache. However, these two components are not necessary for understanding the Kafka log structure, so we can ignore them.

After the iteration is completed, Kafka compares the current total number of bytes in the log segment with the accumulated number of bytes read. If the former is greater than the latter, some invalid messages have been written to the log segment, and the segment must be truncated back to the last valid size. Kafka also adjusts the size of the index files accordingly. Once these operations are complete, the recovery of the log segment is finished.
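To see the overall flow in one place, here is a simplified, in-memory sketch of the recovery loop. It is not the real method. Batch, Result, and RecoverSketch are hypothetical stand-ins, validation is reduced to a boolean flag, and the timestamp index, producer state, and Leader Epoch cache are left out.

```scala
object RecoverSketch {
  // A stand-in for a record batch: its last offset, its size on disk,
  // and whether it passed validation.
  final case class Batch(lastOffset: Long, sizeInBytes: Int, valid: Boolean)

  final case class Result(validBytes: Int,
                          truncatedBytes: Int,
                          indexEntries: Vector[(Long, Int)])

  def recover(batches: Seq[Batch], indexIntervalBytes: Int): Result = {
    val totalBytes = batches.map(_.sizeInBytes).sum
    var validBytes = 0
    var lastIndexEntry = 0
    var index = Vector.empty[(Long, Int)]

    // Stop at the first invalid batch; everything from there on is discarded.
    for (batch <- batches.iterator.takeWhile(_.valid)) {
      // Rebuild the sparse offset index: one entry per indexIntervalBytes of data.
      if (validBytes - lastIndexEntry > indexIntervalBytes) {
        index = index :+ ((batch.lastOffset, validBytes))
        lastIndexEntry = validBytes
      }
      validBytes += batch.sizeInBytes
    }

    // Bytes beyond validBytes are what truncation would remove.
    Result(validBytes, totalBytes - validBytes, index)
  }
}
```

Note that every valid batch is visited once, which is why recovering a large number of segments means reading a large amount of data from disk.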

Summary #

Today, we focused on analyzing the source code of the Kafka log segment, including the append method, read method, and recover method.

  1. The append method: I specifically analyzed how the source code writes messages to the log segment. You should pay special attention to when the index is updated during the write operation process.
  2. The read method: I specifically analyzed the complete process of reading messages at the underlying level of the source code. You should pay attention to the logic of calculating the number of bytes to be read in Kafka, that is, how maxSize, maxPosition, and startOffset affect the read method together.
  3. The recover method: This operation reads the log segment file and rebuilds the index file. I want to emphasize again that this operation reads the log segment file during the execution process. Therefore, if you have many log segment files in your environment and you find that the Broker restart is slow, now you know that this is because Kafka needs to read a large number of disk files during the execution of the recover process. You see, this is what we gained from reading the source code.

These three methods are the most important functions of the log segment object. You must read them carefully and try to understand the purpose of each line of code. There is no code that cannot be understood by reading it once. If there is, then read it a few more times. In addition, I hope you pay special attention to the append and read methods, as they will be the two key methods when we discuss the Log object later. After all, reading and writing logs are the most common operations in Kafka, and the underlying calls for log reads and writes are exactly these two methods of the log segment.

Post-discussion #

If you take a look at the source code of the log segment, you will find that there is another important method that I did not mention, which is the truncateTo method. This method forcibly truncates the data in the log segment to the specified offset. The method is only about 20 lines of code, and I hope you can read it yourself and think about the following question: What will happen if the specified offset value is extremely large, exceeding the maximum offset value that the log segment can store?

Feel free to share your thoughts in the comments and discuss with me. You are also welcome to share this article with your friends.