
09 Producer Message Partitioning Mechanism Analysis #

When we use Apache Kafka to produce and consume messages, we definitely want data to be evenly distributed across all servers. For example, many companies use Kafka to collect log data from application servers. This type of data is often plentiful, especially in cluster environments consisting of a large number of machines. The amount of logs generated per minute can be in the range of gigabytes. Therefore, how to evenly distribute such a large amount of data across various brokers in Kafka becomes a very important issue.

Today, I will talk to you about how Kafka producers achieve this requirement. I will analyze it using the Java API as an example, but the implementation logic in other languages is similar.

Why do we need partitions? #

If you are not familiar with the concept of Kafka partitions, you can review it in Chapter 2 of this column. As mentioned earlier in the column, Kafka has the concept of topics, which are logical containers that carry real data. Under each topic, there are multiple partitions. So the message organization in Kafka is actually structured in three levels: topic-partition-message. Each message under a topic is only saved in one specific partition and not duplicated across multiple partitions. The following diagram from the official website clearly illustrates Kafka’s three-level structure:

Now, let me pose a question for you to think about: why does Kafka have this design? Why use partitions instead of multiple topics?

The purpose of partitions is to provide load balancing, or, put another way, to partition the data so that the system can scale out. Different partitions can be placed on different nodes, and read/write operations are performed at the partition level, so each node can independently handle the read/write requests for the partitions placed on it. Moreover, we can increase the overall throughput of the system by adding new nodes.

As a matter of fact, the idea of partitioning, along with partitioned databases, dates back to around 1980. For example, Teradata, a database system from that era, introduced the concept of partitioning.

It is worth noting that different distributed systems have different terminologies for partitions. For example, in Kafka, they are called partitions, but in MongoDB and Elasticsearch, they are called shards. In HBase, they are called regions, and in Cassandra, they are called vnodes. Although their implementation details may differ on the surface, the overall concept of partitioning has remained unchanged.

In addition to providing the core functionality of load balancing, partitions can also be used to fulfill other business-level requirements. For example, they can be used to solve the problem of maintaining message order at a business level. I will share a specific case study on this topic today.

What are the partition strategies available? #

Now let’s talk about the partition strategies of Kafka producers. A partition strategy is the algorithm a producer uses to decide which partition each message is sent to. Kafka provides a default partition strategy and also supports custom strategies.

If you want to customize the partition strategy, you need to explicitly configure the parameter partitioner.class on the producer side. How do you set this parameter? The method is simple. When writing the producer program, you write a concrete class that implements the org.apache.kafka.clients.producer.Partitioner interface. This interface is simple and only defines two methods, partition() and close(), of which you usually only need to implement the most important one, partition(). Let’s take a look at its signature:

int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster);

Here, topic, key, keyBytes, value, and valueBytes are all message data, and cluster is cluster metadata (such as the number of topics and brokers in the current Kafka cluster). Kafka hands you all of this information so that you can make full use of it when deciding which partition a message should go to. As long as your implementation class defines the partition method and you set the partitioner.class parameter to the fully qualified name of that class, the producer program will partition messages according to your code logic. While there can be countless possible partition strategies, several common ones are worth explaining in detail below.
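As a rough sketch of what this looks like in code (the class name com.example.MyPartitioner and the "always partition 0" logic are placeholders of my own, not a recommended strategy), a custom partitioner and its registration might be written like this:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

public class MyPartitioner implements Partitioner {

    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        // Decide the target partition here, e.g. based on the key or the cluster metadata.
        return 0; // placeholder logic: always send to partition 0
    }

    @Override
    public void close() {
        // Release any resources the partitioner holds (often left empty).
    }

    @Override
    public void configure(Map<String, ?> configs) {
        // Inherited from Configurable; read custom settings here if you need any.
    }
}

// On the producer side, point partitioner.class at the fully qualified name of your class:
Properties props = new Properties();
props.put("partitioner.class", "com.example.MyPartitioner");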

Round-robin Strategy

The round-robin strategy is straightforward: if a topic has 3 partitions, the first message is sent to partition 0, the second to partition 1, the third to partition 2, and so on; the 4th message starts over at partition 0. This is shown in the following diagram:

The round-robin strategy is the default partition strategy provided by the Kafka Java producer API. If you do not specify the partitioner.class parameter, your producer program will “place” messages across all partitions of the topic in a round-robin manner.

The round-robin strategy has excellent load-balancing behavior: it always ensures that messages are distributed as evenly as possible across all partitions. It is therefore the most sensible default partition strategy, and also one of the most commonly used.
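For comparison with the snippets shown later, a round-robin version of the partition method could be sketched roughly as follows; the AtomicInteger counter field is my own illustration, not Kafka's internal implementation:

// counter is a field of the partitioner class, e.g. private final AtomicInteger counter = new AtomicInteger(0);
List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
// Cycle through the partitions one by one; Math.floorMod keeps the result non-negative even if the counter overflows.
return Math.floorMod(counter.getAndIncrement(), partitions.size());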

Random Strategy

With the random strategy, we place each message on an arbitrary partition chosen at random, as shown in the following diagram:

Implementing the random strategy in the partition method is simple; it takes only two lines of code:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return ThreadLocalRandom.current().nextInt(partitions.size());

First, get the total number of partitions for the topic, then return a random non-negative integer smaller than that total.

Essentially, the random strategy also aims to spread data evenly across partitions, but in practice it does so less evenly than the round-robin strategy. So if your goal is evenly distributed data, the round-robin strategy is the better choice. In fact, the random strategy was the default in older versions of the producer, but newer versions have changed the default to round-robin.

Key-Ordering Strategy

Also known as the key-ordering strategy. The term is a bit awkward because I made it up myself; you won’t find it on the Kafka official website. Kafka allows you to define a message key for each message, called simply the Key. This Key plays a very important role: it can be a string with clear business meaning, such as a customer code, a department number, or a business ID, or it can represent message metadata. In particular, back before Kafka supported timestamps, engineers in some scenarios would encapsulate the message creation time directly into the Key. Once messages are defined with Keys, you can guarantee that all messages with the same Key go into the same partition. Since messages within each partition are processed in order, this strategy is called the Message Key Order Strategy, as shown in the following figure.

The partition method for implementing this strategy is also simple, requiring only the following two lines of code:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return Math.abs(key.hashCode()) % partitions.size();

The default partition strategy of Kafka actually implements two strategies at the same time: if a Key is specified, the default implementation uses the Message Key Order Strategy; if no Key is specified, the round-robin strategy is used.
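If you want to see this dual behavior in action, simply attach a Key when constructing the record. In the sketch below, the topic name, keys, and bootstrap address are placeholders I made up for illustration:

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Records sharing the Key "order-1001" are hashed to the same partition, so they stay in production order.
producer.send(new ProducerRecord<>("my-topic", "order-1001", "created"));
producer.send(new ProducerRecord<>("my-topic", "order-1001", "paid"));
// A record without a Key falls back to the round-robin strategy.
producer.send(new ProducerRecord<>("my-topic", "some other message"));
producer.close();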

After you learn about Kafka’s default partition strategy, let me tell you a real case that will hopefully enhance your understanding of the importance of partition strategy.

I once conducted Kafka training for a state-owned enterprise, and one problem they faced was how to guarantee message ordering. The messages this company sent had causal relationships, so preserving their order was crucial; otherwise, processing the “effect” before the “cause” would inevitably lead to business confusion.

At the time, the company’s approach was to create the Kafka topic with a single partition, so all messages were read and written through that one partition, which guaranteed global ordering. Although this preserved the causal order, it also gave up the high throughput and load-balancing advantages that multiple partitions provide.

Later, after investigation and research, I found that these messages with a causal relationship had certain characteristics, such as a fixed flag embedded in the message body. So, I suggested that they set up a dedicated partition strategy for this flag to ensure that all messages with the same flag were sent to the same partition. This way, not only the message order within each partition could be guaranteed, but the performance benefits of multiple partitions could also be enjoyed.

This partition strategy based on individual fields is essentially the idea of message key ordering. A more appropriate approach is to extract the flag data and put it into the Key, which better aligns with Kafka’s design philosophy. After the transformation, the message processing throughput of this company increased by more than 40 times. From this case, you can also see the effect of custom partition strategies.

Other Partition Strategies

The partition strategies mentioned above are relatively basic strategies. Besides these, can you think of any partition strategies with practical uses? In fact, there is another relatively common one, called a geolocation-based partition strategy. Of course, this strategy is generally only applicable to large-scale Kafka clusters, especially clusters that span cities, countries, or even continents.

Let me give you an example using “Geektime.” Let’s assume that all of Geektime’s services are deployed in a data center in Beijing (here, I assume it is a self-built data center without considering public cloud solutions. Actually, even with public cloud, the implementation logic is similar). Now, Geektime plans to set up another data center in a southern city, such as Guangzhou, in addition to the Beijing data center. Furthermore, a portion of servers will be selected from both data centers to form a large Kafka cluster. Obviously, there will be a portion of servers in Beijing and another portion in Guangzhou in this cluster.

Let’s assume that Geektime plans to provide a registration gift to each new registered user. For example, when a user in the south registers on Geektime, they can get a free bowl of “sweet tofu pudding,” while a newly registered user in the north can get a bowl of “salty tofu pudding.” If we use Kafka to implement this, it’s simple. Just create a topic with two partitions and then create two consumer programs to process the registration logic for users in the north and south, respectively.

But the problem is that you need to send the registration messages of northern and southern users to the correct data centers, because the consumer program that processes them only runs in one of them. In other words, the consumer program that hands out sweet tofu pudding runs only in the Guangzhou data center, while the one that hands out salty tofu pudding runs only in the Beijing data center. If you send a northern user’s registration message to a broker in the Guangzhou data center, that user will not receive the correct gift!

At this time, we can implement a customized partition strategy based on the IP addresses of the brokers. For example, consider the following code:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream().filter(p -> isSouth(p.leader().host())).map(PartitionInfo::partition).findAny().get();

Out of all the partitions of the topic, we find those whose leader replicas are located in the south, then pick one of them at random and send the message there.
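One caveat: findAny().get() throws an exception if no leader replica happens to be in the south. A slightly more defensive sketch (isSouth() remains a hypothetical helper you would write yourself) makes that failure explicit:

List<PartitionInfo> partitions = cluster.partitionsForTopic(topic);
return partitions.stream()
        .filter(p -> p.leader() != null && isSouth(p.leader().host()))
        .map(PartitionInfo::partition)
        .findAny()
        .orElseThrow(() -> new IllegalStateException("no partition with a southern leader found for topic " + topic));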

Summary #

Today, we discussed the message partitioning mechanism of Kafka producers and several common partitioning strategies. Remember that partitioning is crucial for load balancing and high throughput, so it is worth carefully choosing an appropriate partitioning strategy on the producer side to avoid “skewed” message data, where a few partitions become performance bottlenecks and downstream consumption slows down as a result.

Open Discussion #

Which message partitioning strategy do you use most frequently in your production environment? What challenges or pitfalls have you encountered in the process?

Feel free to share your thoughts and answers here, so we can discuss together. If you find it insightful, please feel free to share this article with your friends.