Kafka Exactly Once Semantics: A Comprehensive Guide

Engineering • September 15th, 2017

Kafka’s Exactly Once semantics was long talked about but never achieved. Recently, Neha Narkhede, CTO of Confluent, wrote an article introducing the holy grail of Kafka: Exactly Once semantics in the 0.11 release.

Until then, many believed that exactly-once delivery was not mathematically possible in distributed systems, so the announcement raised eyebrows in the community. At Hevo, Kafka is the backbone of our infrastructure, so we were curious about Kafka’s Exactly Once semantics too. Here’s a detailed analysis of what happens behind the scenes and how you can take advantage of it.

Let’s see how this blog is structured for you:

  1. What is the Need for Kafka’s Exactly Once Semantics?
  2. Idempotent Producer
  3. Atomic Transactions
  4. Exceptions in Producer
  5. Consumers
  6. Performance Impact
  7. Message Format Change
  8. Conclusion

What is the Need for Kafka’s Exactly Once Semantics?

At least once semantics guarantees that every message write is persisted at least once, without any data loss. It is useful when tuning for reliability. While ensuring this, however, the producer retries writes and causes duplicates in the stream.

For example, the broker may crash between committing a message and sending an acknowledgement back to the producer. This causes the producer to retry, which results in duplicate messages in the target stream. Introducing idempotent producer semantics made more sense: even in the event of client retries or broker failures, each message is persisted exactly once, without duplicates or data loss.
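For context, here is a minimal sketch of a typical at-least-once producer setup (the broker address, topic name, and serializers are illustrative, not from the original article):

<Code>
// Assumes the usual imports: java.util.Properties and
// org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092"); // illustrative broker address
props.put("acks", "all");  // wait until the write is fully acknowledged
props.put("retries", "3"); // retry on transient failures
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// If the broker commits the write but crashes before acknowledging it,
// the client retries and the same message is persisted a second time.
producer.send(new ProducerRecord<>("example-topic", "key", "value"));
</Code>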

FIG. 1. Problem with Kafka’s existing semantics: At Least Once

These semantics give you a robust messaging system, but only within a single TopicPartition. To extend the guarantee across multiple TopicPartitions, you need transactional guarantees: the ability to write to several TopicPartitions atomically. Atomicity is the ability to commit a bunch of messages across TopicPartitions as one unit: either all of the messages are committed, or none of them are.

We’ll see these features of Kafka version 0.11 in detail.

Hevo Data: Explore your Data Conveniently

Hevo Data is a No-code Data Pipeline. It provides its users with a simple platform for integrating data for analysis. It supports pre-built integrations with 100+ data sources and provides a consistent, reliable solution for managing data in real time, ensuring that you always have analysis-ready data in your desired destination.

Let’s discuss some unbeatable features of Hevo:

  • Completely Automated: Hevo platform can be set up in minutes and requires minimal maintenance.
  • Real-Time Data Transfer: Hevo provides real-time data migration, so you can have analysis-ready data always.
  • Data Transformation: It provides a simple interface to perfect, modify, and enrich the data you want to transfer. 
  • Fault-Tolerant: Hevo is capable of detecting anomalies in the incoming data and informs you instantly. All the affected rows are kept aside for correction so that it doesn’t hamper your workflow.
  • Live Support: The Hevo team is available round the clock to extend exceptional support through chat, email, and support calls.
  • Schema Management: Hevo takes away the tedious task of schema management and automatically detects the schema of incoming data and maps it to the destination schema.
  • Live Monitoring: Hevo allows you to monitor the data flow, so you can check where your data is at a particular point in time.

Are you ready to try Hevo? If yes, then give it a try by signing up for a 14-day free trial today.

Idempotent Producer

Idempotency is at the heart of Kafka’s Exactly Once semantics. To stop a message from being processed multiple times, it must be persisted to the Kafka topic only once. During initialisation, a unique ID is assigned to the producer, called the producer ID or PID.

The PID and a sequence number are bundled together with the message and sent to the broker. Since the sequence number starts at zero and increases monotonically, the broker only accepts a message if its sequence number is exactly one greater than that of the last committed message from that PID/TopicPartition pair. If the producer does not receive an acknowledgement, it resends the message.

FIG. 2. The Idempotent Producer

If the broker receives a lower sequence number than expected, it rejects the message with a duplicate error, which the producer can safely ignore. A higher sequence number than expected means some messages were lost in between, leading to an out-of-sequence error.
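To make the check concrete, here is a simplified, hypothetical sketch of the broker-side logic; the real implementation lives inside the Kafka broker, and these names are illustrative:

<Code>
enum SequenceResult { ACCEPT, DUPLICATE, OUT_OF_SEQUENCE }

// Decide what to do with an incoming message, given the last committed
// sequence number for its PID/TopicPartition pair.
static SequenceResult checkSequence(int lastCommittedSeq, int incomingSeq) {
    if (incomingSeq == lastCommittedSeq + 1) {
        return SequenceResult.ACCEPT;          // the expected next message
    } else if (incomingSeq <= lastCommittedSeq) {
        return SequenceResult.DUPLICATE;       // already persisted; safe to ignore
    } else {
        return SequenceResult.OUT_OF_SEQUENCE; // a gap: messages were lost
    }
}
</Code>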

When the producer restarts, a new PID is assigned, so idempotency is promised only within a single producer session. Even though the producer retries requests on failure, each message is persisted in the log exactly once. There can still be duplicates depending on the source from which the producer gets its data: Kafka won’t deduplicate data that already arrives duplicated at the producer, so in some cases you may need an additional de-duplication system.
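Opting in is a single producer setting. A minimal sketch, with an illustrative broker address and serializers:

<Code>
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("enable.idempotence", "true"); // turns on PID + sequence-number deduplication
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
</Code>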

Atomic Transactions

The idempotent producer ensures exactly-once message delivery per partition. To achieve this across multiple partitions, Kafka provides atomic transactions, which let applications produce to multiple TopicPartitions atomically. All writes to these TopicPartitions will either succeed or fail as a single unit. The application must provide a unique ID, the TransactionalId, to the producer, and this ID is stable across all sessions of the application. There is a one-to-one mapping between TransactionalId and PID. Kafka promises exactly one active producer for a given TransactionalId by fencing off the old instance when a new one with the same TransactionalId comes alive. Kafka also guarantees that the new instance starts from a clean state by ensuring that all unfinished transactions have been completed (aborted or committed).
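As a minimal sketch, the producer configuration for transactions might look like the following; the transactional.id value is illustrative, and setting it also implicitly enables idempotence:

<Code>
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "my-app-txn-1"); // stable across application restarts
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
</Code>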

Here’s a code snippet that demonstrates the use of the new Producer Transactional APIs to send messages atomically to a set of topics:

<Code>
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record0);
    producer.send(record1);
    producer.sendOffsetsToTransaction(…);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // Another producer with the same TransactionalId is active; this instance is fenced off.
    producer.close();
} catch (KafkaException e) {
    // Abort so that read_committed consumers never see these messages.
    producer.abortTransaction();
}
</Code>

You can refer to the official Kafka documentation to dive deeper into the working details of the new Producer APIs.

Exceptions in Producer

The new exceptions in Producer APIs are listed below:

  1. ProducerFencedException: This exception is thrown if another active producer with the same TransactionalId exists in the system.
  2. OutOfOrderSequenceException: The producer throws this exception if the broker detects a discrepancy in the data received. A higher sequence number than expected means that some messages have been lost, whereas a lower sequence number means that the message is a duplicate.

Consumers

On the consumer side, behaviour can be tuned by changing the isolation level. The isolation level is the setting that balances performance against reliability, consistency, and reproducibility in a highly concurrent transactional environment.

There are now two new isolation levels in the Kafka consumer:

  1. read_committed: It reads both messages that are not part of a transaction and transactional messages after the transaction is committed. Instead of client-side buffering, a read_committed consumer relies on a partition’s “Last Stable Offset” (LSO): the offset of the first message in the partition that belongs to an open transaction. A read_committed consumer only reads up to the LSO and filters out any transactional messages that were aborted (a configuration sketch follows this list).
  2. read_uncommitted: Reads all messages in offset order without waiting for transactions to be committed. This option is similar to the earlier semantics of the Kafka consumer.
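Here is a minimal sketch of a read_committed consumer; the group ID and topic name are illustrative:

<Code>
// Assumes the usual imports: java.util.{Properties, Collections} and
// org.apache.kafka.clients.consumer.KafkaConsumer.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "example-group");
props.put("isolation.level", "read_committed"); // only read up to the LSO
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Collections.singletonList("example-topic"));
</Code>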

Performance Impact

Kafka promises certain performance improvements over the previous release, including up to +20% producer throughput, up to +50% consumer throughput, and up to -20% disk utilization. The significant decrease in disk utilization is due to the change in the message format.

Message Format Change

The overhead of the older message format is fixed at 34 bytes. The new message format needs additional fields for the PID, epoch, and sequence number, which raise that per-message overhead to 53 bytes. To keep this in check, new message formats were introduced as MessageSet and Message, shown below:

MessageSet =>
  FirstOffset => int64
  Length => int32
  PartitionLeaderEpoch => int32
  Magic => int8
  CRC => int32
  Attributes => int16
  LastOffsetDelta => int32 {NEW}
  FirstTimestamp => int64 {NEW}
  MaxTimestamp => int64 {NEW}
  PID => int64 {NEW}
  ProducerEpoch => int16 {NEW}
  FirstSequence => int32 {NEW}
  Messages => [Message]

Message => {ALL FIELDS ARE NEW}
  Length => varint
  Attributes => int8
  TimestampDelta => varint
  OffsetDelta => varint
  KeyLen => varint
  Key => data
  ValueLen => varint
  Value => data
  Headers => [Header] /* Note: The array uses a varint for the number of headers. */

Header => HeaderKey HeaderVal
  HeaderKeyLen => varint
  HeaderKey => string
  HeaderValueLen => varint
  HeaderValue => data

A MessageSet contains a list of Messages. I won’t go into the details of every field in the new format, but the key point is that the total message size decreases when messages are sent in a batch. The offset and timestamp deltas, which take less space, are part of each Message, whereas the initial timestamp and offset are part of the MessageSet. The PID and epoch, being the same for every message in a batch, also live in the MessageSet. This elimination of redundancy shrinks the overhead of the new format as the batch size increases.

For example (as taken from cwiki.apache.org), assume a fixed message size of 50 bytes with 100-byte keys and close timestamps. The overhead then increases by only 7 bytes for each additional batched message: 2 bytes for the message size, 1 byte for attributes, 2 bytes for the timestamp delta, 1 byte for the offset delta, and 1 byte for the key size.

Batch Size | Old Format Overhead | New Format Overhead
1          | 34*1 = 34           | 53 + 1*7 = 60
3          | 34*3 = 102          | 53 + 3*7 = 74
10         | 34*10 = 340         | 53 + 10*7 = 123
50         | 34*50 = 1700        | 53 + 50*7 = 403
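The overhead columns follow directly from two simple formulas, sketched here for clarity:

<Code>
// Per-batch overhead implied by the table above: the old format repeats
// a 34-byte header for every message, while the new format pays a fixed
// 53-byte MessageSet header plus roughly 7 bytes per batched message.
static int oldFormatOverhead(int batchSize) { return 34 * batchSize; }
static int newFormatOverhead(int batchSize) { return 53 + 7 * batchSize; }
</Code>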

Conclusion

Kafka Exactly Once semantics is a huge improvement over the previously weakest link in Kafka’s API: the Producer. However, it’s important to note that Kafka can only provide Exactly Once semantics if the state/result/output of your consumer is itself stored in Kafka (as is the case with Kafka Streams).

Once you have a consumer that, for example, makes non-idempotent updates to a database, there is potential for duplication: the consumer may exit after updating the database but before committing the Kafka offsets. Conversely, “message loss” can occur if the application exits after the Kafka offsets were committed but before the database transaction was committed.

Hevo is a No-code Data Pipeline, building one of the most robust and comprehensive ETL solutions in the industry. You can integrate it with your in-house databases, cloud apps, flat files, clickstreams, and more, at a reasonable price.

Give Hevo a try by signing up for a 14-day free trial today.

Share your experience of using Kafka exactly once in the comment section below.
