02 Logs How Exactly Are Log Segments Loaded

02 Logs How Exactly are Log Segments Loaded #

Hello, I’m Hu Xi. Today, I’m going to talk about the Log object in Kafka source code.

In the previous lesson, we learned about the source code for log segments. You can think of a log as a container for log segments, which defines many operations for managing log segments. To be honest, if you look at the Kafka source code but don’t look at the Log, it’s like buying this course without knowing who the author is. In my opinion, the Log object is the most core part of Kafka source code, especially in the Broker side.

How important is it exactly? Let me share an example with you. Recently, I have been fixing a bug in Kafka (KAFKA-9157): in some cases, Kafka’s compaction operation can generate many empty log segment files. To avoid creating these empty log segment files, one must understand the principles of creating log segment files, and these codes are exactly in the Log source code.

Since the Log source code manages log segment objects, it must first load all the log segment objects into memory. How is this process implemented? Today, I will take you through the process of loading log segments into a log.

First, let’s take a look at the source code structure of the Log object.

Log Source Code Structure #

The Log source code is located in the log package of the Kafka core project, with the file name Log.scala. In general, this file defines 10 classes and objects, as shown in the following diagram:

So what do these 10 classes and objects do? Let me give you a brief introduction so that you can have a rough understanding of them.

However, before I introduce them, let me mention that the letters C and O in the parentheses in the diagram represent Class and Object, respectively. Do you remember the companion objects I mentioned in the previous lesson? Yes, defining a Class and an Object with the same name belongs to the usage of companion objects in Scala.

Let’s start with the companion objects, namely LogAppendInfo, Log, and RollParams.

1. LogAppendInfo

  • LogAppendInfo (C): Stores various metadata information about a set of messages to be written. For example, the offset value of the first message in this set, the offset value of the last message, and the maximum timestamp of the messages. In short, the data inside is very rich (I will explain in more detail in the next lesson).
  • LogAppendInfo (O): Can be understood as the factory class corresponding to its companion class, which defines some factory methods for creating specific instances of LogAppendInfo.

2. Log

  • Log (C): The most core code in the Log source code. I will keep you in suspense about this for now, and we will discuss it in detail later.
  • Log (O): Similarly, it is the factory method of the companion class Log, which defines many constants and auxiliary methods.

3. RollParams

  • RollParams (C): Defines the data structure used to control whether the log segment should be rolled.
  • RollParams (O): Similarly, it is the factory method of the companion class RollParams.

In addition to these 3 sets of companion objects, there are also 4 classes in the source code.

  • LogMetricNames: Defines the monitoring metrics for the Log object.
  • LogOffsetSnapshot: Encapsulates a container class for all offset metadata of a partition.
  • LogReadInfo: Encapsulates the data and metadata returned when reading a log.
  • CompletedTxn: Records the metadata of completed transactions, mainly used for building transaction indexes.

Log Class & Object #

Below, I will break down each of these classes and objects one by one according to their importance. First, let’s talk about the Log class and its companion object.

Considering that companion objects are often used to store static variables and static methods (such as static factory methods), let’s start with the implementation of the companion object (i.e. Log Object). After all, it’s easier to start with something soft!

object Log {
  val LogFileSuffix = ".log"
  val IndexFileSuffix = ".index"
  val TimeIndexFileSuffix = ".timeindex"
  val ProducerSnapshotFileSuffix = ".snapshot"
  val TxnIndexFileSuffix = ".txnindex"
  val DeletedFileSuffix = ".deleted"
  val CleanedFileSuffix = ".cleaned"
  val SwapFileSuffix = ".swap"
  val CleanShutdownFile = ".kafka_cleanshutdown"
  val DeleteDirSuffix = "-delete"
  val FutureDirSuffix = "-future"
……
}

These are all the constants defined by the Log Object. If an interviewer asks you how many file types are defined in Kafka, you can proudly state these. I won’t explain the familiar ones like .log, .index, .timeindex, and .txnindex. Let’s learn about the other file types instead.

  • .snapshot is a snapshot file created by Kafka for idempotent or transactional Producers. Since we are still in the early stage of reading the source code, I won’t go into detail about the source code related to transactions or idempotence.
  • .deleted is a file created by the deletion of log segments. Currently, log segment deletion is an asynchronous operation, and the Broker changes the file extension from .log to .deleted. If you see a bunch of files with the .deleted extension, don’t panic, that’s Kafka performing log segment deletion.
  • .cleaned and .swap are products of the Compaction operation. We will talk more about the Cleaner later.
  • -delete is applied to folders. When you delete a topic, the topic’s partition folder is appended with this suffix.
  • -future is used to change the address of the topic partition folder, which is a more advanced usage.

In short, remember these constants. Remembering their main use is important, so you won’t be caught off guard by interviewers! Just kidding, actually the most important thing about these constants is that they can help you understand the various file types defined by Kafka.

The Log Object also defines numerous utility methods. Since they are all simple, I will only provide the source code for one method, let’s read it together.

def filenamePrefixFromOffset(offset: Long): String = {
    val nf = NumberFormat.getInstance()
    nf.setMinimumIntegerDigits(20)
    nf.setMaximumFractionDigits(0)
    nf.setGroupingUsed(false)
    nf.format(offset)
  }

The purpose of this method is to calculate the corresponding log segment file name based on the given offset. Kafka log files are always 20 characters in length, and the filenamePrefixFromOffset method pads the given offset with leading zeros to expand it into a fixed-length 20-character string.

For example, if we give an offset value of 12345, then the corresponding log segment file name on the Broker’s disk would be 00000000000000012345.log. Simple, right? The other utility methods are also straightforward, so I won’t explain them one by one.

Now let’s move on to the main show of the Log class in the Log source code. This is a class with over 2000 lines. Looking at the entire Kafka source code, it’s not often to see a class as large as Log, which shows its importance. Let’s first look at the definition of this class:

class Log(@volatile var dir: File,
          @volatile var config: LogConfig,
          @volatile var logStartOffset: Long,
          @volatile var recoveryPoint: Long,
          scheduler: Scheduler,
          brokerTopicStats: BrokerTopicStats,
          val time: Time,
          val maxProducerIdExpirationMs: Int,
          val producerIdExpirationCheckIntervalMs: Int,
          val topicPartition: TopicPartition,
val producerStateManager: ProducerStateManager,
logDirFailureChannel: LogDirFailureChannel) extends Logging with KafkaMetricsGroup {
  ……
}

It looks like there are many properties, but actually, you only need to remember the purpose of two properties: dir and logStartOffset. dir is the folder path of the log, which is the path of the topic partition. logStartOffset represents the current earliest offset of the log. Both dir and logStartOffset are of type volatile var, which means their values can change and may be updated by multiple threads.

You may have heard of the current end offset of the log, also known as Log End Offset (LEO), which represents the offset value of the next message to be inserted into the log. On the other hand, the Log Start Offset represents the earliest visible message offset of the log. I will use a diagram to illustrate the difference:

In the diagram, the green offset value 3 represents the Log Start Offset of the log, and the offset value 15 represents the LEO. Additionally, the offset value 8 represents the high watermark, which is the boundary between committed and uncommitted messages.

Interestingly, Log End Offset can be abbreviated as LEO, but Log Start Offset cannot be abbreviated as LSO. In Kafka, LSO specifically refers to Log Stable Offset, which is a concept in Kafka transactions. This course does not cover LSO, so you only need to know that Log Start Offset is not equal to LSO.

Other properties of the Log class can be ignored for now, as they are either obvious utility class properties like timer and scheduler, or they are advanced properties used in advanced usage, such as producerStateManager and logDirFailureChannel. Utility class code is mostly auxiliary and skipping them does not prevent us from understanding the core functionality of Kafka. On the other hand, advanced functionality code is complex in design, has a high learning cost, and may not provide a good cost-benefit ratio.

In addition to the properties defined in the Log class signature, the Log class also defines some important properties, such as the following code:

@volatile private var nextOffsetMetadata: LogOffsetMetadata = _
@volatile private var highWatermarkMetadata: LogOffsetMetadata = LogOffsetMetadata(logStartOffset)
private val segments: ConcurrentNavigableMap[java.lang.Long, LogSegment] = new ConcurrentSkipListMap[java.lang.Long, LogSegment]
@volatile var leaderEpochCache: Option[LeaderEpochFileCache] = None

The first property nextOffsetMetadata encapsulates the offset value of the next message to be inserted, which you can basically equate to LEO.

The second property highWatermarkMetadata is the high watermark value of the partition log. We have explained the concept of the high water mark in detail in the “Kafka Core Technology and Practice” course, and you can take a look at this article for more information.

The third property segments is, in my opinion, the most important property in the Log class. It stores information about all log segments of the partition log, but it uses a Map data structure to store them. The key of the Map is the starting offset value of the log segment, and the value is the log segment object itself. Kafka source code uses the ConcurrentNavigableMap data structure to store log segment objects. This allows easy management of all log segment objects using the thread-safe and various sorting methods provided by this class.

The fourth property is the Leader Epoch Cache object. Leader Epoch is introduced in the community’s source code starting from version 0.11.0.0. It is mainly used to determine whether to perform log truncation (Truncation) when a failure occurs. The mechanism that used to rely on the high watermark may cause inconsistent data between replicas. The Leader Epoch Cache here is a caching class data that stores the mapping relationship between the epoch value of the partition leader and the corresponding offset value. I recommend you to take a look at the LeaderEpochFileCache class to have a deeper understanding of its implementation principle.

After understanding these basic properties, let’s take a look at the initialization logic of the Log class:

locally {
  val startMs = time.milliseconds

  // create the log directory if it doesn't exist
  Files.createDirectories(dir.toPath)

  initializeLeaderEpochCache()

  val nextOffset = loadSegments()

  /* Calculate the offset of the next message */
  nextOffsetMetadata = LogOffsetMetadata(nextOffset, activeSegment.baseOffset, activeSegment.size)

  leaderEpochCache.foreach(_.truncateFromEnd(nextOffsetMetadata.messageOffset))

  logStartOffset = math.max(logStartOffset, segments.firstEntry.getValue.baseOffset)

  // The earliest leader epoch may not be flushed during a hard failure. Recover it here.
  leaderEpochCache.foreach(_.truncateFromStart(logStartOffset))
}

// 删除.swap文件时,需要特别处理,不仅删除.swap文件本身,还要删除对应的原始日志段文件和索引文件 val offset = offsetFromSwapFile(filename) // 从.swap文件名中解析出位移值

      debug(s"Deleting swap file ${file.getAbsolutePath} for offset $offset")
     
      // 删除.swap文件,并将对应的原始日志段文件和索引文件加入待删除文件集合中
      Files.deleteIfExists(file.toPath)
      deleteIndicesIfExist(Log.logFile(dir, offset))
      deleteIndicesIfExist(Log.indexFile(dir, offset))
     
      swapFiles += file
      }
    }
  
    // 删除.min.opt文件,这个文件是用来控制目录最小可用位移的
  
    val minOffsetOpt = new File(dir, Log.minCleanedOffsetCheckpointFile)
  
    if (minOffsetOpt.exists) {
      debug(s"Deleting min offset checkpoint file ${(minOffsetOpt.getAbsolutePath)}")
      Files.deleteIfExists(minOffsetOpt.toPath)
    }
  
    // 删除.deleting文件,这个文件会在删除目录时记录
  
    val deleting = new File(dir, Log.deletingFile)
    if (deleting.exists) {
      debug(s"Deleting deleting file ${(deleting.getAbsolutePath)}")
      Files.deleteIfExists(deleting.toPath)
    }
  
    // 删除所有的.cleaned文件

    for (file <- cleanFiles) {
      debug(s"Deleting clean file ${file.getAbsolutePath}")
      Files.deleteIfExists(file.toPath)
    }
  
    swapFiles
  }

接下来,我们来看看retryOnOffsetOverflow方法的具体实现。首先,这个方法是一个高阶函数,接收一个函数作为参数。这个函数会在retryOnOffsetOverflow方法中被调用。好,我们来看看retryOnOffsetOverflow方法的实现:

// retryOnOffsetOverflow是一个高阶函数,调用这个函数时,参数f实际上是一个函数,该函数的返回值类型是Long

private def retryOnOffsetOverflow(f: => Long): Long = {

    // 因为调用方法参数f时,需要直接调用,而不是通过方法调用

    // 所以在这里我们定义了一个没有参数,返回值为Long类型的值计算表达式t,用 t => f(t) 定义了一个函数,将结果返还给参数f

    type IntializeT = Long => Long
  
    // retryOnOffsetOverflow内部定义了一个具体实现的高阶函数,用于按最大包含位数对offset进行拆分
  
    def initializeOnOffsetOverflow(t: => Long): IntializeT = { currentOffset => t }
  
    // 根据当前的offsetOnOverflow值选择策略
  
    val initializedT = offsetOnOverflow match {
      case OverflowPositionStrategy.LATEST => initializeOnOffsetOverflow(Long.MaxValue)
      case OverflowPositionStrategy.EARLIEST => initializeOnOffsetOverflow(0)
      case OverflowPositionStrategy.PRODUCER => initializeOnOffsetOverflow(producerStateManager.map(_.latestEpoch).getOrElse(0L))
      case OverflowPositionStrategy.RESTART => initializeOnOffsetOverflow(f) }
  
    // 调用参数函数f
  
    val initializedTValue = initializedT(f)
  
    offsetOnOverflow = offsetOnOverflow.nextOffsetStrategy
  
    initializedTValue
  }

好了,上面这段代码使用了一个高阶函数retryOnOffsetOverflow,这个函数接收一个函数参数f,并在函数retryOnOffsetOverflow中调用参数f。参数f的返回值类型是Long。在retryOnOffsetOverflow中,会调用initializeOnOffsetOverflow这个高阶函数,根据当前的offsetOnOverflow值选择一个策略,然后调用参数f。最终将初始化后的值返还给调用者。

另外,还有一点需要注意的是,offsetOnOverflow是一个枚举类型OverflowPositionStrategy的实例,可以通过调用nextOffsetStrategy方法,实现当前位置策略下个位置策略的选择,在下一次retryOnOffsetOverflow方法的调用中,选择不同的策略。

val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))

info(s"Found file ${file.getAbsolutePath} from interrupted swap operation.")

if (isIndexFile(baseFile)) { // If the .swap file was originally an index file
    deleteIndicesIfExist(baseFile) // Delete the original index file
} else if (isLogFile(baseFile)) { // If the .swap file was originally a log file
    deleteIndicesIfExist(baseFile) // Delete the original index file
    swapFiles += file // Add the .swap file to the collection of files to be recovered
}

// Filter out the .swap files whose starting offset is greater than minCleanedFileOffset
val (invalidSwapFiles, validSwapFiles) = swapFiles.partition(file => offsetFromFile(file) >= minCleanedFileOffset)

invalidSwapFiles.foreach { file =>
    debug(s"Deleting invalid swap file ${file.getAbsoluteFile}")
    val baseFile = new File(CoreUtils.replaceSuffix(file.getPath, SwapFileSuffix, ""))
    deleteIndicesIfExist(baseFile, SwapFileSuffix)
    Files.deleteIfExists(file.toPath)
}

// Delete all .clean files
cleanFiles.foreach { file =>
    debug(s"Deleting stray .clean file ${file.getAbsolutePath}")
    Files.deleteIfExists(file.toPath)
}

// Return the valid .swap files
validSwapFiles

After executing the removeTempFilesAndCollectSwapFiles logic, the code clears the existing segments and reloads the segment files. This is done in the loadSegmentFiles method:

// Sort the files in the directory by the offset in their names and iterate over them
for (file <- dir.listFiles.sortBy(_.getName) if file.isFile) {
    if (isIndexFile(file)) { // If it is an index file
        val offset = offsetFromFile(file)
        val logFile = Log.logFile(dir, offset)
        if (!logFile.exists) { // If the corresponding log file does not exist, log a warning and delete the index file
            warn(s"Found an orphaned index file ${file.getAbsolutePath}, with no corresponding log file.")
            Files.deleteIfExists(file.toPath)
        }
    } else if (isLogFile(file)) { // If it is a log file
        val baseOffset = offsetFromFile(file)
        val timeIndexFileNewlyCreated = !Log.timeIndexFile(dir, baseOffset).exists()
        
        // Create a LogSegment instance for the file and add it to the segments
        val segment = LogSegment.open(
            dir = dir,
            baseOffset = baseOffset,
            config,
            time = time,
            fileAlreadyExists = true
        )
        
        try segment.sanityCheck(timeIndexFileNewlyCreated)
        catch {
            case _: NoSuchFileException =>
                error(s"Could not find offset index file corresponding to log file ${segment.log.file.getAbsolutePath}, " +
                "recovering segment and rebuilding index files...")
                recoverSegment(segment)
                
            case e: CorruptIndexException =>
                warn(s"Found a corrupted index file corresponding to log file ${segment.log.file.getAbsolutePath} due " +
                s"to ${e.getMessage}}, recovering segment and rebuilding index files...")
                recoverSegment(segment)
        }
        
        addSegment(segment)
    }
}

The third step is handling the valid .swap files returned from the first step. This is done in the completeSwapOperations method:

// Iterate over all the valid .swap files
for (swapFile <- swapFiles) {
    val logFile = new File(CoreUtils.replaceSuffix(swapFile.getPath, SwapFileSuffix, "")) // Get the corresponding log file
    val baseOffset = offsetFromFile(logFile) // Get the starting offset of the log file
    
    // Create a LogSegment instance for the .swap file
    val swapSegment = LogSegment.open(
        swapFile.getParentFile,
        baseOffset = baseOffset,
        config,
        time = time,
        fileSuffix = SwapFileSuffix
    )
    
    info(s"Found log file ${swapFile.getPath} from interrupted swap operation, repairing.")
    
    // Recover the segment
    recoverSegment(swapSegment)
    
    // Check if the original segments still exist
    val oldSegments = logSegments(swapSegment.baseOffset, swapSegment.readNextOffset).filter { segment =>
        segment.readNextOffset > swapSegment.baseOffset
    }
    
    // Replace the old segments with the swap segment
    replaceSegments(Seq(swapSegment), oldSegments.toSeq, isRecoveredSwapFile = true)
}

Finally, the recoverLog method performs the fourth step:

// If there is no clean shutdown file, recover the log
if (!hasCleanShutdownFile) {
    val unflushed = logSegments(this.recoveryPoint, Long.MaxValue).toIterator
    
    var truncated = false
    
    // Iterate over the unflushed segments
    while (unflushed.hasNext && !truncated) {
        val segment = unflushed.next
        info(s"Recovering unflushed segment ${segment.baseOffset}")
        
        // Recover the segment
        val truncatedBytes =
            try {
                recoverSegment(segment, leaderEpochCache)
            } catch {
                case _: InvalidOffsetException =>
                    val startOffset = segment.baseOffset
                    warn("Found invalid offset during recovery. Deleting the corrupt segment and " +
                    s"creating an empty one with starting offset $startOffset")
                    segment.truncateTo(startOffset)
            }
        
        // If there are truncated bytes due to corrupt messages, delete the remaining segments
        if (truncatedBytes > 0) {
            warn(s"Corruption found in segment ${segment.baseOffset}, truncating to offset ${segment.readNextOffset}")
            removeAndDeleteSegments(unflushed.toList, asyncDelete = true)
            truncated = true
        }
    }
}

// If the logSegments collection is not empty
if (logSegments.nonEmpty) {
    val logEndOffset = activeSegment.readNextOffset
    
    // If the logEndOffset is smaller than logStartOffset, delete all the segments
    if (logEndOffset < logStartOffset) {
        warn(s"Deleting all segments because logEndOffset ($logEndOffset) is smaller than logStartOffset ($logStartOffset). " +
            "This could happen if segment files were deleted from the file system.")
        removeAndDeleteSegments(logSegments, asyncDelete = true)
    }
}

// If the logSegments collection is empty, create a new segment starting from logStartOffset
if (logSegments.isEmpty) {
    addSegment(LogSegment.open(
        dir = dir,
        baseOffset = logStartOffset,
        config,
        time = time,
        fileAlreadyExists = false,
        initFileSize = this.initFileSize,
        preallocate = config.preallocate
    ))
}

// Update the recoveryPoint and return it
recoveryPoint = activeSegment.readNextOffset
recoveryPoint

Summary #

Today, I mainly introduced the source code of Kafka’s Log, including:

  1. Source code structure of Log files: You can take a look at the following diagram, which shows the architecture of the Log class file. You should focus on understanding the Log class and its related methods.
  2. Mechanism for loading log segments: I analyzed in detail how logs are loaded during initialization based on the source code. As mentioned earlier, logs are containers for log segments, and understanding how log segments are loaded is a prerequisite for learning about log segment management in the following.

Overall, although I have written a few thousand words, I have only discussed the most important parts. I recommend that you read the code for loading log segments in Log.scala several times, as this will greatly help us understand the principles of log segment management in Kafka brokers. In the next lesson, I will continue to discuss the source code of the log section and guide you in learning common Kafka log operations.

After-class discussion #

There is a method called maybeIncrementHighWatermark in the Log source code. Can you explain its implementation principle?

Feel free to leave a message in the comment section to discuss and exchange ideas with me. You are also welcome to share today’s content with your friends.