Event Patterns: Idempotent Consumer (or Inbox)
In the previous parts of the series (Part 1, Part 2) we discussed how to safely produce events and transfer them to consumers. The only missing step is how to properly handle these events. It may seem like the easiest one, but it isn’t: the problem hidden until now is how to avoid handling the same message more than once.
The Problem
As already mentioned in the previous article, the first two steps can guarantee that a message will be delivered “at least once”, but not “exactly once”. This means that the problem of handling duplicate messages lies with the consumer.
Sometimes there is nothing wrong with running the same code twice, or it’s possible to make the consumer’s logic idempotent; then duplicated messages won’t harm your application in any way. But sometimes it’s impossible to build such logic. And even with this restriction in mind, a mistake at some point is inevitable.
Why are messages duplicated at all? Let’s forget about consumers and message brokers for a second and look at a simpler model.
The only way to be sure that the receiver really got your message is to get an acknowledgement back from it. If the acknowledgement is not received, the sender retries delivering the message. But the acknowledgement can also be lost due to network issues.
How can you make sure it was delivered? Send an acknowledgement for the acknowledgement? And how long should the sender wait for it? It may send the message a second time and only then receive the acknowledgement for the first one. Thus, it’s practically impossible for the sender to both guarantee message delivery and avoid sending the same message more than once.
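To make this failure mode concrete, here is a toy sketch of such a sender; transport and acks are hypothetical stand-ins for the network, not a real API:

```python
import queue

def send_with_retries(message, transport, acks, attempts=5, timeout=1.0):
    """Resend until an acknowledgement arrives. Retrying is the only way
    to be sure, but it inevitably produces duplicates whenever the message
    got through and only the acknowledgement was lost."""
    for _ in range(attempts):
        transport.send(message)       # the message itself may be lost...
        try:
            acks.get(timeout=timeout)
            return True
        except queue.Empty:           # ...or it arrived and the ack was lost:
            continue                  # this retry delivers a duplicate
    return False
```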
Solution
First of all, the only way to detect duplicate messages is to generate a unique identifier for each event on the producer’s side. Only with such an identifier can the consumer know that it is handling the same message a second time, and not a different message with the same attributes.
The common way to prevent handling the same message twice is to store handled messages in a dedicated table, usually named processed_messages or inbox. The uniqueness of events is ensured by a unique index on the event identifier. Generating identifiers that are truly unique across all producers may be problematic, but within a single producer it’s achievable without any trouble. In that case, I would consider using the combination of the event identifier and a producer identifier, which may be just a context name, e.g. [11153, ‘orders’].
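As a sketch, the inbox table could look like this; I’ll use SQLite here to keep the snippets self-contained, and all names are illustrative:

```python
import sqlite3

conn = sqlite3.connect("consumer.db")
conn.execute("""
    CREATE TABLE IF NOT EXISTS inbox (
        producer    TEXT NOT NULL,       -- e.g. 'orders'
        event_id    INTEGER NOT NULL,    -- e.g. 11153, unique per producer
        state       TEXT NOT NULL DEFAULT 'pending',
        received_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
        UNIQUE (producer, event_id)      -- rejects duplicate deliveries
    )
""")
conn.commit()
```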
Events should be stored in the table at an early stage, even before the consumer sends the acknowledgement to the broker. Then, if the acknowledgement doesn’t reach the broker and the broker sends the message again, the consumer tries to store it again and fails with a unique constraint violation.
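Continuing the sketch, the insert-before-acknowledge step might look as follows; broker.ack is a hypothetical stand-in for your client library’s acknowledgement call:

```python
import sqlite3

def try_record_event(conn: sqlite3.Connection, producer: str, event_id: int) -> bool:
    """Insert the event into the inbox before acking the broker.
    Returns False if this exact event was already recorded."""
    try:
        conn.execute(
            "INSERT INTO inbox (producer, event_id) VALUES (?, ?)",
            (producer, event_id),
        )
        conn.commit()
        return True
    except sqlite3.IntegrityError:   # unique constraint violated: a duplicate
        conn.rollback()
        return False

# On a redelivery the insert fails, so the consumer just re-acks and skips:
# if not try_record_event(conn, "orders", 11153):
#     broker.ack(message)           # hypothetical broker client
```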
What happens if the consumer successfully returns the acknowledgement to the broker and only then fails? Well, it depends. Sometimes it’s fine to process the same event again. For such cases it’s useful to have an additional field in the inbox table that represents the state of the event. An event that has just been inserted into the table gets the state ‘pending’. The consumer updates all the entities in the database and changes the event state to ‘processed’ in the same transaction. If this transaction fails to commit, the event remains ‘pending’, and the consumer process can try to reprocess all pending events after a restart or after some timeout.

But this scenario covers only the simplest cases. Unfortunately, the consumer’s code may work not only with the local database but also call third-party systems. In such situations, re-running the whole process from the beginning may be unacceptable, and each case will have to be handled individually.
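A minimal sketch of the retry loop over pending events, continuing the SQLite example above (the orders table and the business update are purely illustrative):

```python
import sqlite3

def process_pending(conn: sqlite3.Connection) -> None:
    """Retries events stuck in 'pending', e.g. after a consumer restart.
    The business update and the state change commit atomically."""
    rows = conn.execute(
        "SELECT producer, event_id FROM inbox WHERE state = 'pending'"
    ).fetchall()
    for producer, event_id in rows:
        try:
            # hypothetical business update, in the same transaction
            # as the state change below
            conn.execute(
                "UPDATE orders SET status = 'confirmed' WHERE id = ?",
                (event_id,),
            )
            conn.execute(
                "UPDATE inbox SET state = 'processed'"
                " WHERE producer = ? AND event_id = ?",
                (producer, event_id),
            )
            conn.commit()    # either both updates land or neither does
        except sqlite3.Error:
            conn.rollback()  # the event stays 'pending' and will be retried
```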
In some articles, you can find another option: storing message identifiers with the entity that was created or updated by the message. It should work similarly: an attempt to update this entity, or to create another one with the same message identifier, will fail, and the event won’t be processed again. Personally, I wouldn’t recommend this approach. First, it mixes two different kinds of data: data stored for business logic and data stored for infrastructure purposes. It may also hide additional pitfalls, especially when the consumer logic is complicated and doesn’t boil down to a simple database entity update.
Alternative
There is another approach that can be applied if you have a log-based message broker. Such brokers use consumer offsets to track the position of each consumer in the event log. All events before this position are considered already handled, so the next time the consumer requests new events it receives them starting from its offset value. Still, the problem remains the same: the consumer can process events, and then the offset update fails.
The solution is similar to the one we discussed earlier, but now the consumer doesn’t have to store all the messages it has processed; the offset value alone is enough. However, this raises a new question: the offset still has to be transferred back to the broker somehow. For example, Kafka recommends considering Kafka Connect as an option [3].
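As a rough sketch of this idea, assuming the kafka-python client and reusing SQLite for the offset store (the topic name, table, and handle function are illustrative), the consumer can commit the offset in the same local transaction as the processing results and seek to it on startup:

```python
import sqlite3
from kafka import KafkaConsumer, TopicPartition  # assumes kafka-python is installed

def handle(conn: sqlite3.Connection, payload: bytes) -> None:
    """Hypothetical business logic writing to the same database."""

conn = sqlite3.connect("consumer.db")
conn.execute(
    "CREATE TABLE IF NOT EXISTS offsets (tp TEXT PRIMARY KEY, next_offset INTEGER)"
)

tp = TopicPartition("orders-events", 0)             # illustrative topic
consumer = KafkaConsumer(bootstrap_servers="localhost:9092",
                         enable_auto_commit=False)  # offsets are ours to manage
consumer.assign([tp])

row = conn.execute("SELECT next_offset FROM offsets WHERE tp = ?",
                   ("orders-events-0",)).fetchone()
consumer.seek(tp, row[0] if row else 0)             # resume from the stored offset

for record in consumer:
    handle(conn, record.value)
    # the offset of the NEXT event is committed together with the results
    conn.execute("INSERT OR REPLACE INTO offsets (tp, next_offset) VALUES (?, ?)",
                 ("orders-events-0", record.offset + 1))
    conn.commit()
```

Because the offset is persisted in the same transaction as the results of processing, a crash can no longer leave the two out of sync.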
Conclusion
Nowadays, most of the popular and widely used message brokers offer built-in features, or at least plugins, aimed at an “exactly once” delivery guarantee:
Kafka, with processing.guarantee set to exactly_once or exactly_once_v2 in the Kafka Streams config: https://kafka.apache.org/documentation/#streamsconfigs_processing.guarantee
NATS, with message deduplication in NATS JetStream: https://docs.nats.io/using-nats/developer/develop_jetstream/model_deep_dive#exactly-once-delivery
RabbitMQ, with a message deduplication plugin: https://github.com/noxdafox/rabbitmq-message-deduplication
Apache Pulsar, with its Transaction API: https://streamnative.io/en/blog/release/2021-06-14-exactly-once-semantics-with-transactions-in-pulsar/
Unfortunately, almost all of them come with restrictions or still can’t guarantee that consumers will receive a message only once. That’s why, in my opinion, it’s better to have a mechanism of your own that lets you control how your consumers handle incoming asynchronous events, rather than relying entirely on the implementation of the message broker you use.