05 On Indexes: The Differences Between Offset Index and Timestamp Index #

Hello, I am Hu Xi. Today, let’s continue talking about indexing.

In the previous lesson, I focused on AbstractIndex, the abstract parent class of all indexes in Kafka's source code. I analyzed the general object structure of the AbstractIndex class and introduced the community's improved binary search algorithm as applied to Kafka indexes.

As mentioned before, there are three types of indexes in Kafka: Offset Index, Timestamp Index, and Aborted Transaction Index. The first two are far more commonly seen than the last. In Kafka's data directory, you must have seen many files with the .index and .timeindex extensions. Have you ever wondered what these files are for? Now I can tell you clearly: the .index file is Kafka's Offset Index file, while the .timeindex file is the Timestamp Index file.

So, what are the purposes of the Offset Index and Timestamp Index? What are the differences between them? Today, I will reveal the answers to these questions for you.

Offset Index #

When studying any type of index in Kafka, we need to pay attention to two questions:

  1. How are the entries in the index defined?
  2. How do we write new entries to the index?

You may be wondering, “Wait a minute, don’t we need to care about how to query the index?” Of course we do! In the previous lesson, didn’t we talk about the application of the binary search algorithm in the index? If you feel a bit rusty, you should review it quickly.

Now, let’s first look at the definition of the index entries.

Definition of Index Entries #

The offset index, also known as OffsetIndex, is a veteran component. If I remember correctly, Kafka started to be widely used in China in the 0.8 era, and OffsetIndex has been around ever since. Whenever a consumer needs to read messages starting from a specific position in a topic partition, Kafka uses the OffsetIndex to locate the physical file position directly, thereby avoiding the expensive I/O caused by reading the log from the beginning.

In the previous lesson, I mentioned that different index types store different <Key, Value> pairs. For the OffsetIndex, the key is the relative offset of the message, and the value is the physical file position of the first byte of that message in the log segment file.

Let me explain the meaning of the relative offset in detail. Do you remember the abstract method entrySize in the AbstractIndex class? It defines the number of bytes used by a single entry. For the OffsetIndex, the entrySize is 8, as defined in OffsetIndex.scala:

override def entrySize = 8

Why 8? The relative offset is an integer occupying 4 bytes, and the physical file position is also an integer, likewise occupying 4 bytes. Therefore, the total is 8 bytes.

What about the value of the relative offset? We know that a message offset in Kafka is a long integer, which occupies 8 bytes. However, Kafka applies an optimization when storing <Key, Value> pairs in the OffsetIndex. Each OffsetIndex object saves the starting offset (baseOffset) of the corresponding log segment when it is created. Therefore, an OffsetIndex entry does not need to store the complete 8-byte offset. Instead, it only stores the difference (delta) from the starting offset, which fits in a 4-byte integer. This design saves 4 bytes per OffsetIndex entry.

Let's take a simple example. Suppose an index file has 1000 index entries. Storing the relative offset saves 4 bytes per entry, or about 4 KB in total. Isn't that a cost-effective approach?
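To make the arithmetic concrete, here is a minimal sketch. The object and method names are hypothetical and do not exist in Kafka. The 10 MB figure is an assumption taken from the default value of the segment.index.bytes setting; your brokers may be configured differently.

```scala
object IndexEntrySavings {
  val FullOffsetBytes = 8     // a Long offset
  val RelativeOffsetBytes = 4 // an Int delta from the base offset

  // Bytes saved across `entries` index entries by storing the 4-byte delta
  // instead of the full 8-byte offset.
  def bytesSaved(entries: Long): Long =
    entries * (FullOffsetBytes - RelativeOffsetBytes)

  // Number of fixed-size entries that fit in an index file of `maxIndexBytes`.
  def maxEntries(maxIndexBytes: Long, entrySize: Int = 8): Long =
    maxIndexBytes / entrySize
}
```

With these definitions, IndexEntrySavings.bytesSaved(1000) is 4000 bytes, and a 10 MB index file (10485760 bytes) holds 1310720 entries of 8 bytes each, so a full index file of that size saves 5 MB.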

OffsetIndex defines a dedicated method to convert a long offset value into a relative offset, as shown below:

def relativeOffset(offset: Long): Int = {
    val relativeOffset = toRelative(offset)
    if (relativeOffset.isEmpty)
      // If the conversion fails (e.g., the difference exceeds the integer range),
      // an exception is thrown
      throw new IndexOffsetOverflowException(s"Integer overflow for offset: $offset (${file.getAbsoluteFile})")
    relativeOffset.get
}

The relativeOffset method calls the toRelative method of the base class to perform the actual conversion. Let’s take a look at the implementation of the toRelative method.

private def toRelative(offset: Long): Option[Int] = {
  val relativeOffset = offset - baseOffset
  if (relativeOffset < 0 || relativeOffset > Int.MaxValue)
    None
  else
    Some(relativeOffset.toInt)
}

The logic is simple. The first step is to calculate the difference between the given offset and baseOffset. The second step is to check whether this difference is negative or exceeds the integer range. If it is within range, the difference is returned as the relative offset; otherwise, None is returned to indicate that the conversion failed.
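To see the boundary cases, the same check can be tried outside of Kafka. This is a standalone sketch: the object name is hypothetical, and baseOffset is passed as a parameter here instead of being a field of the index object.

```scala
object RelativeOffsetSketch {
  // Same range check as toRelative, with baseOffset passed in explicitly.
  def toRelative(baseOffset: Long, offset: Long): Option[Int] = {
    val delta = offset - baseOffset
    if (delta < 0 || delta > Int.MaxValue) None
    else Some(delta.toInt)
  }
}
```

For a segment whose baseOffset is 1000, the offset 1005 converts to Some(5), while 999 (before the segment start) and 1000 + Int.MaxValue + 1 (beyond the integer range) both convert to None.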

Now, do you understand why the index entries in the OffsetIndex are 8 bytes and how the offset value is converted to a relative offset?

When reading the OffsetIndex, the source code also needs to restore the relative offset to the original full offset. This is implemented in the parseEntry method.

override protected def parseEntry(buffer: ByteBuffer, n: Int): OffsetPosition = {
  OffsetPosition(baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}

Let me explain the specific implementation method.

This method returns an OffsetPosition. This class has two methods, one returning the key of the index entry and the other returning its value.

The parseEntry method constructs the key and value that OffsetPosition requires. The key is the full offset of the index entry: the code uses baseOffset + relativeOffset(buffer, n) to restore the relative offset to the full offset. The value is the physical position, in the log segment file, of the message at that offset: the code calls the physical method to read it. Finally, parseEntry wraps the key and value in an OffsetPosition instance and returns it.

Because every index entry has the same size, the nth entry starts at byte n * entrySize of the index file. The code can therefore easily read the physical file position stored in the nth entry by combining entrySize with the buffer.getInt method. This is exactly what the physical method does.
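The fixed-size layout can be sketched with a plain ByteBuffer. This is an illustration under assumptions, not the Kafka source: the object name and the encode helper are hypothetical, and the byte positions (relative offset in the first 4 bytes of a slot, physical position in the last 4) are inferred from the entry definition described above.

```scala
import java.nio.ByteBuffer

object OffsetEntryLayoutSketch {
  val EntrySize = 8 // 4-byte relative offset + 4-byte physical position

  // Hypothetical helper: writes (relativeOffset, position) pairs back to back.
  def encode(entries: Seq[(Int, Int)]): ByteBuffer = {
    val buffer = ByteBuffer.allocate(entries.size * EntrySize)
    entries.foreach { case (rel, pos) =>
      buffer.putInt(rel)
      buffer.putInt(pos)
    }
    buffer
  }

  // First 4 bytes of slot n: the relative offset (absolute read, position untouched).
  def relativeOffset(buffer: ByteBuffer, n: Int): Int =
    buffer.getInt(n * EntrySize)

  // Last 4 bytes of slot n: the physical file position.
  def physical(buffer: ByteBuffer, n: Int): Int =
    buffer.getInt(n * EntrySize + 4)

  // Restores the full offset and pairs it with the physical position.
  def parseEntry(baseOffset: Long, buffer: ByteBuffer, n: Int): (Long, Int) =
    (baseOffset + relativeOffset(buffer, n), physical(buffer, n))
}
```

For example, with baseOffset 1000 and the encoded pairs (0, 0), (50, 4096), (120, 8192), parsing slot 1 yields the full offset 1050 and the position 4096.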

Writing Index Entries #

Now that we have this foundation, the following content becomes easy to understand. Let’s take a look at the implementation of the most important operation in OffsetIndex - the append method.

def append(offset: Long, position: Int): Unit = {
  inLock(lock) {
    // If the index file is already full, throw an exception
    require(!isFull, "Attempt to append to a full index (size = " + _entries + ").")
    // To maintain the monotonic increase property of the index, 
    // we need to ensure that the offset value to be written is greater than 
    // all existing offset values in the current index file
    if (_entries == 0 || offset > _lastOffset) {
      trace(s"Adding index entry $offset => $position to ${file.getAbsolutePath}")
      mmap.putInt(relativeOffset(offset)) // Write relative offset to mmap
      mmap.putInt(position) // Write physical file position to mmap
      _entries += 1 // Update the number of index items
      _lastOffset = offset // Update the largest offset value in the current index file
      // Ensure that the written index item format meets the requirements
      require(_entries * entrySize == mmap.position(), s"$entries entries but file position in index is ${mmap.position()}.")
    } else {
      throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to position $entries no larger than" +
        s" the last offset appended (${_lastOffset}) to ${file.getAbsolutePath}.")
    }
  }
}

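The append method takes two parameters: a Long offset and an Int physical file position. Its two most important steps are writing the relative offset and the physical file position to the mmap. The overall execution flow is:

  1. Check that the index file is not full; if it is, throw an exception.
  2. Check that the index is empty or that the offset to be written is larger than the last offset in the index; if not, throw InvalidOffsetException.
  3. Write the relative offset to the mmap.
  4. Write the physical file position to the mmap.
  5. Update the entry count and the last offset.
  6. Verify that the mmap position equals the entry count multiplied by entrySize.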
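To experiment with this write path without a broker, here is a simplified stand-in. Everything in it is hypothetical: MiniOffsetIndex is not a Kafka class, it uses a heap ByteBuffer instead of an mmap, it has no locking, and it throws IllegalArgumentException where Kafka uses its own exception types.

```scala
import java.nio.ByteBuffer

object OffsetAppendSketch {
  // Hypothetical, simplified stand-in for OffsetIndex.
  final class MiniOffsetIndex(val baseOffset: Long, maxEntries: Int) {
    private val entrySize = 8
    private val buffer = ByteBuffer.allocate(maxEntries * entrySize)
    private var _entries = 0
    private var _lastOffset = baseOffset

    def entries: Int = _entries
    def lastOffset: Long = _lastOffset
    def isFull: Boolean = _entries >= maxEntries

    def append(offset: Long, position: Int): Unit = {
      // Refuse to write into a full index
      require(!isFull, s"Attempt to append to a full index (size = ${_entries}).")
      // Keep offsets strictly increasing
      if (_entries != 0 && offset <= _lastOffset)
        throw new IllegalArgumentException(
          s"Offset $offset is not larger than the last offset ${_lastOffset}.")
      // Convert to a relative offset, rejecting values outside the Int range
      val delta = offset - baseOffset
      require(delta >= 0 && delta <= Int.MaxValue, s"Integer overflow for offset: $offset")
      buffer.putInt(delta.toInt) // relative offset
      buffer.putInt(position)    // physical file position
      _entries += 1
      _lastOffset = offset
      // The write position must always sit at the end of the last entry
      require(_entries * entrySize == buffer.position(), "Unexpected buffer position")
    }
  }
}
```

Appending (1000, 0) and then (1050, 4096) to an index with baseOffset 1000 succeeds, while appending 1050 a second time, or appending to an index that is already full, fails.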

In addition to append, there is another common operation on the index: truncation. Truncation directly cuts off part of the index file. For example, suppose the OffsetIndex file currently contains 100 index entries and I want to keep only the first 40. The source code defines the truncateToEntries method to implement this requirement:

private def truncateToEntries(entries: Int): Unit = {
  inLock(lock) {
    _entries = entries
    mmap.position(_entries * entrySize)
    _lastOffset = lastEntry.offset
    debug(s"Truncated index ${file.getAbsolutePath} to $entries entries;" +
      s" position is now ${mmap.position()} and last offset is now ${_lastOffset}")
  }
}

This method takes the entries parameter, which specifies how many index entries to keep, and its main logic is a call to the mmap's position method. The expression _entries * entrySize is the byte position that the mmap is truncated to. After that, _lastOffset is reset to the offset of the new last entry.
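The effect of moving the write position can be sketched with a heap ByteBuffer. The object below is hypothetical, and returning baseOffset for an empty index is an assumption about how the last entry is defined when no entries remain.

```scala
import java.nio.ByteBuffer

object TruncateSketch {
  val EntrySize = 8

  // Moves the write position back so that only the first `entries` slots stay valid.
  // Returns the full offset of the new last entry, or baseOffset when nothing is kept.
  def truncateToEntries(buffer: ByteBuffer, baseOffset: Long, entries: Int): Long = {
    buffer.position(entries * EntrySize)
    if (entries == 0) baseOffset
    else baseOffset + buffer.getInt((entries - 1) * EntrySize)
  }
}
```

Note that nothing is erased: the bytes after the new position are simply overwritten by later appends. Keeping 40 of 100 entries moves the position to byte 320.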

Now, let’s talk about how to use OffsetIndex.

Since OffsetIndex is used to quickly locate the physical file position of a message, it definitely needs to define a method to execute the corresponding query logic. This method is the lookup method.

def lookup(targetOffset: Long): OffsetPosition = {
  maybeLock(lock) {
    val idx = mmap.duplicate // Duplicate the entire index mapping area using a private variable
    // The `largestLowerBoundSlotFor` method uses an improved binary search algorithm to find the corresponding slot
    val slot = largestLowerBoundSlotFor(idx, targetOffset, IndexSearchType.KEY)
    // If not found, return an empty position where the physical file position starts from 0, indicating reading from the beginning of the log file
    // Otherwise, return the index item corresponding to the `slot`
    if(slot == -1)
      OffsetPosition(baseOffset, 0)
    else
      parseEntry(idx, slot)
  }
}

I have added the main logic as comments in the code. This method returns the maximum offset value not greater than the given targetOffset and its corresponding physical file position. You can roughly understand this method as the FLOOR function for offset values.
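The FLOOR behavior can be illustrated with an ordinary binary search over an in-memory sequence. This sketch deliberately uses the textbook algorithm rather than Kafka's improved version, and the object and helper names are hypothetical.

```scala
object FloorLookupSketch {
  final case class OffsetPosition(offset: Long, position: Int)

  // Plain binary search: index of the last entry whose offset is <= target, or -1.
  def largestLowerBoundSlot(entries: IndexedSeq[OffsetPosition], target: Long): Int = {
    var lo = -1
    var hi = entries.length - 1
    while (lo < hi) {
      val mid = lo + (hi - lo + 1) / 2 // round up so the loop always makes progress
      if (entries(mid).offset <= target) lo = mid else hi = mid - 1
    }
    lo
  }

  // Mirrors lookup: fall back to the start of the log segment when nothing qualifies.
  def lookup(baseOffset: Long, entries: IndexedSeq[OffsetPosition], target: Long): OffsetPosition = {
    val slot = largestLowerBoundSlot(entries, target)
    if (slot == -1) OffsetPosition(baseOffset, 0) else entries(slot)
  }
}
```

With entries at offsets 1000, 1050, and 1120, looking up 1100 returns the entry for 1050, looking up 1120 returns the entry for 1120 itself, and looking up an offset below 1000 falls back to OffsetPosition(baseOffset, 0).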

Timestamp Index #

After discussing OffsetIndex, let’s take a look at another major type of index: the Timestamp Index, also known as TimeIndex. Similar to OffsetIndex, we will focus on the definition of index entries in TimeIndex and how to write them.

Definition of Index Entries #

Unlike OffsetIndex, TimeIndex stores pairs of <timestamp, relative offset>. The timestamp is stored as a long integer (8 bytes), while the relative offset is stored as an integer (4 bytes). Therefore, each index entry in TimeIndex occupies 12 bytes. This reveals an important fact: for the same number of index entries, TimeIndex occupies more disk space than OffsetIndex.
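The 12-byte layout can be sketched in the same way as the 8-byte one. The names below are hypothetical, and the byte positions (timestamp in the first 8 bytes of a slot, relative offset in the last 4) are an assumption based on the order of the pair described above.

```scala
import java.nio.ByteBuffer

object TimeEntryLayoutSketch {
  val EntrySize = 12 // 8-byte timestamp + 4-byte relative offset

  // Hypothetical helper: writes (timestamp, fullOffset) pairs as 12-byte entries.
  def encode(baseOffset: Long, entries: Seq[(Long, Long)]): ByteBuffer = {
    val buffer = ByteBuffer.allocate(entries.size * EntrySize)
    entries.foreach { case (timestamp, offset) =>
      buffer.putLong(timestamp)
      buffer.putInt((offset - baseOffset).toInt) // assumes the delta fits in an Int
    }
    buffer
  }

  def timestamp(buffer: ByteBuffer, n: Int): Long =
    buffer.getLong(n * EntrySize)

  def relativeOffset(buffer: ByteBuffer, n: Int): Int =
    buffer.getInt(n * EntrySize + 8)

  // Restores the (timestamp, full offset) pair stored in slot n.
  def parseEntry(baseOffset: Long, buffer: ByteBuffer, n: Int): (Long, Long) =
    (timestamp(buffer, n), baseOffset + relativeOffset(buffer, n))
}
```

Two entries occupy 24 bytes here, compared with 16 bytes for two OffsetIndex entries.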

Writing Index Entries #

TimeIndex also has an append method, but it is called maybeAppend. Let’s take a look at its implementation logic.

def maybeAppend(timestamp: Long, offset: Long, skipFullCheck: Boolean = false): Unit = {
  inLock(lock) {
    if (!skipFullCheck)
      // Throw an exception if the index file is full
      require(!isFull, "Attempt to append to a full time index (size = " + _entries + ").")
    // Ensure that offsets never decrease (an equal offset is allowed)
    if (_entries != 0 && offset < lastEntry.offset)
      throw new InvalidOffsetException(s"Attempt to append an offset ($offset) to slot ${_entries} no larger than" +
        s" the last offset appended (${lastEntry.offset}) to ${file.getAbsolutePath}.")
    // Ensure that timestamps never decrease (an equal timestamp is allowed)
    if (_entries != 0 && timestamp < lastEntry.timestamp)
      throw new IllegalStateException(s"Attempt to append a timestamp ($timestamp) to slot ${_entries} no larger" +
        s" than the last timestamp appended (${lastEntry.timestamp}) to ${file.getAbsolutePath}.")

    // Write a new entry only when the timestamp is strictly greater than the last one written
    if (timestamp > lastEntry.timestamp) {
      trace(s"Adding index entry $timestamp => $offset to ${file.getAbsolutePath}.")
      mmap.putLong(timestamp) // Write the timestamp to mmap
      mmap.putInt(relativeOffset(offset)) // Write the relative offset to mmap
      _entries += 1 // Update the number of index entries
      _lastEntry = TimestampOffset(timestamp, offset) // Update the current latest index entry
      require(_entries * entrySize == mmap.position(), s"${_entries} entries but file position in index is ${mmap.position()}.")
    }
  }
}

Similar to OffsetIndex, the main logic of writing index entries in TimeIndex is to write the timestamp and relative offset to mmap. The difference is that TimeIndex, in addition to checking the monotonic increase of offset values, also ensures that the sequentially written timestamps are monotonically increasing.
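The three possible outcomes of maybeAppend (write, silently skip, reject) are easier to see in a small in-memory model. MiniTimeIndex below is hypothetical and not a Kafka class. It returns a Boolean only to make the skip case visible, and the sentinel timestamp of -1 for an empty index is an assumption.

```scala
import scala.collection.mutable.ArrayBuffer

object TimeAppendSketch {
  final case class TimestampOffset(timestamp: Long, offset: Long)

  // Hypothetical in-memory stand-in for TimeIndex.
  final class MiniTimeIndex(baseOffset: Long) {
    private val written = ArrayBuffer.empty[TimestampOffset]

    // Assumption: an empty index behaves as if its last entry were (-1, baseOffset).
    private def lastEntry: TimestampOffset =
      written.lastOption.getOrElse(TimestampOffset(-1L, baseOffset))

    def entries: List[TimestampOffset] = written.toList

    // Returns true if an entry was written, false if it was skipped.
    def maybeAppend(timestamp: Long, offset: Long): Boolean = {
      if (written.nonEmpty && offset < lastEntry.offset)
        throw new IllegalArgumentException(s"Offset $offset is smaller than ${lastEntry.offset}.")
      if (written.nonEmpty && timestamp < lastEntry.timestamp)
        throw new IllegalStateException(s"Timestamp $timestamp is smaller than ${lastEntry.timestamp}.")
      if (timestamp > lastEntry.timestamp) {
        written += TimestampOffset(timestamp, offset)
        true
      } else false // equal timestamp: nothing is written and no error is raised
    }
  }
}
```

In this model, a timestamp equal to the last one is skipped, while a smaller timestamp or a smaller offset is rejected with an exception.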

Speaking of this, I remember an idea I had when I read this code years ago. At that time, the code did not validate that timestamps were monotonically increasing, so I wondered what would happen if I wrote a stale timestamp, one older than the last entry, together with an offset into the TimeIndex file. After some hands-on work, I actually managed to do it.

Can you guess what happened next? It caused complete chaos in the consumer program. When the consumer filtered messages by timestamp, it read the stale timestamp and obtained the wrong offset, so it retrieved incorrect data.

As a result, I submitted a Jira to the community, but it was rejected - the reason being that writing stale timestamps to TimeIndex is not allowed. I'm telling you this amusing story just to illustrate that reading source code can spark inspirations or whimsical ideas that you would never have in everyday use. This can also be considered a major benefit of reading source code.

Differences #

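At this point, the lesson is coming to a close. To finish, here is a summary of the characteristics and differences of OffsetIndex and TimeIndex. I hope it helps you better understand and digest the key points of today's content.

  1. File extension: OffsetIndex uses .index; TimeIndex uses .timeindex.
  2. Index entry: OffsetIndex stores <relative offset, physical file position>; TimeIndex stores <timestamp, relative offset>.
  3. Entry size: 8 bytes for OffsetIndex; 12 bytes for TimeIndex.
  4. Write method: append for OffsetIndex; maybeAppend for TimeIndex.
  5. Write checks: OffsetIndex requires each new offset to be larger than the last one; TimeIndex rejects any offset or timestamp smaller than the last entry and writes only when the timestamp is strictly larger.
  6. Purpose: OffsetIndex maps an offset to a physical file position; TimeIndex maps a timestamp to an offset.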

Summary #

Today, I have analyzed the OffsetIndex and TimeIndex in detail, as well as their differences. Although OffsetIndex and TimeIndex are different types of indexes, Kafka combines them internally. The usual process is to first use the TimeIndex to find the message offset value that meets the timestamp requirement, and then use the OffsetIndex to locate the physical file location of that offset value. Therefore, they actually work together.
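The two-step cooperation can be sketched with two in-memory sequences standing in for the index files. All names here are hypothetical, the floor helper is a plain binary search, and the returned position should be read as a starting point for scanning the log segment rather than the exact location of the matching message.

```scala
object TwoStepLookupSketch {
  final case class TimestampOffset(timestamp: Long, offset: Long)
  final case class OffsetPosition(offset: Long, position: Int)

  // Floor search: the last element whose key is <= target, if any.
  private def floor[A](entries: IndexedSeq[A], target: Long)(key: A => Long): Option[A] = {
    var lo = -1
    var hi = entries.length - 1
    while (lo < hi) {
      val mid = lo + (hi - lo + 1) / 2
      if (key(entries(mid)) <= target) lo = mid else hi = mid - 1
    }
    if (lo == -1) None else Some(entries(lo))
  }

  def positionForTimestamp(
      baseOffset: Long,
      timeIndex: IndexedSeq[TimestampOffset],
      offsetIndex: IndexedSeq[OffsetPosition],
      targetTimestamp: Long): OffsetPosition = {
    // Step 1: timestamp -> offset, using the time index
    val startOffset = floor(timeIndex, targetTimestamp)(_.timestamp)
      .map(_.offset)
      .getOrElse(baseOffset)
    // Step 2: offset -> physical file position, using the offset index
    floor(offsetIndex, startOffset)(_.offset).getOrElse(OffsetPosition(baseOffset, 0))
  }
}
```

For a target timestamp that falls between two time index entries, the sketch first resolves the offset of the earlier entry and then finds the closest offset index entry at or before that offset.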

Finally, I would like to remind you: do not modify the index files in any way! I have encountered scenarios where users renamed index files on their own, resulting in Broker crashes and inability to start. In addition, although Kafka can rebuild indexes, deleting index files arbitrarily is still a very dangerous operation. In a production environment, I recommend that you try to avoid performing such operations.

After-class Discussion #

The lookup method in OffsetIndex implements an offset lookup similar to the FLOOR function. Could you write an offset lookup similar to the CEILING function, which returns the minimum offset not less than the given targetOffset and its corresponding physical file position?

Feel free to leave your comments in the message area and discuss with me. You are also welcome to share today’s content with your friends.