04 On Indexes: The Application of an Improved Binary Search Algorithm in Kafka Indexes #

Hello, I am Hu Xi. Today, I’m going to teach you about the index objects in Kafka source code, as well as the application of the improved binary search algorithm in the index.

Why read the source code of the index? #

To be honest, you may not have many opportunities to directly interact with the index or index files in Kafka. The index is a mysterious component, and Kafka’s official documentation doesn’t talk much about it. You might wonder if it’s necessary to read the source code of the index object if it’s so rarely used. Actually, it is very necessary! Let me share a real example with you.

Once, when I used the DumpLogSegments class in Kafka to view the contents of the underlying log files and index files, I noticed a strange phenomenon - viewing the contents of the log files did not require sudo permission, but viewing the contents of the index files did require sudo permission, as shown below:

$ sudo ./kafka-run-class.sh kafka.tools.DumpLogSegments  --files ./00000000000000000000.index
Dumping 00000000000000000000.index
offset: 0 position: 0


$ ./kafka-run-class.sh kafka.tools.DumpLogSegments --files 00000000000000000000.index
Dumping 00000000000000000000.index
Exception in thread "main" java.io.FileNotFoundException: 00000000000000000000.index (Permission denied)
......

After reading the index source code, I realized that Kafka opened the index files in “rw” mode. Reading a file requires only read permission, not write permission, so the open failed for a user who could read the file but not write to it. This is clearly a bug in Kafka. You see, by reading the source code, I found the root cause of the problem and even fixed a bug in Kafka (KAFKA-5104).
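To make the difference between the two open modes concrete, here is a minimal sketch in Scala. It is not Kafka code: OpenModeSketch, canOpen and readOnlyTempFile are hypothetical names, and the sketch only assumes the standard java.io.RandomAccessFile behaviour of throwing FileNotFoundException when the requested access is denied.

```scala
import java.io.{File, IOException, RandomAccessFile}

// Hypothetical helper for illustration; this is not Kafka code.
object OpenModeSketch {
  // Returns true if `file` can be opened in the given RandomAccessFile mode ("r" or "rw").
  def canOpen(file: File, mode: String): Boolean =
    try {
      new RandomAccessFile(file, mode).close()
      true
    } catch {
      // "Permission denied" surfaces as FileNotFoundException, which is an IOException
      case _: IOException => false
    }

  // Creates a temp file and removes the write permission on it.
  def readOnlyTempFile(): File = {
    val file = File.createTempFile("open-mode-sketch", ".index")
    file.deleteOnExit()
    file.setWritable(false)
    file
  }

  def main(args: Array[String]): Unit = {
    val file = readOnlyTempFile()
    println(s"mode r:  ${canOpen(file, "r")}")
    // Usually false for an ordinary user; a privileged user may still be allowed to write.
    println(s"mode rw: ${canOpen(file, "rw")}")
  }
}
```

Opening in "r" mode is enough for a dump tool that only reads the file, which is the idea behind the fix described above.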

Besides helping us solve practical problems, the source code of the index component has another highlight: it applies the well-known binary search algorithm to quickly locate index entries. When it comes to algorithms, I have always found it a pity that we focus so much on the algorithms themselves and neglect how they are applied in practice.

For example, we have learned many sorting algorithms, but ordinary application developers rarely get to use them directly in their own programming tasks. When you need to sort a collection, you might only remember to call the Collections.sort method, without being sure which sorting algorithm it uses under the hood.

What makes the index component in Kafka special is that it applies the binary search algorithm, and the community has further improved it to suit Kafka’s own characteristics. Isn’t that worth learning about? Let’s get started.

Index Class Diagram and Source File Organization #

In the Kafka source code, there are 5 source code files related to indexes, all located in the /src/main/scala/kafka/log directory of the core package. Let’s take a look at each one of them:

  • AbstractIndex.scala: This file defines the top-level abstract class that encapsulates common operations for all index types.
  • LazyIndex.scala: This file defines a wrapper class on top of AbstractIndex that implements lazy loading of index entries. This class is mainly for performance improvement.
  • OffsetIndex.scala: This file defines the offset index, which stores the mapping between offset values and the corresponding file disk locations.
  • TimeIndex.scala: This file defines the timestamp index, which stores the mapping between timestamps and offset values.
  • TransactionIndex.scala: This file defines the transaction index, which stores important metadata for aborted transactions. This index only appears when Kafka transactions are enabled.

The relationship between these classes is shown in the diagram below:

Among them, OffsetIndex, TimeIndex, and TransactionIndex all inherit from the AbstractIndex class, while the higher-level LazyIndex simply wraps an implementation class of AbstractIndex for lazy loading. As I mentioned before, the purpose of LazyIndex is to improve performance and does not have any functional improvements.

So today, I will first talk about the code of the abstract parent class AbstractIndex. In the next lesson, I will share with you the specific implementation classes of indexes.

AbstractIndex code structure #

Let’s first take a look at the class definition of AbstractIndex:

abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1, val writable: Boolean) extends Closeable {
    ......
} 

AbstractIndex defines 4 attribute fields. As an abstract base class, all its subclasses automatically inherit these 4 fields. In other words, all types of Kafka index objects define these attributes. Let me explain the meaning of these attributes to you.

  1. The index file (file). Each index object corresponds to an index file on disk. You may have noticed that this field is declared with var, which means that it can be reassigned. Can the index object dynamically change the underlying index file? Yes. Since version 1.1.0, Kafka allows migration of the underlying log path, so it is natural for the index file to be replaceable.

  2. The base offset (baseOffset). This is the starting offset of the log segment that the index object belongs to. If you look at the Kafka log path, you will find that log files and index files appear in groups. For example, if the log file is 00000000000000000123.log, there is normally a corresponding group of index files: 00000000000000000123.index, 00000000000000000123.timeindex, etc. The “123” here is the starting offset of this group of files, which is the baseOffset value.

  3. The maximum number of bytes for the index file (maxIndexSize). It controls the maximum length of the index file. The value passed into this parameter in the Kafka source code is the value of the broker-side parameter log.index.size.max.bytes (segment.index.bytes at the topic level), which defaults to 10MB. This is why, by default, a newly created Kafka index file has a size of 10MB.

  4. The open mode of the index file (writable). “True” indicates that the file is opened in “read-write” mode, and “False” indicates that it is opened in “read-only” mode. If I remember correctly, I added this parameter to fix the bug I just mentioned.
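The relationship between baseOffset and the file names can be sketched as follows. This is only an illustration: IndexFileNameSketch, fileName and baseOffsetOf are hypothetical names, and the zero-padding to 20 digits is an assumption read off the example file names above.

```scala
import java.util.Locale

// Hypothetical helper for illustration; the 20-digit padding is inferred from the file names above.
object IndexFileNameSketch {
  // Builds a file name such as <zero-padded base offset>.index or <zero-padded base offset>.timeindex
  def fileName(baseOffset: Long, suffix: String): String =
    String.format(Locale.ROOT, "%020d%s", Long.box(baseOffset), suffix)

  // Recovers the base offset from a file name by parsing the digits before the first dot
  def baseOffsetOf(fileName: String): Long =
    fileName.substring(0, fileName.indexOf('.')).toLong

  def main(args: Array[String]): Unit = {
    println(fileName(123L, ".index"))
    println(fileName(123L, ".timeindex"))
    println(baseOffsetOf(fileName(123L, ".log")))
  }
}
```

Because the base offset is recoverable from the name alone, all the files of one log segment can be grouped without opening any of them.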

AbstractIndex is an abstract indexing object class. It can be said that it serves as a container for the index entries, and each subclass that inherits it is responsible for defining the specific structure of the index entries. For example, the index entries for OffsetIndex are pairs of <offset value, physical disk position>, and the index entries for TimeIndex are pairs of <timestamp, offset value>. Based on this design philosophy, the AbstractIndex class defines an abstract method entrySize to represent the size of different index entries, as shown below:

protected def entrySize: Int

When implementing this method in subclasses, you need to provide the size of your own index entries. For OffsetIndex, the value is 8; for TimeIndex, the value is 12, as shown below:

// OffsetIndex
override def entrySize = 8
// TimeIndex
override def entrySize = 12

Now you may ask, why 8 and 12? Let me explain.

In OffsetIndex, the offset value is represented by 4 bytes, and the physical disk position is also represented by 4 bytes, so the total is 8 bytes. You may say that the offset value is a long integer, so it should be 8 bytes, right?

Do you still remember that AbstractIndex already saved the baseOffset? The offset value here is actually the offset relative to baseOffset, that is, the real offset value minus the value of baseOffset. In the next lesson, I will focus on this and explain it to you. For now, you only need to know that using the relative offset value can effectively save disk space. The broker-side parameter log.segment.bytes is a 32-bit signed integer, which means that the size of each log segment file in Kafka will not exceed 2^31 - 1 bytes, which is about 2GB. Since every message occupies at least one byte, the difference between the offset values on the same log segment file and baseOffset will stay within the range of an integer. Therefore, the source code only needs 4 bytes to save it.

Similarly, in TimeIndex, the timestamp type is a long integer, which takes up 8 bytes, and the offset is still represented by the relative offset value, taking up 4 bytes. Therefore, a total of 12 bytes are required.
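The two layouts can be sketched with a plain ByteBuffer. This is a simplified illustration under the sizes stated above, not the Kafka implementation: EntryLayoutSketch and its methods are hypothetical names.

```scala
import java.nio.ByteBuffer

// Hypothetical sketch of the two entry layouts described above; not Kafka code.
object EntryLayoutSketch {
  val OffsetEntrySize = 8 // 4-byte relative offset + 4-byte physical position
  val TimeEntrySize = 12  // 8-byte timestamp + 4-byte relative offset

  // Converts an absolute offset into a relative one, failing if it does not fit in 4 bytes.
  def relativeOffset(baseOffset: Long, offset: Long): Int = {
    val delta = offset - baseOffset
    require(delta >= 0 && delta <= Int.MaxValue, s"offset $offset is out of range for base offset $baseOffset")
    delta.toInt
  }

  // Encodes one offset index entry: <relative offset, physical position>
  def offsetEntry(baseOffset: Long, offset: Long, position: Int): ByteBuffer = {
    val buffer = ByteBuffer.allocate(OffsetEntrySize)
    buffer.putInt(relativeOffset(baseOffset, offset))
    buffer.putInt(position)
    buffer.flip()
    buffer
  }

  // Encodes one time index entry: <timestamp, relative offset>
  def timeEntry(baseOffset: Long, timestamp: Long, offset: Long): ByteBuffer = {
    val buffer = ByteBuffer.allocate(TimeEntrySize)
    buffer.putLong(timestamp)
    buffer.putInt(relativeOffset(baseOffset, offset))
    buffer.flip()
    buffer
  }
}
```

With baseOffset = 123, the absolute offset 130 is stored as the 4-byte value 7, which is where the space saving comes from.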

If someone asks you about the underlying implementation of the index in Kafka, you can confidently tell them: memory-mapped files, specifically Java’s MappedByteBuffer.

The main advantage of memory-mapped files is high I/O performance, especially for small files like indexes. Because the file contents are mapped directly into a range of virtual memory, accessing a memory-mapped file is faster than ordinary read and write operations.

In addition, in many operating systems (such as Linux), this mapped memory region is actually the kernel’s page cache. This means that the data inside does not need to be copied to user space repeatedly, avoiding unnecessary time and space consumption.
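If you have never used a memory-mapped file from the JVM, the following minimal sketch may help. It only uses the standard RandomAccessFile and FileChannel.map APIs; MmapSketch and writeAndReadBack are hypothetical names, and the 8-byte temp file merely stands in for an index file.

```scala
import java.io.{File, RandomAccessFile}
import java.nio.channels.FileChannel

// Hypothetical sketch of mapping a small file into memory; not Kafka code.
object MmapSketch {
  // Maps an 8-byte temp file, writes two Ints through the mapping and reads them back.
  def writeAndReadBack(first: Int, second: Int): (Int, Int) = {
    val file = File.createTempFile("mmap-sketch", ".index")
    file.deleteOnExit()
    val raf = new RandomAccessFile(file, "rw")
    try {
      raf.setLength(8)
      // The returned MappedByteBuffer stays valid even after the file handle is closed
      val mmap = raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, 8)
      mmap.putInt(first)  // relative put: advances the buffer position by 4
      mmap.putInt(second)
      (mmap.getInt(0), mmap.getInt(4)) // absolute gets: do not move the position
    } finally raf.close()
  }

  def main(args: Array[String]): Unit =
    println(writeAndReadBack(7, 4096))
}
```

Notice that reads and writes are ordinary buffer operations. There is no explicit read or write system call in the code path.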

In AbstractIndex, this MappedByteBuffer is stored in a variable named mmap. Next, I will use comments to take you deep into the main process of mmap:

@volatile
protected var mmap: MappedByteBuffer = {
  // Step 1: Create the index file
  val newlyCreated = file.createNewFile()
  // Step 2: Open the index file in the mode specified by `writable` (read-write or read-only)
  val raf = if (writable) new RandomAccessFile(file, "rw") else new RandomAccessFile(file, "r")
  try {
    if (newlyCreated) {
      // The predefined index file size cannot be too small. If it cannot even store one index entry, throw an exception directly.
      if (maxIndexSize < entrySize)
        throw new IllegalArgumentException("Invalid max index size: " + maxIndexSize)
      // Step 3: Set the length of the index file. roundDownToExactMultiple calculates the largest integer multiple of entrySize that is not greater than maxIndexSize.
      // For example, if maxIndexSize = 1234567 and entrySize = 8, the adjusted file length will be 1234560.
      raf.setLength(roundDownToExactMultiple(maxIndexSize, entrySize))
    }

    // Step 4: Update the length field of the index.
    _length = raf.length()
    // Step 5: Create a MappedByteBuffer object.
    val idx = {
      if (writable)
        raf.getChannel.map(FileChannel.MapMode.READ_WRITE, 0, _length)
      else
        raf.getChannel.map(FileChannel.MapMode.READ_ONLY, 0, _length)
    }
    /* set the position in the index for the next entry */
    // Step 6: If it is a newly created index file, set the current position of the MappedByteBuffer object to 0.
    // If the index file already exists, set the current position of the MappedByteBuffer object to the position right after the last index entry.
    if (newlyCreated)
      idx.position(0)
    else
      idx.position(roundDownToExactMultiple(idx.limit(), entrySize))
    // Step 7: Return the created MappedByteBuffer object.
    idx
  } finally {
    CoreUtils.swallow(raf.close(), AbstractIndex) // Close the opened index file handle.
  }
}

The main purpose of this code is to create a mmap object. You should know that most of the operations in AbstractIndex are related to mmap.
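The rounding used in Step 3 and Step 6 is plain integer arithmetic. Here is a minimal sketch of it, based only on the behaviour described in the comments above. RoundDownSketch is a hypothetical wrapper name.

```scala
// Hypothetical wrapper for illustration; the arithmetic follows the comment in Step 3.
object RoundDownSketch {
  // Largest multiple of `factor` that is not greater than `number` (integer division truncates)
  def roundDownToExactMultiple(number: Int, factor: Int): Int = factor * (number / factor)

  def main(args: Array[String]): Unit = {
    println(roundDownToExactMultiple(1234567, 8))   // 1234560, the example from Step 3
    println(roundDownToExactMultiple(10485760, 12)) // 10485756
  }
}
```

The second call shows why a 10MB limit does not give a TimeIndex file of exactly 10485760 bytes: that number is not a multiple of 12, so the last 4 bytes are dropped.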

Let me give you two simple examples.

Example 1: If we want to calculate how many index entries are currently in the index object, we only need to perform the following calculation:

protected var _entries: Int = mmap.position() / entrySize

Example 2: If we want to calculate the maximum number of index entries that the index file can hold, we only need to define the following variable:

private[this] var _maxEntries: Int = mmap.limit() / entrySize

Furthermore, with these two variables, we can easily write a method to determine whether the current index file is full:

def isFull: Boolean = _entries >= _maxEntries
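To see how these three values follow from the buffer state, here is a small sketch that uses a heap ByteBuffer in place of the real mmap. IndexCountersSketch and bufferWith are hypothetical names, and the 8-byte entry size is the OffsetIndex value from above.

```scala
import java.nio.ByteBuffer

// Hypothetical sketch: derives the counters from a buffer's position and limit; not Kafka code.
object IndexCountersSketch {
  def entries(mmap: ByteBuffer, entrySize: Int): Int = mmap.position() / entrySize
  def maxEntries(mmap: ByteBuffer, entrySize: Int): Int = mmap.limit() / entrySize
  def isFull(mmap: ByteBuffer, entrySize: Int): Boolean =
    entries(mmap, entrySize) >= maxEntries(mmap, entrySize)

  // Builds a buffer with room for `capacity` 8-byte entries, of which `written` are filled in.
  def bufferWith(capacity: Int, written: Int): ByteBuffer = {
    val buffer = ByteBuffer.allocate(capacity * 8)
    (0 until written).foreach { i =>
      buffer.putInt(i)       // relative offset
      buffer.putInt(i * 100) // physical position
    }
    buffer
  }

  def main(args: Array[String]): Unit = {
    val buffer = bufferWith(capacity = 3, written = 2)
    println(s"entries=${entries(buffer, 8)} maxEntries=${maxEntries(buffer, 8)} isFull=${isFull(buffer, 8)}")
  }
}
```

The position only moves when entries are written, so it doubles as the entry counter, while the limit is fixed by the file length chosen when the file was created.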

In summary, the mmap variable is the most important in AbstractIndex. In fact, the main logic of adding index entries is implemented in the subclasses of AbstractIndex, i.e., inserting the corresponding fields into mmap.

Writing Index Entries #

The following code is the append method of OffsetIndex, which is used to write new index entries to the index file.

def append(offset: Long, position: Int): Unit = {
  inLock(lock) {
    // Step 1: Check if the index file is not full
    require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
    // Step 2: Only allow writing index entries if one of the following conditions is met:
    // Condition 1: The index file is empty
    // Condition 2: The offset to be written is greater than the offsets of all existing index entries
    //              in order to maintain monotonically increasing offsets in the index
    if (_entries == 0 || offset > _lastOffset) {
      trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
      mmap.putInt(relativeOffset(offset)) // Step 3A: Write the relative offset value to mmap
      mmap.putInt(position) // Step 3B: Write the physical position information to mmap
      // Step 4: Update other metadata statistics, such as the current index entry counter (_entries)
      //         and the latest offset value (_lastOffset)
      _entries += 1
      _lastOffset = offset
      // Step 5: Perform validation. The format of the written index entry must meet the requirements,
      //         which means that the product of the number of index entries and the number of bytes
      //         per entry should match the current physical size of the file. Otherwise, it indicates
      //         that the file is corrupted.
      require(_entries * entrySize == mmap.position(), entries + " entries but file position in index is " + mmap.position() + ".")
    } else {
      // If none of the conditions in Step 2 are met, an exception is thrown and the index entry is not written
      throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
        s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
    }
  }
}
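If you want to experiment with this logic outside of Kafka, the following stripped-down sketch keeps only the two checks and the two writes. It is a simplification, not the real class: MiniOffsetIndex and countAccepted are hypothetical names, a heap ByteBuffer replaces mmap, and a plain IllegalArgumentException replaces InvalidOffsetException.

```scala
import java.nio.ByteBuffer

// Hypothetical, simplified stand-in for OffsetIndex.append; not Kafka code.
class MiniOffsetIndex(baseOffset: Long, maxEntries: Int) {
  private val entrySize = 8
  private val buffer = ByteBuffer.allocate(maxEntries * entrySize)
  private var _entries = 0
  private var _lastOffset = baseOffset

  def entries: Int = _entries
  def isFull: Boolean = _entries >= maxEntries

  def append(offset: Long, position: Int): Unit = synchronized {
    // A full index rejects every further append
    require(!isFull, s"Attempt to append to a full index (size = ${_entries}).")
    // Offsets must be strictly increasing, except for the very first entry
    if (_entries == 0 || offset > _lastOffset) {
      buffer.putInt((offset - baseOffset).toInt) // relative offset
      buffer.putInt(position)                    // physical position
      _entries += 1
      _lastOffset = offset
    } else {
      throw new IllegalArgumentException(s"Offset $offset is not larger than the last offset ${_lastOffset}.")
    }
  }
}

object MiniOffsetIndex {
  // Appends the offsets in order and counts how many of them were accepted.
  def countAccepted(baseOffset: Long, maxEntries: Int, offsets: Seq[Long]): Int = {
    val index = new MiniOffsetIndex(baseOffset, maxEntries)
    offsets.count { offset =>
      try { index.append(offset, 0); true }
      catch { case _: IllegalArgumentException => false }
    }
  }
}
```

For example, appending the offsets 100, 105, 103 and 110 to an index with room for four entries accepts three of them, because 103 is not larger than 105.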

I’ve summarized the execution flow of the append method using this diagram:

Searching for Index Entries #

The logic for writing index entries is not complicated; the difficulty lies in searching for them. AbstractIndex defines an abstract method called parseEntry that reads the index entry stored in a given slot. Here is its signature:

protected def parseEntry(buffer: ByteBuffer, n: Int): IndexEntry

Here, the parameter n represents the nth index entry to search for in the given ByteBuffer (also referred to as the nth slot in Kafka). IndexEntry is an interface defined in the source code and it contains two methods: indexKey and indexValue, which return the key and value of the index, respectively.

The OffsetIndex implementation of parseEntry is as follows:

override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {
  OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}

OffsetPosition is an implementation class of IndexEntry. The key mentioned earlier refers to the offset value, while the value refers to the physical disk position. Therefore, you can see that the code calls relativeOffset(buffer, n) + baseOffset to compute the absolute offset value, and then calls physical(buffer, n) to compute the physical disk position. Finally, these values are encapsulated together as an independent index entry and returned.
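Given the 8-byte layout described earlier, decoding the nth slot only needs two absolute reads. The sketch below is my simplified version for illustration: ParseEntrySketch and sampleBuffer are hypothetical names, baseOffset is passed in explicitly, and a tuple stands in for OffsetPosition.

```scala
import java.nio.ByteBuffer

// Hypothetical sketch of decoding the nth 8-byte entry; not the Kafka source.
object ParseEntrySketch {
  val EntrySize = 8

  // The first 4 bytes of slot n hold the relative offset
  def relativeOffset(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * EntrySize)

  // The next 4 bytes of slot n hold the physical position
  def physical(buffer: ByteBuffer, n: Int): Int = buffer.getInt(n * EntrySize + 4)

  // Returns <absolute offset, physical position> for slot n
  def parseEntry(buffer: ByteBuffer, baseOffset: Long, n: Int): (Long, Int) =
    (baseOffset + relativeOffset(buffer, n), physical(buffer, n))

  // Three sample entries: relative offsets 0, 7, 19 at positions 0, 4096, 8192
  def sampleBuffer(): ByteBuffer = {
    val buffer = ByteBuffer.allocate(3 * EntrySize)
    Seq((0, 0), (7, 4096), (19, 8192)).foreach { case (rel, pos) =>
      buffer.putInt(rel)
      buffer.putInt(pos)
    }
    buffer
  }

  def main(args: Array[String]): Unit =
    println(parseEntry(sampleBuffer(), 123L, 1)) // (130,4096)
}
```

Absolute reads such as getInt(index) do not change the buffer position, so a lookup never disturbs the write position used when appending.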

I recommend taking a look at the implementations of the relativeOffset and physical methods to see how they compute the relative offset value and the physical disk position information.

With the parseEntry method, we can read an index entry based on the given n. However, there is another problem that needs to be addressed: how do we determine which slot contains the index entry we want to find? Essentially, this is an algorithmic problem: how to quickly locate the desired number in an ordered set of numbers.

Binary Search Algorithm #

For finding a given number in a sorted array, the classic choice is binary search, which achieves a time complexity of O(logN). Kafka’s indexing component applies the binary search algorithm.

First, let me provide the original implementation code.

  private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
        // Step 1: If the current index is empty, return <-1,-1>
        if(_entries == 0)
          return (-1, -1)

        // Step 2: The target offset value to be searched should not be smaller than the current minimum offset value
        if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
          return (-1, 0)

        // binary search for the entry
        // Step 3: Perform binary search algorithm
        var lo = 0
        var hi = _entries - 1
        while(lo < hi) {
          val mid = ceil(hi/2.0 + lo/2.0).toInt
          val found = parseEntry(idx, mid)
          val compareResult = compareIndexEntry(found, target, searchEntity)
          if(compareResult > 0)
            hi = mid - 1
          else if(compareResult < 0)
            lo = mid
          else
            return (mid, mid)
        }

        (lo, if (lo == _entries - 1) -1 else lo + 1)
  }

The core of this code lies in Step 3, the binary search algorithm. If you are familiar with binary search, you will not find this code unfamiliar.
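To try the search on its own, here is a self-contained sketch that runs the same loop over a sorted array of keys instead of an mmap. PlainSlotSearch is a hypothetical name, and I use (lo + hi + 1) >>> 1 for the midpoint, which rounds up in the same way as the ceil expression above.

```scala
// Hypothetical sketch: the same search over a sorted Array[Long]; not the Kafka source.
object PlainSlotSearch {
  // Returns (slot of the largest key <= target, slot of the smallest key >= target); -1 means "none".
  def slotRangeFor(keys: Array[Long], target: Long): (Int, Int) = {
    if (keys.isEmpty) return (-1, -1)
    // The target is smaller than every key
    if (keys(0) > target) return (-1, 0)

    var lo = 0
    var hi = keys.length - 1
    while (lo < hi) {
      val mid = (lo + hi + 1) >>> 1 // round up so that `lo = mid` always makes progress
      val found = keys(mid)
      if (found > target) hi = mid - 1
      else if (found < target) lo = mid
      else return (mid, mid)
    }
    (lo, if (lo == keys.length - 1) -1 else lo + 1)
  }

  def main(args: Array[String]): Unit = {
    val keys = Array(10L, 20L, 30L)
    println(slotRangeFor(keys, 25L)) // (1,2): 25 lies between slot 1 and slot 2
    println(slotRangeFor(keys, 30L)) // (2,2): exact match
    println(slotRangeFor(keys, 35L)) // (2,-1): larger than every key
  }
}
```

The method returns a pair rather than a single slot because a lookup usually wants the largest entry that is not greater than the target, not only an exact match.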

At this point, everything seems perfect: Kafka indexes use binary search to quickly locate the slot of the index entry to be searched, and then call parseEntry to read that entry. However, is this really an impeccable solution?

Improved Binary Search Algorithm #

Clearly not! As I mentioned before, most operating systems use page cache to implement memory mapping, and currently almost all operating systems use the LRU (Least Recently Used) or LRU-like mechanisms to manage the page cache.

Kafka writes to the index file by appending at the end of the file, and almost all index queries are concentrated at the end of the index. In this case, the LRU mechanism is very suitable for Kafka’s index access scenario.

However, there is a problem here. When Kafka queries the index, the original binary search algorithm does not consider the issue of caching, which may result in unnecessary page faults. At this time, the Kafka thread will be blocked, waiting for the corresponding index entry to be read from the physical disk and placed in the page cache.

Let me give an example to illustrate this situation. Suppose a Kafka index occupies 13 pages in the operating system’s page cache. If the offset value to be searched is on the last page, which is Page 12, the standard binary search algorithm will sequentially read pages 0, 6, 9, 11, and 12, as shown in the following diagram:

Usually, a page can hold hundreds or thousands of index entries. As the index file is continuously written to, Page #12 keeps being filled with new index entries. If the index queries come from in-sync replicas (ISR) or from consumers with a small lag, most of these queries will land on Page #12. Therefore, Pages #0, 6, 9, 11, and 12 are accessed frequently by the code and are likely to stay in the page cache. Later, when Page #12 is full, a new page, Page #13, is allocated to hold the new index entries.

Now, the latest index entries are stored in Page #13. If you want to find the latest index entry, the original binary search algorithm will sequentially access Pages #0, 7, 10, 12, and 13. Here comes the problem: Pages #7 and #10 have not been accessed for a long time, so they are most likely no longer in the page cache. Therefore, as soon as a lookup targets Page #13, page faults occur, and the thread has to wait for the cold pages to be loaded from disk into the page cache. According to tests reported by users, this loading process can take up to 1 second.

Obviously, this is a common problem, that is, whenever the number of pages occupied by the index file changes, the search path of the binary search must be forcibly changed, resulting in the need to load cold data that is not in the page cache into the page cache. This loading process is very time-consuming.
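You can reproduce the two page sequences from the example with a few lines of code. The sketch below works at page granularity, which is a simplification: the real search compares index entries, not pages. PagePathSketch and pagesVisited are hypothetical names, and the midpoint rounds up as in the search code shown earlier.

```scala
import scala.collection.mutable.ArrayBuffer

// Hypothetical sketch: which pages a search for the last page touches; not Kafka code.
object PagePathSketch {
  // Pages are numbered 0 until pageCount, and the target is always the last page.
  def pagesVisited(pageCount: Int): List[Int] = {
    require(pageCount >= 1, "the index must occupy at least one page")
    val target = pageCount - 1
    val visited = ArrayBuffer(0) // the initial check against the first entry touches Page 0
    var lo = 0
    var hi = pageCount - 1
    var done = false
    while (lo < hi && !done) {
      val mid = (lo + hi + 1) >>> 1
      visited += mid
      if (mid > target) hi = mid - 1
      else if (mid < target) lo = mid
      else done = true
    }
    visited.toList
  }

  def main(args: Array[String]): Unit = {
    println(pagesVisited(13)) // List(0, 6, 9, 11, 12)
    println(pagesVisited(14)) // List(0, 7, 10, 12, 13)
  }
}
```

Growing the index by a single page replaces three of the five pages on the search path, and that is exactly why the previously warm pages stop helping.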

Based on this problem, the community has proposed an improved version of the binary search strategy, which is a cache-friendly search algorithm. The overall idea is to divide all index entries into two parts, the warm (hot) area and the cold area, and then run the binary search only within the area that contains the target, as shown in the following diagram:

At first glance, this algorithm does not have any significant improvements. It simply divides the search area into hot and cold areas, and then conditionally performs ordinary binary search algorithms in different areas. In fact, this improved algorithm provides an important guarantee: It guarantees that the combination of pages that need to be frequently accessed is fixed.

Think about the example I just mentioned. For querying the same hot data, once the index occupies more pages, the combination of pages to traverse will change. This is the main reason for the performance degradation.

The biggest advantage of this improved algorithm is that the pages that need to be traversed for querying the hot data are always fixed, so they are most likely in the page cache, avoiding meaningless page faults.

Now let’s look at the actual code. I have explained the implementation logic of the improved algorithm in comments. Once you understand the principle of dividing the cold and hot areas, the rest is not difficult.

   private def indexSlotRangeFor(idx: ByteBuffer, target: Long, searchEntity: IndexSearchEntity): (Int, Int) = {
        // Step 1: If the index is empty, return <-1,-1> pair directly
        if(_entries == 0)
          return (-1, -1)


        // Step 2: Encapsulate the original binary search algorithm in a local method
        def binarySearch(begin: Int, end: Int) : (Int, Int) = {
          // binary search for the entry
          var lo = begin
          var hi = end
          while(lo < hi) {
            val mid = (lo + hi + 1) >>> 1
            val found = parseEntry(idx, mid)
            val compareResult = compareIndexEntry(found, target, searchEntity)
            if(compareResult > 0)
              hi = mid - 1
            else if(compareResult < 0)
              lo = mid
            else
              return (mid, mid)
          }
          (lo, if (lo == _entries - 1) -1 else lo + 1)
        }


        // Step 3: Determine which slot holds the first hot entry.
        // _warmEntries is the dividing line between the two areas: the warm area is currently fixed at 8192 bytes
        // If it is OffsetIndex, _warmEntries = 8192 / 8 = 1024 entries
        // If it is TimeIndex, _warmEntries = 8192 / 12 = 682 entries
        val firstHotEntry = Math.max(0, _entries - 1 - _warmEntries)
        // Step 4: Determine whether the target offset value is in the hot area or the cold area
        if(compareIndexEntry(parseEntry(idx, firstHotEntry), target, searchEntity) < 0) {
          return binarySearch(firstHotEntry, _entries - 1) // If it's in the hot area, search the hot area
        }


        // Step 5: Make sure the target offset value is not smaller than the current minimum offset value
        if(compareIndexEntry(parseEntry(idx, 0), target, searchEntity) > 0)
          return (-1, 0)


        // Step 6: If it's in the cold area, search the cold area
        binarySearch(0, firstHotEntry)
   }
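As with the original version, it can help to run the improved search on a plain sorted array. The sketch below mirrors the steps above under simplifying assumptions: WarmSlotSearch and warmEntriesFor are hypothetical names, and the number of warm entries is passed in as a parameter so that small arrays can be used.

```scala
// Hypothetical sketch of the warm/cold search over a sorted Array[Long]; not the Kafka source.
object WarmSlotSearch {
  val WarmBytes = 8192

  // Number of entries in the warm area for a given entry size: 1024 for 8 bytes, 682 for 12 bytes
  def warmEntriesFor(entrySize: Int): Int = WarmBytes / entrySize

  def slotRangeFor(keys: Array[Long], target: Long, warmEntries: Int): (Int, Int) = {
    if (keys.isEmpty) return (-1, -1)

    // The ordinary binary search, restricted to the slots begin..end
    def binarySearch(begin: Int, end: Int): (Int, Int) = {
      var lo = begin
      var hi = end
      while (lo < hi) {
        val mid = (lo + hi + 1) >>> 1
        if (keys(mid) > target) hi = mid - 1
        else if (keys(mid) < target) lo = mid
        else return (mid, mid)
      }
      (lo, if (lo == keys.length - 1) -1 else lo + 1)
    }

    // Only the tail of the array is treated as warm
    val firstHotEntry = math.max(0, keys.length - 1 - warmEntries)
    // Targets beyond the first hot entry are searched in the warm area only
    if (keys(firstHotEntry) < target) return binarySearch(firstHotEntry, keys.length - 1)
    // The target is smaller than every key
    if (keys(0) > target) return (-1, 0)
    // Everything else is searched in the cold area
    binarySearch(0, firstHotEntry)
  }

  def main(args: Array[String]): Unit = {
    val keys = Array(10L, 20L, 30L, 40L, 50L)
    println(slotRangeFor(keys, 45L, warmEntries = 2)) // (3,4), found in the warm area
    println(slotRangeFor(keys, 20L, warmEntries = 2)) // (1,1), found in the cold area
  }
}
```

The results are the same as those of the plain binary search. What changes is which slots are read along the way: a lookup near the end of the index reads only slots from the warm tail, no matter how long the index grows.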

Summary #

Today, I took you through a detailed explanation of Kafka’s indexing mechanism, as well as how the community applies binary search algorithms to achieve fast indexing. There are two key points that you must remember.

  1. AbstractIndex: This is the abstract superclass for all types of indexes in Kafka. The mmap variable inside it is the core implementation of the indexing mechanism, and you must grasp it.
  2. Improved binary search algorithm: The community has made customized improvements to the standard binary search algorithm based on actual access scenarios. You need to pay special attention to the efforts made in improving cache performance. The improved version makes lookups of hot data hit the page cache more reliably, thereby reducing overall physical I/O and relieving system load bottlenecks. I suggest you look at the community’s work in this area from the perspective of how the index is actually accessed.

In fact, whether it is AbstractIndex or the binary search algorithm it uses, they are all common features of Kafka indexes, meaning that all Kafka indexes have these characteristics. So, do you want to know the differences between different types of indexes? For example, what are the similarities and differences between the offset index and the timestamp index? I will reveal the answers to these questions in the next lesson, so be sure not to miss it.

After-class Discussion #

Currently, the dividing line between the cold zone and the hot zone is set at 8192 bytes. Please explain why it is set to 8192, based on the source code comments and your own understanding.

Feel free to discuss and exchange ideas in the comments section. You are also welcome to share today’s content with your friends.