How Kafka achieves exactly-once semantics

Oleg Potapov
Jan 31, 2023


The exactly-once delivery guarantee is one of the most controversial topics in messaging and thus one of the most complex tasks to achieve. However, several years ago the Kafka team announced that they had done it, so let’s dive into their implementation to see how they made it work and what limitations it has.

First, it’s worth defining what all these delivery semantics are. Three of them are commonly used:

  • at least once — the system guarantees that the message is received, but doesn’t guarantee that it happens only once
  • at most once — the system does not guarantee that the message is received, but if it is, it happens only once
  • exactly once — a combination of the two previous guarantees: the message is received, and it happens only once

Of course, “exactly once” is the most desirable guarantee, but at the same time the hardest to achieve, and it is only possible when the producer, the broker and the consumer work together. This concept was explained in my previous article.

Kafka Streams

One very important and often missed detail is that Kafka supports exactly-once delivery only in Kafka Streams. To turn it on, change the processing.guarantee config option from at_least_once (the default) to exactly_once_v2.
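In a Java Streams application this is a one-line change in the configuration. A minimal fragment might look like the following (the application id and broker address are hypothetical):

```java
import org.apache.kafka.streams.StreamsConfig;
import java.util.Properties;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");      // hypothetical application id
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");   // hypothetical broker address
// the default is "at_least_once"; switch to exactly-once processing:
props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");
```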

But even Streams applications have limitations. If your consumer reads events from Kafka and makes changes in a relational database, Kafka won’t revert them. And if your consumer sends SMS notifications, Kafka can’t revert them either, even when the Kafka Streams library is used. These are limitations the developer should always keep in mind.

Why do we talk about “reverting” changes? Because the only way to handle a message exactly once is to process it within a single transaction.

So, what is Kafka Streams and why is it possible to make it transactional? Kafka Streams is a client library for building applications and services, where the input and output data are stored in Kafka clusters [1]. And that is the key.

Kafka Streams Application loop

A Kafka Streams application implements a read-process-write loop with the following steps (a minimal sketch follows the list):

  1. Reads messages from the input topic
  2. Invokes processing function for received messages, updates the internal state
  3. Generates output messages and sends them to the output topic (or topics)
  4. Waits for acknowledgement from Kafka for the output messages
  5. Commits the offsets for the input topic, which means that the messages were processed successfully
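Here is a minimal sketch of such an application, assuming hypothetical topic names and a local broker. Steps 1 to 3 map directly to the topology below, while steps 4 and 5 are performed by the library itself:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class ReadProcessWriteApp {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "read-process-write-app"); // hypothetical id
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");      // hypothetical broker
        props.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, "exactly_once_v2");  // one transaction per loop

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) // step 1: read
               .mapValues(value -> value.toUpperCase())                                // step 2: process
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));   // step 3: write

        // Steps 4 and 5 (acknowledgements and offset commits) are handled by the library.
        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}
```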

As you may know, all consumer offsets are stored in an internal Kafka topic in the same cluster. The internal state of the Streams application is also backed by Kafka, in what is called a state store. Thus all the changes live in the same Kafka cluster and can be managed and reverted inside a single transaction.

Streams Application data store

The state store here is a bit oversimplified; in reality it is more complex, as it consists of changelog topics and RocksDB instances, but we can omit those details for now. More details about this internal storage can be found in the Kafka wiki [5].
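For intuition, a stateful operation in the topology is what creates such a store. The fragment below extends the builder from the earlier sketch, with a hypothetical store name; Streams backs the store with a local RocksDB instance and a changelog topic in the cluster:

```java
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

// Counting per key creates a state store named "event-counts": a local RocksDB
// instance for fast lookups plus a changelog topic in Kafka for fault tolerance.
KTable<String, Long> counts = builder
        .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
        .groupByKey()
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("event-counts"));
```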

Idempotent Producer

Let’s return to the beginning. The first stage where everything may go wrong is message producing. The producer sends a message to the broker and receives an acknowledgement, which means that the broker received it successfully. If no acknowledgement is received, the producer sends the same message one more time.

Kafka producer failed to receive an acknowledgement

Above you can see three cases in which the producer doesn’t receive the acknowledgement from the broker and decides to send the message again:

  1. The broker didn’t receive the message, so obviously there is no ack
  2. The broker received the message, but sending an ack failed
  3. The broker received the message and also successfully sent the ack, but it took more than the producer’s waiting timeout

The producer will retry in all three cases, but in two of them (2 and 3) the retry will lead to a duplicate.

Neither I nor, probably, the Kafka developers know a way to solve this problem on the producer’s side alone. Thus all the deduplication work falls on the broker, which guarantees that the message is written to the log only once. To achieve this, messages carry a sequence number (I described a similar approach in the article about the Idempotent Consumer pattern [6]). So, to be exact, it’s not an idempotent producer but a smart broker that deduplicates messages.
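The idea can be sketched roughly as follows. This is a toy illustration, not Kafka’s actual code, and the per-partition bookkeeping is omitted:

```java
import java.util.HashMap;
import java.util.Map;

// Toy sketch of sequence-based deduplication: the broker remembers the last
// sequence number it has written for each producer and skips anything older.
class DeduplicatingLog {
    private final Map<Long, Long> lastSequencePerProducer = new HashMap<>();

    /** Returns true if the message was appended, false if it was a duplicate retry. */
    synchronized boolean append(long producerId, long sequence, byte[] payload) {
        Long last = lastSequencePerProducer.get(producerId);
        if (last != null && sequence <= last) {
            return false;              // retry of something already in the log: drop it
        }
        // appendToLog(payload);       // the actual disk write is omitted in this sketch
        lastSequencePerProducer.put(producerId, sequence);
        return true;
    }
}
```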

To enable this functionality in Kafka it’s enough to configure the producer with the enable.idempotence=true option.
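A minimal producer sketch with idempotence enabled might look like this (the broker address and topic name are hypothetical):

```java
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
// the broker deduplicates retries using the producer id and sequence numbers
props.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
    producer.send(new ProducerRecord<>("output-topic", "key", "value"));
}
```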

How Kafka transactions work

After the message is written to the Kafka log and the broker guarantees that it was done without duplicates, it should be processed and written to the next topic within one transaction. But how is that done?

A Kafka transaction is a set of changes written to a log, which itself is stored in an internal Kafka topic. This log is managed by a special entity called the Transaction Coordinator. To run a transaction, several steps need to happen:

  1. The application’s producer finds the Transaction Coordinator. This happens when the application starts: it sends its configured transactionalID (if it exists) to the coordinator and receives a producerID. This is needed in case the application restarts and tries to register itself again with the same transactionalID. When the restarted application starts a new transaction, the Transaction Coordinator aborts all the pending transactions started by the previous instance.
  2. When the application consumes new messages it starts the transaction
  3. When the application writes messages to any other topics it sends this information to its Transaction Coordinator. The coordinator stores information about all the changed partitions in its internal topic.

This is an important detail. With the Kafka Streams API you don’t have to send this information to the coordinator manually; the Streams library does it for you. But if you write messages to a topic directly with a separate, non-transactional producer, they won’t be recorded in the transaction log even if this topic is in the same cluster.

Another important thing about transactions is that the messages written during a transaction are not exposed to consumers reading with isolation.level=read_committed until the transaction is committed.

4. The transaction either commits or aborts. If it’s aborted, the coordinator adds an “Abort” mark to the transaction in the internal topic and the same mark to all the messages written during the transaction.

5. When the transaction commits, the process is almost the same. The coordinator adds a “Commit” mark to the transaction and to all the messages. That mark will make these messages available for the consumers.

Don’t forget that consumer offsets are also stored in their own topic. This means that committing offsets is the same as writing a message to any other topic, and this message can also be marked “Abort” or “Commit”, which determines whether the same messages will be consumed a second time or not. When the mark is “Commit”, they will not be; when it is “Abort”, the whole transaction starts from the beginning, by consuming the messages again.
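To make the flow above concrete, here is a minimal sketch of the same consume-process-produce cycle written against the plain Java clients, so the transactional calls are visible. The group id, transactional id and topic names are hypothetical, and error handling is reduced to aborting the transaction:

```java
import org.apache.kafka.clients.consumer.*;
import org.apache.kafka.clients.producer.*;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TransactionalLoop {
    public static void main(String[] args) {
        Properties consumerProps = new Properties();
        consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "eos-demo-group");        // hypothetical group id
        consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerProps.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);         // offsets go into the transaction
        consumerProps.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); // skip messages of aborted transactions

        Properties producerProps = new Properties();
        producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        producerProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "eos-demo-tx");   // hypothetical transactional id

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
             KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps)) {

            producer.initTransactions();                 // step 1: register with the Transaction Coordinator
            consumer.subscribe(List.of("input-topic"));

            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
                if (records.isEmpty()) continue;

                producer.beginTransaction();             // step 2: start the transaction
                try {
                    Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
                    for (ConsumerRecord<String, String> record : records) {
                        producer.send(new ProducerRecord<>("output-topic", record.key(),
                                record.value().toUpperCase()));                   // step 3: write output
                        offsets.put(new TopicPartition(record.topic(), record.partition()),
                                new OffsetAndMetadata(record.offset() + 1));
                    }
                    // commit the consumed offsets as part of the same transaction
                    producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
                    producer.commitTransaction();        // "Commit" marks make everything visible atomically
                } catch (Exception e) {
                    producer.abortTransaction();         // "Abort" marks: outputs and offsets are discarded
                    // the consumer should then seek back to the last committed offsets (omitted here)
                }
            }
        }
    }
}
```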

Is the Transaction Coordinator also transactional?

I tried not to overload the post with details to keep it as simple and clear as possible. Still, one detail is worth mentioning: how exactly does the Transaction Coordinator commit the transaction? It has to mark the transaction, the message offsets and the output messages with the “Commit” mark. But what if something goes wrong in the middle of the process? Of course, Kafka won’t leave half of the messages in the committed state and the other half pending.

To make the commit change consistent, the Transaction Coordinator performs it in several steps:

  1. The coordinator receives a request to commit the transaction
  2. It writes PREPARE_COMMIT message to the transaction log
  3. It writes “Commit” marks to all the required logs
  4. When all the messages are marked as “Commit”, the coordinator writes a COMMITTED message to the transaction log which means the end of the transaction

When PREPARE_COMMIT is written to the transaction log, it means that the transaction will be fully committed sooner or later, no matter what. And until COMMITTED is written, the application can’t initiate the next transaction. This allows the coordinator to finish the commit process even if the whole Kafka cluster restarts.
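As a toy sketch (not Kafka’s actual code), the reasoning behind this two-step commit can be expressed like this: the intent is recorded durably first, and the remaining marker writes are idempotent, so a restarted coordinator can simply finish them:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy sketch of the two-phase commit idea: record the commit intent durably,
// then write the "Commit" marks; recovery replays any prepared-but-unfinished commit.
class ToyTransactionCoordinator {
    enum State { ONGOING, PREPARE_COMMIT, COMMITTED }

    private final Map<String, State> txnLog = new HashMap<>();              // simplified transaction log
    private final Map<String, List<String>> txnPartitions = new HashMap<>(); // partitions touched by each txn

    void commit(String txnId) {
        txnLog.put(txnId, State.PREPARE_COMMIT);   // durable intent: the commit is now inevitable
        writeCommitMarks(txnId);                   // mark every partition touched by the transaction
        txnLog.put(txnId, State.COMMITTED);        // the transaction is finished
    }

    // On coordinator restart, finish any commit that was prepared but not completed.
    void recover() {
        for (Map.Entry<String, State> entry : txnLog.entrySet()) {
            if (entry.getValue() == State.PREPARE_COMMIT) {
                writeCommitMarks(entry.getKey());  // marker writes are idempotent, so replaying is safe
                entry.setValue(State.COMMITTED);
            }
        }
    }

    private void writeCommitMarks(String txnId) {
        for (String partition : txnPartitions.getOrDefault(txnId, List.of())) {
            // append a "Commit" control marker to this partition's log (omitted in this sketch)
        }
    }
}
```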

The same logic applies to the process of reverting the aborted transaction.

Conclusion

You can see that there is a lot of complex logic and work behind the implementation of the “exactly once” guarantee in Kafka, and this implementation still has limitations. Knowing these limitations and keeping them in mind is one of the key aspects of using such a powerful tool as Kafka in your projects. In any case, the choice between using built-in features and building something custom is always yours.

Links

  1. https://kafka.apache.org/documentation/streams/
  2. https://www.confluent.io/blog/exactly-once-semantics-are-possible-heres-how-apache-kafka-does-it/
  3. https://www.confluent.io/blog/transactions-apache-kafka/
  4. https://www.confluent.io/blog/enabling-exactly-once-kafka-streams/
  5. https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Internal+Data+Management
  6. https://oleg0potapov.medium.com/event-patterns-idempotent-consumer-or-inbox-b2812bf6656a
