14 Are Idempotent Producers and Transactional Producers the Same Thing? #

Hello, I’m Hu Xi. Today I want to share with you the topic of Kafka’s message delivery reliability and the implementation of exactly-once semantics.

The so-called message delivery reliability refers to the commitments that Kafka provides to the producer and consumer when handling messages. There are three common commitments:

  • At most once: Messages may be lost, but they will never be sent more than once.
  • At least once: Messages will not be lost, but they may be sent multiple times.
  • Exactly once: Messages will not be lost and will not be sent multiple times.

Currently, Kafka’s default delivery guarantee is the second one: at least once. In Column 11 we explained what “message committed” means: only when the broker successfully “commits” a message and the producer receives the broker’s acknowledgment does the producer consider the message successfully sent. If the message is committed but the acknowledgment never makes it back to the producer (because of a momentary network fluctuation, say), the producer cannot tell whether the commit actually succeeded. Its only option is to retry, that is, to send the same message again. This is why Kafka provides at-least-once delivery by default, and also why it can deliver duplicate messages.

Kafka can also provide an at-most-once delivery guarantee: simply disable retries on the producer. A message is then either written successfully or not at all, but it is never sent more than once. We usually don’t want to lose messages, but in some scenarios the occasional lost message is acceptable while duplicates are not; in those cases, at most once is the right choice.
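
For illustration, here is a minimal configuration fragment (my own sketch, not from the original column; the broker address and serializers are placeholders, and the usual producer imports are assumed) that gives a producer at-most-once behavior by disabling retries:

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// No retries: each send either succeeds or fails exactly once
props.put(ProducerConfig.RETRIES_CONFIG, 0);
// On newer clients (3.0+) idempotence is on by default and conflicts
// with retries = 0, so disable it explicitly here
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, false);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);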

Neither at least once nor at most once is as attractive as exactly once. What most users really want is for each message to be delivered exactly once, so that messages are neither lost nor duplicated. Put differently: even if the producer sends the same message multiple times, the broker automatically deduplicates it, so from the perspective of downstream consumers there is still only one message.

So, how does Kafka achieve exactly once semantics? In simple terms, this is done through two mechanisms: idempotence and transactions. What are these mechanisms? Are they the same thing? To answer these questions, let’s first talk about what idempotence is.

What is Idempotence? #

The term “idempotence” originally comes from the field of mathematics and refers to the concept that certain operations or functions can be performed multiple times, but the result remains the same each time. Let me give you a few simple examples to illustrate. In multiplication, multiplying a number by 1 is an idempotent operation because no matter how many times you perform this operation, the result is always the same. Similarly, the floor and ceiling functions are idempotent functions. So, running floor(3.4) once or 100 times will yield the same result, which is 3. On the other hand, adding 1 to a number is not an idempotent operation because the result will inevitably be different when executed once or multiple times.
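
Formally (a standard definition, not specific to this column), an operation f is idempotent if applying it twice yields the same result as applying it once: f(f(x)) = f(x). Multiplying by 1 and floor both satisfy this equation, while adding 1 does not, since x + 2 ≠ x + 1.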

In the field of computer science, the concept of idempotence has a slightly different meaning:

  • In imperative programming languages (such as C), if a subroutine is idempotent, it cannot modify the system state. Therefore, no matter how many times this subroutine is executed, the associated part of the system state remains unchanged.
  • In functional programming languages (such as Scala or Haskell), many pure functions are naturally idempotent as they do not perform any side effects.

Idempotence has many advantages. Its greatest advantage is that we can safely retry any idempotent operation without worrying about disrupting our system state. With non-idempotent operations, we need to consider the impact of multiple executions on the state. However, with idempotent operations, we don’t need to worry about such issues at all.
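
To make this concrete, here is a tiny Java illustration (my own example, not from the original column): re-running reset is always safe because it is idempotent, while re-running increment is not.

public class IdempotenceDemo {
    private int state = 0;

    // Idempotent: one call or many calls leave the same state behind
    void reset() { state = 0; }

    // Not idempotent: every extra call moves the state further
    void increment() { state += 1; }

    public static void main(String[] args) {
        IdempotenceDemo demo = new IdempotenceDemo();
        demo.reset();
        demo.reset();        // retrying changes nothing: state is still 0
        demo.increment();
        demo.increment();    // retrying changes the result: state is now 2
        System.out.println(demo.state); // prints 2
    }
}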

Idempotent Producer #

In Kafka, the Producer is not idempotent by default, but we can create an idempotent Producer. This feature was introduced in version 0.11.0.0. Before that version, a Producer sending data to a partition could write the same message more than once, producing duplicates. From version 0.11 on, enabling idempotence is trivial: set a single parameter, either props.put("enable.idempotence", true) or props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true).
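
Putting it together, an idempotent Producer configuration might look like the fragment below (a sketch; the broker address is a placeholder, and the usual producer imports are assumed):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// The single switch that turns on idempotence
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
KafkaProducer<String, String> producer = new KafkaProducer<>(props);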

Once enable.idempotence is set to true, the Producer automatically becomes an idempotent Producer; nothing else in your code changes, and Kafka deduplicates messages for you. The underlying principle is the classic space-for-time trade-off: the Broker keeps a few extra fields for each Producer (essentially a producer ID plus per-partition sequence numbers). When the Producer resends a message carrying the same field values, the Broker recognizes the duplicate and silently “discards” it. The actual implementation is, of course, not quite this simple, but this is a fair mental model.

It seems that the idempotent Producer is cool and easy to use, requiring only one parameter to ensure message deduplication. However, it is essential to understand the scope of the idempotent Producer.

Firstly, it can only guarantee idempotence within a single partition, meaning an idempotent Producer can ensure that duplicate messages do not appear on a specific partition of a topic. It cannot achieve idempotence across multiple partitions. Secondly, it can only achieve idempotence within a single session and cannot guarantee idempotence across sessions. Here, a session can be understood as a run of the Producer process. Once you restart the Producer process, the guarantee of idempotence is lost.

Now, you may wonder how to achieve deduplication across multiple partitions and across sessions. The answer is transactions, or more precisely, the transactional Producer. This is also the biggest difference between idempotent Producers and transactional Producers!

Transactions #

The concept of transactions in Kafka is similar to the transactions provided by databases that we are familiar with. In the field of databases, transactions provide the classic ACID guarantees, which stands for Atomicity, Consistency, Isolation, and Durability.

Of course, actual implementations of ACID vary from database to database. ACID itself is a somewhat fuzzy notion; take isolation, for example. The idea of isolation feels natural and necessary, but it becomes imprecise as soon as you look at implementation details. Generally speaking, isolation means that concurrently executing transactions are shielded from one another and do not interfere. Classic database textbooks define isolation as serializability: each transaction behaves as if it were the only transaction running in the entire database.

When it comes to isolation levels, this ambiguity or confusion becomes even more apparent. Many database vendors have their own different interpretations of isolation levels. For example, some databases provide the Snapshot isolation level, while in other databases, they are referred to as repeatable read. Fortunately, major mainstream database vendors have a relatively unified understanding of the read committed isolation level. The so-called read committed means that when reading from the database, you can only see committed data, i.e., no dirty reads. At the same time, when writing to the database, you can only overwrite committed data, i.e., no dirty writes.

Starting from version 0.11, Kafka also provides support for transactions, mainly focusing on the read committed isolation level. It ensures that multiple messages are atomically written to the target partition and also ensures that consumers can only see messages that have been successfully committed in the transaction. Now let’s take a look at the transactional Producer in Kafka.

Transactional Producer #

A transactional producer guarantees that messages are written atomically to multiple partitions: either all of them are written successfully or none of them is. A transactional producer also survives process restarts; after a restart, Kafka still ensures that the messages it sends are processed exactly once.

Setting up a transactional producer is simple and requires two steps:

  • Set enable.idempotence = true, just as with an idempotent producer.
  • Set the transactional.id parameter with a meaningful name.
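
In configuration code, these two steps might look like this (a sketch; my-transactional-id and the broker address are placeholders you would replace with your own values):

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
// Step 1: idempotence must be enabled for transactions to work
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
// Step 2: a stable, meaningful transactional id
props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "my-transactional-id");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);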

In addition, you need to make some adjustments in your producer code, as shown in the example below:

producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record1);
    producer.send(record2);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Fatal: another producer with the same transactional.id has taken over,
    // so aborting is not possible; close this producer instead
    producer.close();
} catch (KafkaException e) {
    // Recoverable error: abort the transaction (and retry if appropriate)
    producer.abortTransaction();
}

Compared to a regular producer, the notable feature of a transactional producer is that it invokes the transactional APIs: initTransactions, beginTransaction, commitTransaction, and abortTransaction, which correspond to transaction initialization, start, commit, and abort respectively.

This code ensures that record1 and record2 are committed to Kafka as a single transaction: either both are successfully written or both fail as a unit. In fact, even when a transaction fails, Kafka still writes its messages to the underlying log, so consumers would still see them unless told otherwise. Reading messages sent by a transactional producer therefore also requires a small change on the consumer side: set the isolation.level parameter appropriately (a configuration sketch follows the list below). Currently, this parameter has two values:

  1. read_uncommitted: This is the default value, which means that consumers can read any messages written to Kafka, regardless of whether they were committed or aborted by the transactional producer. Obviously, if you use a transactional producer, the corresponding consumer should not use this value.
  2. read_committed: This value ensures that consumers only read messages that have been successfully committed by a transactional producer. However, it can still read all messages written by non-transactional producers.
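
Here is what that looks like in consumer configuration (a sketch; the group id and broker address are placeholders, and the usual consumer imports are assumed):

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// Only deliver messages from successfully committed transactions
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);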

Summary #

In summary, both idempotent producers and transactional producers are tools the Kafka community provides for achieving exactly-once processing semantics, but their scopes differ. An idempotent producer guarantees idempotence only within a single partition and a single session, while transactions guarantee it across partitions and across sessions. In terms of delivery semantics, the transactional producer can do more.

However, it’s worth noting that there is no free lunch. Compared to idempotent producers, transactional producers have poorer performance. In actual usage, we need to carefully evaluate the cost of introducing transactions and not blindly enable transactions.

Open Discussion #

What do you understand about transactions? Based on today’s presentation, can you list any potential use cases of transactional Producers that could be applied in your company’s actual business?

Please write down your thoughts and answers. Let’s discuss together. If you find this helpful, feel free to share the article with your friends.