Kafka Exactly Once Semantics: 7 Critical Aspects

Engineering • September 15th, 2017

Apache Kafka Exactly Once semantics was long talked about but never achieved. Recently, Neha Narkhede, CTO of Confluent, wrote an article introducing the holy grail of Apache Kafka Exactly Once semantics in Kafka's 0.11 release.

Before that, many believed Exactly Once delivery was not mathematically possible in distributed systems, so the announcement raised eyebrows in the community. At Hevo, Kafka is our infrastructure backbone, so we were curious about the Kafka Exactly Once semantics too. Here's a detailed analysis of what happens behind the scenes and how you can take advantage of Kafka's Exactly Once semantics.

Understanding the Need for Kafka Exactly Once Semantics

At least once semantics guarantees that every message write is persisted at least once, with no data loss. It is useful when the pipeline is tuned for reliability. But to ensure this, the producer retries on failure, which introduces duplicates into the stream.

Kafka Exactly Once Semantics Example

The broker may crash between committing a message and sending an acknowledgment back to the producer. This causes the producer to retry, which results in duplicate messages in the target stream. Introducing idempotent producer semantics therefore made sense: even in the event of client retries or broker failures, each message is persisted exactly once, without duplicates or data loss.
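
To make this concrete, here is a minimal sketch of a producer tuned for at-least-once delivery; the broker address and the "events" topic are placeholder assumptions, not values from this article. With acks set to all and retries enabled, a lost acknowledgment makes the client resend a record the broker may already have committed, which is exactly how duplicates appear downstream.

<Code>
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Minimal at-least-once configuration (sketch); broker address and topic are placeholders.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("acks", "all");   // wait until the write is fully committed before acknowledging
props.put("retries", "3");  // resend if the acknowledgment never arrives

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// If the broker commits this record but crashes before acknowledging it,
// the retry writes a second, duplicate copy of the same record.
producer.send(new ProducerRecord<>("events", "key", "value"));
producer.close();
</Code>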

FIG. 1. Problem with Kafka’s existing semantics: At Least Once

Exactly Once Semantics alone gives you a robust messaging system, but not across multiple TopicPartitions. To get that, you need transactional guarantees: the ability to write to several TopicPartitions atomically. Atomicity is the ability to commit a bunch of messages across TopicPartitions as one unit: either all the messages are committed, or none of them are.

This example shows how Exactly Once semantics help you build robust messaging systems, maintain atomicity, and avoid duplicate data.

We’ll see these features of Kafka version 0.11 in detail.

Hevo Data: Explore your Data Conveniently

Hevo Data is a No-code Data Pipeline that provides a simple platform for integrating data for analysis. It supports pre-built integrations with 100+ data sources and offers a consistent, reliable solution for managing data in real time, ensuring that you always have analysis-ready data in your desired destination.

Get Started with Hevo for Free

Let’s discuss some unbeatable features of Hevo:

  • Completely Automated: Hevo platform can be set up in minutes and requires minimal maintenance.
  • Real-Time Data Transfer: Hevo provides real-time data migration, so you can have analysis-ready data always.
  • Data Transformation: It provides a simple interface to perfect, modify, and enrich the data you want to transfer. 
  • Fault-Tolerant: Hevo is capable of detecting anomalies in the incoming data and informs you instantly. All the affected rows are kept aside for correction so that it doesn’t hamper your workflow.
  • Live Support: Hevo team is available round the clock to extend exceptional support to you through chat, email, and support calls.
  • Schema Management: Hevo takes away the tedious task of schema management and automatically detects the schema of incoming data and maps it to the destination schema.
  • Live Monitoring: Hevo allows you to monitor the data flow, so you can check where your data is at a particular point in time.
Sign up here for a 14-Day Free Trial!

Idempotent Producer

Idempotency is at the heart of Kafka Exactly Once Semantics: to avoid processing a message multiple times, it must be persisted to the Kafka topic only once. During initialization, a unique ID is assigned to the producer, called the producer ID or PID.

The PID and a sequence number are bundled together with the message and sent to the broker. The sequence number starts from zero and increases monotonically, and a broker will only accept a message if its sequence number is exactly one greater than that of the last committed message from that PID/TopicPartition pair. If it is not, the broker rejects the message.

FIG. 2. The Idempotent Producer

If the broker sees a sequence number lower than expected, the message is a duplicate and is rejected with a duplicate error, which the producer can safely ignore. If the sequence number is higher than expected, some messages have been lost in between, and the broker raises an out-of-sequence error.
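
Conceptually, the broker-side check behaves like the sketch below. This is only an illustration: the class and method names are made up, and the real logic lives inside the Kafka broker.

<Code>
// Hypothetical illustration of the per-PID/TopicPartition sequence check.
final class SequenceCheck {
    enum Result { ACCEPT, DUPLICATE, OUT_OF_SEQUENCE }

    static Result check(long lastCommittedSeq, long incomingSeq) {
        if (incomingSeq == lastCommittedSeq + 1) {
            return Result.ACCEPT;          // the expected next message: persist it
        } else if (incomingSeq <= lastCommittedSeq) {
            return Result.DUPLICATE;       // already committed: reject as a duplicate
        } else {
            return Result.OUT_OF_SEQUENCE; // a gap: messages in between were lost
        }
    }
}
</Code>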

When the producer restarts, a new PID gets assigned, so idempotency is guaranteed only within a single producer session. Even though the producer retries requests on failures, each message is persisted in the log exactly once. There can still be duplicates depending on the source the producer is reading from: Kafka does not de-duplicate data that the producer itself receives twice, so in some cases you may still need an additional de-duplication system.
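
In practice, turning on the idempotent producer is a single configuration change. The snippet below is a minimal sketch; the broker address and serializers are placeholder assumptions.

<Code>
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

// Sketch: enabling the idempotent producer (Kafka 0.11+); broker address is a placeholder.
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("enable.idempotence", "true"); // the broker assigns a PID and enforces sequence numbers
props.put("acks", "all");                // required when idempotence is enabled

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
</Code>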

Ensuring Atomic Transactions with Apache Kafka Exactly Once Semantics

The idempotent producer ensures Exactly Once message delivery per partition. To extend this across multiple partitions, Kafka provides atomic transactions, which let applications produce to multiple TopicPartitions atomically: all writes to these TopicPartitions either succeed or fail as a single unit. The application must provide a unique id, the TransactionalId, to the producer, which is stable across all sessions of the application. There is a one-to-one mapping between TransactionalId and PID. Kafka guarantees that exactly one producer with a given TransactionalId is active at a time by fencing off the old instance when a new one with the same TransactionalId comes alive. Kafka also guarantees that the new instance starts from a clean state by ensuring that all unfinished transactions have been completed (aborted or committed).

Here’s a code snippet that demonstrates the use of the new Producer Transactional APIs to send messages atomically to a set of topics.

<Code>
producer.initTransactions();
try {
    producer.beginTransaction();
    producer.send(record0);
    producer.send(record1);
    // commit the consumed offsets as part of the same transaction
    producer.sendOffsetsToTransaction(…);
    producer.commitTransaction();
} catch (ProducerFencedException e) {
    // another producer with the same TransactionalId is active; this instance must stop
    producer.close();
} catch (KafkaException e) {
    // a recoverable error: abort and retry the transaction
    producer.abortTransaction();
}
</Code>

You can refer to the official Kafka documentation for a deeper dive into the working details of the new Producer APIs.

Key Exceptions of Kafka Producer APIs

The new exceptions in Producer APIs are listed below:

  • ProducerFencedException: This exception is thrown if another active producer with the same TransactionalId exists in the system.
  • OutOfOrderSequenceException: The producer throws this exception if the broker detects a gap in the sequence numbers of the data it receives. A higher sequence number than expected means that some messages have been lost, whereas a lower one means the message is a duplicate. A common way of handling both exceptions is sketched after this list.
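
A common handling pattern, sketched below under the assumption of an already configured transactional producer and a prepared record, is to treat ProducerFencedException and OutOfOrderSequenceException as fatal and close the producer, while aborting and retrying the transaction for other KafkaExceptions.

<Code>
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.OutOfOrderSequenceException;
import org.apache.kafka.common.errors.ProducerFencedException;

// Assumes `producer` is a transactional KafkaProducer and `record` a prepared ProducerRecord.
try {
    producer.beginTransaction();
    producer.send(record);
    producer.commitTransaction();
} catch (ProducerFencedException | OutOfOrderSequenceException e) {
    // Fatal: another producer took over this transactional.id, or the sequence is broken.
    producer.close();
} catch (KafkaException e) {
    // Recoverable: abort this transaction and retry it.
    producer.abortTransaction();
}
</Code>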

Incorporating Exactly Once Semantics in Kafka Using Java

The steps involved in this method are as follows:

  • Step 1: To work with the Transaction API, you'll need Kafka's Java client in your pom.xml:
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.0.0</version>
</dependency>
  • Step 2: For this particular instance, you’ll be consuming messages from an input topic, sentences. This will be followed by counting every word and then sending the individual word counts to an output topic. You can call it counts. You can move forward with the assumption that the sentences topic already has transactional data available.
  • Step 3: You can add a producer in Kafka as follows:
Properties producerProps = new Properties();
producerProps.put("bootstrap.servers", "localhost:9092");

You will also need to enable idempotence and mention a transactional.id with the help of the following snippet:

producerProps.put("enable.idempotence", "true");
producerProps.put("transactional.id", "prod-1");
KafkaProducer<String, String> producer = new KafkaProducer<>(producerProps);

Now that you've enabled idempotence and set a transactional.id, Kafka will use them to deduplicate any messages this producer sends, ensuring idempotency.

  • Step 4: In this step, you need to call initTransactions to prepare the producer for transactions, via the producer.initTransactions(); command. This registers the producer with the broker as one that can use transactions, identified by its transactional.id and an epoch (a sequence number). The broker uses these to write-ahead any transactional actions to a transaction log, and it removes from the log any actions belonging to a producer with the same transactional.id but an older epoch, assuming them to be from defunct transactions.
  • Step 5: While consuming, you would normally read all the messages on a topic partition in order. However, you can indicate with isolation.level that the consumer should wait to read transactional messages until the associated transaction has been committed.
Properties consumerProps = new Properties();
consumerProps.put("bootstrap.servers", "localhost:9092");
consumerProps.put("group.id", "my-group-id");
consumerProps.put("enable.auto.commit", "false");
consumerProps.put("isolation.level", "read_committed");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(consumerProps);
consumer.subscribe(singleton("sentences"));
  • Step 6: You have configured both the producer and the consumer to write and read transactionally, so you can now consume records from your input topic and count each word in each record with the following snippet:
ConsumerRecords<String, String> records = consumer.poll(ofSeconds(60));
Map<String, Integer> wordCountMap = records.records(new TopicPartition("sentences", 0))
    .stream()
    .flatMap(record -> Stream.of(record.value().split(" ")))
    .collect(Collectors.toMap(word -> word, word -> 1, Integer::sum));

Here, read_committed is used, which means that this consumer will not read messages written to the input topic within a transaction until that transaction has been committed.

  • Step 7: To send the counts as new messages in the same transaction, you call beginTransaction via the producer.beginTransaction(); command. Next, you write each entry to your "counts" topic, with the word as the key and the count as the value.
wordCountMap.forEach((key, value) ->
    producer.send(new ProducerRecord<String, String>("counts", key, value.toString())));
  • Step 8: You need to commit the offsets that you just finished consuming. With transactions, the offsets are committed for the consumer group as part of the producer's transaction rather than by the consumer itself. In the following snippet, you do all this in a single call, but you first need to calculate the offsets for each topic partition:
Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
for (TopicPartition partition : records.partitions()) {
    List<ConsumerRecord<String, String>> partitionedRecords = records.records(partition);
    long offset = partitionedRecords.get(partitionedRecords.size() - 1).offset();
    offsetsToCommit.put(partition, new OffsetAndMetadata(offset + 1));
}

You can then send the calculated offsets to the transaction:

producer.sendOffsetsToTransaction(offsetsToCommit, "my-group-id");
  • Step 9: You can commit the transaction as follows:
producer.commitTransaction();

If anything fails along the way, you can catch the exception and abort the transaction instead:

try {
    // ... read from input topic
    // ... transform
    // ... write to output topic
    producer.commitTransaction();
} catch (Exception e) {
    producer.abortTransaction();
}

If you neither commit nor abort before the broker-configured transaction.max.timeout.ms elapses, the Kafka broker will abort the transaction itself.
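
Putting the steps together, a typical consume-transform-produce loop looks roughly like the sketch below. It assumes the transactional producer, the read_committed consumer, the group id, and the topic names ("sentences" and "counts") from the steps above.

<Code>
import java.time.Duration;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.ProducerFencedException;

// Sketch of the full read-process-write loop, using the producer and consumer configured above.
producer.initTransactions();
while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(60));
    if (records.isEmpty()) {
        continue;
    }
    try {
        producer.beginTransaction();

        // Transform: count the words across all polled records.
        Map<String, Integer> counts = new HashMap<>();
        for (ConsumerRecord<String, String> r : records) {
            for (String word : r.value().split(" ")) {
                counts.merge(word, 1, Integer::sum);
            }
        }
        counts.forEach((word, count) ->
            producer.send(new ProducerRecord<>("counts", word, count.toString())));

        // Commit the consumed offsets inside the same transaction.
        Map<TopicPartition, OffsetAndMetadata> offsets = new HashMap<>();
        for (TopicPartition partition : records.partitions()) {
            List<ConsumerRecord<String, String>> partRecords = records.records(partition);
            long lastOffset = partRecords.get(partRecords.size() - 1).offset();
            offsets.put(partition, new OffsetAndMetadata(lastOffset + 1));
        }
        producer.sendOffsetsToTransaction(offsets, "my-group-id");

        producer.commitTransaction();
    } catch (ProducerFencedException e) {
        producer.close();            // fatal: another instance with this transactional.id took over
        break;
    } catch (KafkaException e) {
        producer.abortTransaction(); // offsets were not committed, so the records can be reprocessed safely
    }
}
</Code>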

Apache Kafka Exactly Once’s Impact on Consumers

On the consumer side, behavior can be tuned by changing the isolation level. The isolation level is the setting that balances performance against reliability and consistency in a highly concurrent, transactional environment.

Kafka consumers now support two isolation levels:

  • Read_Committed: The consumer reads both non-transactional messages and transactional messages whose transactions have been committed. Instead of relying on client-side buffering, a read_committed consumer uses the partition's "Last Stable Offset" (LSO): the offset of the first message that still belongs to an open transaction. The consumer only reads up to the LSO and filters out any transactional messages whose transaction was aborted.
  • Read_Uncommitted: The consumer reads all messages in offset order without waiting for transactions to be committed. This option matches the previous semantics of a Kafka consumer (see the snippet after this list).
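
Toggling between the two modes comes down to a single consumer property, as the minimal sketch below shows; the broker address, group id, and deserializers are placeholder assumptions.

<Code>
import java.util.Properties;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "my-group-id");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

// Read only messages from committed transactions (up to the LSO):
props.put("isolation.level", "read_committed");
// Or read everything in offset order, including messages from open or aborted transactions:
// props.put("isolation.level", "read_uncommitted");  // the default

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
</Code>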

Apache Kafka Exactly Once's Impact on Performance

Kafka 0.11 promises several performance improvements over the previous release: up to 20% higher producer throughput, up to 50% higher consumer throughput, and up to 20% lower disk utilization. The significant decrease in disk utilization is due to the change in the message format.

Message Format Change

The per-message overhead of the older format is fixed at 34 bytes. The new format has to carry the PID, producer epoch, and sequence number, which pushes the fixed overhead up to 53 bytes. To compensate, a new batch-oriented format was introduced, consisting of a MessageSet and its contained Messages, as shown below:

MessageSet =>
FirstOffset => int64
Length => int32
PartitionLeaderEpoch => int32
Magic => int8
CRC => int32
Attributes => int16
LastOffsetDelta => int32 {NEW}
FirstTimestamp => int64 {NEW}
MaxTimestamp => int64 {NEW}
PID => int64 {NEW}
ProducerEpoch => int16 {NEW}
FirstSequence => int32 {NEW}
Messages => [Message]
Message => {ALL FIELDS ARE NEW}
Length => varint
Attributes => int8
TimestampDelta => varint
OffsetDelta => varint
KeyLen => varint
Key => data
ValueLen => varint
Value => data
Headers => [Header] /* Note: The array uses a varint for the number of headers. */
Header => HeaderKey HeaderVal
HeaderKeyLen => varint
HeaderKey => string
HeaderValueLen => varint
HeaderValue => data

A MessageSet contains a list of Messages. I won't go into detail on every field of the new format, but the total message size decreases when messages are sent in a batch: each Message carries only compact offset and timestamp deltas, while the base offset and timestamp live once in the MessageSet. The PID and epoch, being the same for all messages in a batch, are also part of the MessageSet. This elimination of redundancy means the overhead of the new format shrinks as the batch size increases.

For example (as taken from cwiki.apache.org): assuming a fixed message size of 50 with 100-byte keys and closely spaced timestamps, the overhead increases by only 7 bytes for each additional batched message (2 bytes for the message size, 1 byte for attributes, 2 bytes for the timestamp delta, 1 byte for the offset delta, and 1 byte for the key size).

Batch Size | Old Format Overhead | New Format Overhead
1          | 34*1 = 34           | 53 + 1*7 = 60
3          | 34*3 = 102          | 53 + 3*7 = 74
10         | 34*10 = 340         | 53 + 10*7 = 123
50         | 34*50 = 1700        | 53 + 50*7 = 403

Improved Error Handling

Apache Kafka's Exactly Once Semantics provides a robust recovery and error-handling mechanism that allows both the producer and the consumer to determine whether they can continue sending data without contradicting the Exactly Once guarantees. It helps avoid the dreaded situation where a fatal error forces the client to create a new producer from scratch.

For example, a fatal error can arise when the producer fails to assign the correct sequence numbers to its records, preventing the broker from accepting them, since records can only be appended in sequence order.

To overcome this issue, which would otherwise leave the producer retrying until it reaches the delivery timeout or the error clears, EOS provides an effective mechanism. The producer can re-issue the InitProducerId request, supplying its transactionalId as well. The broker can then compare the producerId against the one registered for that transactionalId to decide whether the producer may continue processing data. If they match, the broker bumps the epoch for the producer and sends it back, allowing the producer to reset its sequence numbers and continue processing data past the error.

This is how Kafka Exactly Once Semantics provides a robust error-handling and recovery mechanism.

Conclusion

Kafka Exactly Once semantics is a huge improvement over what was previously the weakest link in Kafka's API: the producer. However, it's important to note that Kafka can only provide Exactly Once semantics if it also stores the state/result/output of your consumer (as is the case with Kafka Streams).

Once you have a consumer that, for example, makes non-idempotent updates to a database, there's potential for duplication: the consumer may exit after updating the database but before committing the Kafka offsets. Conversely, there's potential for "message loss" if the application exits after the offsets were committed but before the database transaction was committed.

Hevo is a No-code Data Pipeline. Hevo is building the most robust and comprehensive ETL solution in the industry. You can integrate with your in-house databases, cloud apps, flat files, clickstream, etc. at a reasonable price.

Visit our Website to Explore Hevo

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. You can also have a look at the unbeatable pricing that will help you choose the right plan for your business needs.

Share your experience of using Kafka exactly once in the comment section below.
