Many leading brands such as Twitter, LinkedIn, Airbnb, and several others have been using Apache Kafka across multiple projects and communication pipelines. This surge in adoption naturally raises the question of what is driving the growth.

Kafka was developed by a LinkedIn team around 2010, originally to solve latency issues in moving data between the website and its infrastructure. It proved to be a credible replacement for the slow, batch-based offline systems it supplanted. In 2011, Kafka was open-sourced and began to be adopted as an enterprise messaging solution for fetching data reliably and moving it in real time.

Since then, Kafka Streams has seen growing adoption as a robust mechanism for relaying and processing data. Read along to learn more about Apache Kafka and get a detailed guide for working with Kafka Streams.

What is Kafka Streams?

Kafka Streams is a robust, horizontally scalable stream processing library built on top of Kafka. In other words, Kafka Streams is a lightweight data processing and transformation library within Kafka. You can use Kafka Streams to build real-time applications and microservices that react to data events and perform complex analytics.

Kafka Streams offers a framework and a clutter-free mechanism for building streaming services. You can embed it in your application code, and you’re good to go!

Here is what Kafka brings to the table to resolve targeted streaming issues:

  • A Kafka Streams application spreads processing load evenly: each new instance adds another copy of the running components, and an instance can be recreated easily even when moved elsewhere, keeping processing uniform and fast.
  • While local state may persist on disk, any number of instances of that state can be rebuilt from Kafka to keep the processing load balanced.
  • Kafka Streams lets you window over out-of-order data using a Dataflow-like model.
  • Kafka Streams combines the concepts of streams and tables to simplify the processing model further.

Why are Kafka Streams Needed?

Ideally, Stream Processing platforms are required to provide integration with data storage platforms, both for stream persistence and for joins between static tables and data streams.

However, fault tolerance and scalability are sharply limited in most frameworks, and deployment, configuration, and network specifics cannot be controlled completely. Note that Kafka Streams vs. Kafka is not a mutually exclusive choice: Kafka Streams is built on top of the Kafka clients and relies on Kafka brokers for data transport and storage.

You can overcome these Stream Processing challenges by using Kafka Streams, which offers more robust options to accommodate these requirements. Kafka Streams comes with the advantages mentioned below.

  • Kafka Streams comes with a fault-tolerant cluster architecture that is highly scalable, making it suitable for handling hundreds of thousands of messages every second.
  • Kafka Streams can be connected to Kafka directly and is also readily deployable on the cloud.
  • Kafka Streams can be accessed on Linux, Mac, and Windows Operating Systems, and by writing standard Java or Scala scripts.
  • Kafka Streams handles sensitive data in a very secure and trusted way as it is fully integrated with Kafka Security.

Kafka Streams Architecture

Here is the anatomy of an application that leverages the Streams API. It provides a logical view of a Kafka Streams application, which can contain multiple stream threads, each of which can in turn contain multiple stream tasks.

Figure: Kafka Streams architecture

A processor topology (or simply a topology) defines the stream processing computational logic for your application. It describes how input data is transformed into output data.

A topology is a graph of nodes or stream processors that are connected by edges (streams) or shared state stores. In this topology, you can access the following two special processors:

  • Sink Processor: A Sink Processor is a stream processor that has no downstream processors. It sends any records received from its upstream processors to a specified Kafka topic.
  • Source Processor: A Source Processor is a stream processor that has no upstream processors. Instead, it produces an input stream for its topology from one or more Kafka topics by reading records from those topics and forwarding them to its downstream processors.

A Stream Processing Application can be used to define one or more such topologies, although it is generally used to define one specific topology. Developers can define topologies either through the low-level processor API or through the Kafka Streams DSL, which incrementally builds on top of the former.

Therefore, you can think of a processor topology as a logical abstraction for your stream processing code. The logical topology is instantiated at runtime and replicated within the application for parallel processing.

Figure: Processor topology
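
For the low-level route, a topology can also be assembled directly with the Processor API. Here is a minimal sketch, assuming placeholder topic names "input-topic" and "output-topic", that wires a source processor straight into a sink processor:

import org.apache.kafka.streams.Topology;

Topology topology = new Topology();
// Source processor reading from a Kafka topic
topology.addSource("Source", "input-topic");
// Sink processor writing everything received from "Source" to another topic
topology.addSink("Sink", "output-topic", "Source");

Intermediate stream processors can be added between the two with addProcessor(), and the finished topology is passed to new KafkaStreams(topology, props) just like a DSL-built one.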

Key Concepts of Kafka Streams

Kafka Streams is a popular client library used for processing and analyzing data present in Kafka. To understand it clearly, check out its following core stream processing concepts:

Time

An important principle of stream processing is the concept of time and how it is modeled and integrated. Kafka Streams distinguishes between the following notions of time:

  • Event Time: This is the time when an event or record occurred and was recorded “at the source”. For instance, consider the event time at which a GPS sensor in a car detects a change in location.
  • Processing Time: This is the time when a recorded event is processed (consumed) by a stream processing application. The lag can range from milliseconds to hours or even days, so processing time often trails the original event time. For instance, the processing time of an analysis application that reads and processes the geolocation data recorded by car sensors.
  • Ingestion Time: This is the time at which an event is stored in a topic partition by the Kafka broker. It refers to when the record is appended to the target topic by the broker, not when the record was created “at the source”. It also differs from processing time, since an event may be stored in the topic partition but not yet processed.
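
How event time is obtained is configurable. As an illustrative sketch (the Order type and its getEventTimeMs() accessor are hypothetical, application-specific names), a custom TimestampExtractor can pull the event time out of the record payload:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class OrderTimestampExtractor implements TimestampExtractor {
    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();
        if (value instanceof Order) {
            // Event time carried inside the payload (hypothetical accessor)
            return ((Order) value).getEventTimeMs();
        }
        // Fall back to the timestamp embedded in the record
        return record.timestamp();
    }
}

The extractor is registered via the StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG property; by default, Kafka Streams uses the timestamp embedded in the record, which corresponds to event time or ingestion time depending on the broker’s log.message.timestamp.type setting.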

SerDes

Providing SerDes (Serializer/Deserializer) for the data types of the record key and record value (e.g., java.lang.String) is essential for each Kafka Streams application to materialize the data as needed. SerDes information is important for operations such as stream(), table(), to(), through(), groupByKey(), and groupBy(). Kafka Streams allows you to provide SerDes using either of the following methods:

  • Set the default SerDes via the StreamsConfig instance.
  • Explicitly specify SerDes when calling the corresponding API method, overriding the default.
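
Here is a quick sketch of both options; the topic name "counts-topic" and its String-key/Long-value types are assumptions for illustration:

import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.KStream;

Properties props = new Properties();
// 1. Default SerDes, applied wherever none are given explicitly
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

// 2. Explicit SerDes on a single call, overriding the defaults
StreamsBuilder builder = new StreamsBuilder();
KStream<String, Long> counts =
    builder.stream("counts-topic", Consumed.with(Serdes.String(), Serdes.Long()));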

DSL Operations

To define the stream processing topology, Kafka Streams provides the Kafka Streams DSL (Domain Specific Language), which is built on top of the Streams Processor API. Recommended for beginners, the DSL lets you perform all the basic stream processing operations:

  • Kafka Streams DSL supports built-in abstractions of streams and tables in the form of KStream, KTable, and GlobalKTable. This first-class support for streams and tables is extremely important, as most practical scenarios require a combination of both, not just streams or only databases/tables.
  • You can leverage a declarative, functional programming style with stateless transformations (e.g., map and filter) as well as stateful transformations such as aggregations (e.g., count and reduce), joins (e.g., leftJoin), and windowing (e.g., session windows), as sketched below.
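
For instance, a tumbling-window count takes only a few lines of DSL. The sketch below assumes a KStream<String, String> named textLines and uses the windowing API of recent Kafka Streams releases:

import java.time.Duration;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

// Count records per key over 5-minute tumbling windows
KTable<Windowed<String>, Long> windowedCounts = textLines
    .groupByKey()
    .windowedBy(TimeWindows.ofSizeWithNoGrace(Duration.ofMinutes(5)))
    .count();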

Scaling Kafka Streams

You can easily scale Kafka Streams applications by balancing load and state between instances in the same pipeline. Once the aggregated results are distributed among the nodes, Kafka Streams allows you to find out which node is hosting the key so that your application can collect data from the right node or send clients to the right node.
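
Here is a sketch of that lookup (Kafka 2.5+), assuming a running KafkaStreams instance named streams and a state store named "WordCountsStore":

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyQueryMetadata;
import org.apache.kafka.streams.state.HostInfo;

KeyQueryMetadata metadata =
    streams.queryMetadataForKey("WordCountsStore", "some-word", Serdes.String().serializer());
// The instance currently hosting this key
HostInfo activeHost = metadata.activeHost();
System.out.println("Key is served by " + activeHost.host() + ":" + activeHost.port());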

Interactive Queries

For applications that run as many distributed instances, each including a locally managed state store, it is useful to be able to query the application from the outside. To this end, Kafka Streams provides interactive queries, which make your application queryable from outside your own code.

Developers can effectively query the local state store of an application instance, such as a local key-value store, a local window store, or a local user-defined state store. Alternatively, developers can add an RPC(Remote Procedure Call) layer to their application(for instance REST API), expose the application’s RPC endpoint, discover the application instance and its local state store, and query the remote state store for the entire app.

Stream Threading

In Kafka Streams, you can set the number of threads used for parallel processing within an application instance. Each thread independently executes one or more stream tasks. Threads do not share state, so no coordination between them is required. Kafka Streams automatically handles the distribution of Kafka topic partitions to stream threads.

Launching more stream threads, or more instances of the application, simply replicates the topology and lets it process a different subset of Kafka partitions, effectively parallelizing the work.
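
The thread count itself is a single configuration property; a minimal sketch:

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Run four stream threads in this application instance; Kafka Streams
// assigns stream tasks (topic partitions) across them automatically.
props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 4);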

Stream-Table Duality

Kafka Streams supports streams and tables that can be transformed into one another bidirectionally. This is the so-called stream-table duality. Tables are collections of evolving facts: each new event overwrites the previous value for its key, whereas streams are sequences of immutable facts.

Streams capture the full flow of data from a topic, while tables hold state built up by aggregating one or more streams. Consider a chess game, as outlined in Kafka Data Modeling: the stream of successive moves is aggregated into a table representing the current board, and each move transitions that table from one state to the next.

Figure: Stream-table duality

KStream, KTable and GlobalKTable

Kafka Streams offers two core abstractions for streams and tables. KStream handles the record stream, while KTable keeps track of the latest state of each key in the changelog stream, where every data record represents an update.

There is another abstraction for non-partitioned tables: a GlobalKTable can be used to broadcast information to all tasks or to perform joins without re-partitioning the input data.

We can read and deserialize a topic as a stream:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = 
  builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

It is also possible to read a topic as a table that tracks the latest value received per key:

KTable<String, String> textLinesTable = 
  builder.table(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

Finally, we are able to read a topic using a global table:

GlobalKTable<String, String> textLinesGlobalTable = 
  builder.globalTable(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

What is Kafka Streams API?

Kafka Streams API can be used to simplify the Stream Processing procedure from various disparate topics. It can provide distributed coordination, data parallelism, scalability, and fault tolerance.

This API leverages the concepts of partitions and tasks as logical units that are strongly linked to the topic partitions and interact with the cluster.

A distinct feature of Kafka Streams API is that the applications that you build with it are just normal Java applications that can be deployed, packaged, or monitored just like any other Java application.

Kafka Streams API: Use Cases

Here are a few handy Kafka Streams examples that leverage Kafka Streams API to simplify operations:

  • Finance Industry can build applications to accumulate data sources for real-time views of potential exposures. It can also be leveraged for minimizing and detecting fraudulent transactions.
  • Travel companies can build applications with the API to help them make real-time decisions to find the best suitable pricing for individual customers. This allows them to cross-sell additional services and process reservations and bookings.
  • Retailers can leverage this API to decide in real-time on the next best offers, pricing, personalized promotions, and inventory management.
  • It can also be used by logistics companies to build applications to track their shipments reliably, quickly, and in real-time.
  • Manufacturing and automotive companies can easily build applications to ensure their production lines offer optimum performance while extracting meaningful real-time insights into their supply chains. The API can also be leveraged to monitor the telemetry data from linked cars to make a decision as to the need for a thorough inspection.

Working With Kafka Streams API

To start working with the Kafka Streams API, you first need to add the kafka-streams package to your application. The dependency is available from Maven:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>1.1.0</version>
</dependency>

Next, create a file named streams.properties with the snippet below. Make sure you replace the bootstrap.servers list with the IP addresses of your chosen cluster:

# Kafka broker IP addresses to connect to
bootstrap.servers=54.236.208.78:9092,54.88.137.23:9092,34.233.86.118:9092

# Name of our Streams application
application.id=wordcount

# Values and Keys will be Strings
default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde

# Commit at least every second instead of default 30 seconds
commit.interval.ms=1000

To use the Streams API with Instaclustr Kafka, you also need to provide authentication credentials. Add the following snippet to your streams.properties file, making sure the truststore location and password are correct:

ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.location = truststore.jks
ssl.truststore.password = instaclustr
ssl.protocol=TLS
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="ickafka" \
  password="64500f38930ddcabf1ca5b99930f9e25461e57ddcc422611cb54883b7b997edf";

To create the streams application, you need to load the properties mentioned earlier:

Properties props = new Properties();

try {
  props.load(new FileReader("streams.properties"));
} catch (IOException e) {
  e.printStackTrace();
}

Make a new input KStream object on the wordcount-input topic:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("wordcount-input");

Make the word count KStream that will calculate the number of times every word occurs:

final Pattern pattern = Pattern.compile("\\W+");

KStream<String, String> counts = source
      .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
      .map((key, value) -> new KeyValue<>(value, value))
      .groupByKey()
      .count(Materialized.as("CountStore"))
      .mapValues(value -> Long.toString(value))
      .toStream();

You can then direct the output from the word count KStream to a topic named wordcount-output:

counts.to("wordcount-output");

Lastly, you can create and start the KafkaStreams object:

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

How to Use Kafka Streams?

Kafka Streams gives you the ability to perform powerful data processing operations on Kafka data in real time. Follow this detailed guide to understand how Kafka Streams works, along with some practical uses and implementation details.

What is a Stream? – Table and Stream Mechanism

A stream typically refers to a general arrangement or sequence of records that are transmitted over systems. In simple words, a stream is an unbounded sequence of events. You can think of this as just things happening in the world and all of these events are immutable.

Tables are an accumulated representation of a stream, built by applying its events in order. You can think of a table as the current state of the world.

A bit more technically, a table is a materialized view of that stream of events, holding only the latest value for each key. Typically, the table acts as the inventory against which further processing is triggered.

You can use tables for aggregation queries such as average, mean, maximum, and minimum over your datasets. If instead you want the series of events themselves, or a dashboard/analysis showing change over time, streams are the better fit.

Kafka allows you to compute values against tables as the underlying streams change, which makes triggered computation faster, and it can work with any data source. This table-and-stream duality can be used for quick and easy real-time streaming in all kinds of applications.

A basic Kafka Streams API build can easily be structured using the following syntax:

StreamsBuilder builder = new StreamsBuilder();

builder.stream("raw-movies", Consumed.with(Serdes.Long(), Serdes.String()))
    .mapValues(Parser::parseMovie)
    .map((key, movie) -> new KeyValue<>(movie.getMovieId(), movie))
    .to("movies", Produced.with(Serdes.Long(), movieSerde));

Topic Partitioning

A topic is basically a stream of data; just as you have tables in databases, you have topics in Kafka. Topics are split into what are called partitions: a partition is a part of the topic, and the data within a partition is ordered.

Partitioning takes the single topic log and breaks it into multiple logs each of which can live on a separate node in the Kafka cluster. You can have as many partitions per topic as you want.

Much of Kafka’s benefit owes to topic partitioning, which stores messages in the right partition to spread data evenly while ensuring that data associated with the same key arrives in order.

The partitioning concept is used by the KafkaProducer class, where the record key and value to transmit, along with the cluster address in the producer properties, can be specified as shown:

try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {
 
    for (long i = 0; i < 10; i++) {
        final String orderId = "id" + Long.toString(i);
        final Payment payment = new Payment(orderId, 1000.00d);
        final ProducerRecord<String, Payment> record = 
           new ProducerRecord<String, Payment>("transactions", 
                                        payment.getId().toString(), 
                                        payment);
        producer.send(record);
   }
} catch (final Exception e) {
    e.printStackTrace();
}

A KafkaConsumer can likewise subscribe to one or more topics and read them with the following code:

try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(TOPIC));
 
    while (true) {
        ConsumerRecords<String, Payment> records = consumer.poll(100);
        for (ConsumerRecord<String, Payment> record : records) {
            String key = record.key();
            Payment value = record.value();
            System.out.printf("key = %s, value = %s%n", key, value);
        }
    }
}

Kafka Connect

Kafka Connect provides an ecosystem of pluggable connectors that can be implemented to balance the data load moving across external systems. In simple words, Kafka Connect is used as a tool for connecting different input and output systems to Kafka.

Using Connect, you can eliminate custom code and drive the data movement through JSON configuration alone.

A representation of these JSON key and value pairs can be seen as follows:

{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics"                          : "my_topic",
    "connection.url"                  : "http://elasticsearch:9200",
    "type.name"                       : "_doc",
    "key.ignore"                      : "true",
    "schema.ignore"                   : "true"
}

ksqlDB for Stream Processing 

ksqlDB gives stream processing applications a REST interface through which they can submit and manage stream processing jobs, enabling faster query implementations.

Because these jobs are defined in SQL, they can be used from any language while building an application; ksqlDB (formerly KSQL) thus serves as an alternative to implementing Kafka Streams features in Java.

The following ksqlDB code implements the Kafka Stream function:

CREATE TABLE rated_movies AS
   SELECT  title,
           release_year,
           sum(rating) / count(rating) AS avg_rating
   FROM ratings
   INNER JOIN movies ON ratings.movie_id = movies.movie_id
   GROUP BY title,
            release_year;

A robust Kafka Streams program brings together the components discussed above for optimization, scalability, fault tolerance, and efficient large-scale deployment. It looks like the structure shown below:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

}

Code snippets adapted from Confluent and Apache Kafka documentation.

Difference between Kafka Streams and Kafka Consumer

Kafka Streams is a data processing and transformation library built on top of Kafka’s messaging service, whereas the Kafka Consumer API lets applications read messages from topics. A Kafka Streams application consumes and produces within a single stream topology, while the Consumer API separates the responsibilities of consumers and producers.

Kafka Streams is capable of performing complex processing but doesn’t support batch processing. The Kafka Consumer processes one message at a time but is capable of batch fetching. Kafka stream processing is based on the concept of processor topologies, which are graphs of stream processors that define the logic and flow of data streams.

Kafka Streams supports both stateless and stateful operations, while the Kafka Consumer supports only stateless operations. The Kafka Consumer API lets you work with several Kafka clusters, whereas Kafka Streams interacts with a single Kafka cluster only.

Connecting Kafka Streams to Confluent Cloud

Here are the steps you can follow to connect Kafka Streams to Confluent Cloud:

  • Step 1: Create a java.util.Properties instance.
  • Step 2: Next, configure your streams application. Kafka and Kafka Streams Configuration options need to be configured in the java.util.Properties instance before you use Streams.

    For this example, you need to configure the Confluent Cloud broker endpoints StreamsConfig.BOOTSTRAP_SERVERS_CONFIG along with SASL config SASL_JAAS_CONFIG. Here is the code snippet for the same:
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Comma-separated list of the Confluent Cloud broker endpoints. For example:
// r0.great-app.confluent.aws.prod.cloud:9092,r1.great-app.confluent.aws.prod.cloud:9093,r2.great-app.confluent.aws.prod.cloud:9094
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-endpoint1, broker-endpoint2, broker-endpoint3>");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG,
    "org.apache.kafka.common.security.plain.PlainLoginModule required " +
    "username=\"<api-key>\" password=\"<api-secret>\";");

// Recommended performance/resilience settings
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 2147483647);
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 9223372036854775807L);

// Any further settings
props.put(... , ...);

Difference Between Streams and Consumer APIs

Kafka Consumer API

The Kafka Consumer API enables applications to handle messages from topics. It offers essential elements to interact with these messages, including the following capabilities:

  • Separation of responsibilities between consumers and producers
  • Processing of a single message at a time
  • Support for batch processing 
  • Stateless support, which means that the client does not retain the previous state and processes each record in the stream separately
  • Writing an application requires a considerable amount of code
  • No use of threading or parallelism
  • Writing in multiple Kafka clusters is possible

Kafka Streams API

Kafka Streams simplifies stream processing from topics by utilizing the Kafka client libraries. It offers data parallelism, distributed coordination, fault tolerance, and scalability, and it processes messages as an unbounded, continuous, real-time flow of records. It can:

  • Consume and produce a single Kafka Stream
  • Perform complex processing
  • Not support batch processing
  • Support both stateless and stateful operations
  • Write an application in just a few lines of code
  • Handle threading and parallelism
  • Interact with only one Kafka Cluster
  • Store and transport messages using stream partitions and tasks as logical units.

Kafka Streams uses the concepts of partitions and tasks as logical units strongly linked to the topic partitions. Besides, it uses threads to parallelize processing within an application instance. Another important capability supported is the state stores, used by Kafka Streams to store and query data coming from the topics. Finally, Kafka Streams API interacts with the cluster, but it does not run directly on top of it.

In the coming sections, we’ll focus on four aspects that make the difference with respect to the basic Kafka clients: Stream-table duality, Kafka Streams Domain Specific Language (DSL), Exactly-Once processing Semantics (EOS), and Interactive queries.

Dependencies

To use the examples, just add the Kafka Consumer API and Kafka Streams API dependencies to your pom.xml file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.4.0</version>
</dependency>

Kafka Streams DSL

Kafka Streams DSL is a declarative and functional programming paradigm. It is developed on top of the Streams Processor API. The language includes the built-in abstractions for streams and tables described in the preceding section.

It also allows both stateless (map, filter) and stateful transformations (aggregations, joins, and windowing). Thus, stream processing procedures may be implemented using only a few lines of code.

Stateless Transformations

Stateless transformations do not require state to operate, so the stream processor does not need a state store. Example operations include filter, map, flatMap, and groupBy.

Let’s look at how to map the values to upper case, filter them from the topic, and save the result as a stream:


KStream<String, String> textLinesUpperCase =
  textLines
    .map((key, value) -> KeyValue.pair(value, value.toUpperCase()))
    .filter((key, value) -> value.contains("FILTER"));

Stateful Transformations

Stateful transformations rely on state to complete their processing. The processing of a message depends on the processing of other messages (via a state store). In other words, the changelog topic allows you to restore any table or state store.

The word count algorithm is an example of a stateful transformation.


KTable<String, Long> wordCounts = textLines
  .flatMapValues(value -> Arrays.asList(value
    .toLowerCase(Locale.getDefault()).split("\\W+")))
  .groupBy((key, word) -> word)
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));

We will send these two strings to the topic:

String TEXT_EXAMPLE_1 = "test test and test";
String TEXT_EXAMPLE_2 = "test filter filter this sentence";

Output: 

Word: and -> 1
Word: test -> 4
Word: filter -> 2
Word: this -> 1
Word: sentence -> 1

DSL has a variety of transformation features. We can connect or merge two input streams/tables that share the same key to create a new stream/table. We may also aggregate, or combine, numerous records from different streams/tables into a single record in a new table. Finally, windowing may be used to group records that share the same key in join or aggregate methods.

An example of joining with 5s windowing is merging records categorized by key from two streams into one stream.

KStream<String, String> leftRightSource = leftSource.outerJoin(rightSource,
  (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
    JoinWindows.of(Duration.ofSeconds(5))).groupByKey()
      .reduce(((key, lastValue) -> lastValue))
  .toStream();

So we’ll set the left stream value to left with key=1, and the right stream value to right with key=2. The results are as follows:

(key= 1) -> (left=left, right=null)
(key= 2) -> (left=null, right=right)

For the aggregation example, we’ll compute the word count method with the first two letters of each word as the key:

KTable<String, Long> aggregated = input
  .groupBy((key, value) -> (value != null && value.length() > 0)
    ? value.substring(0, 2).toLowerCase() : "",
    Grouped.with(Serdes.String(), Serdes.String()))
  .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
    Materialized.with(Serdes.String(), Serdes.Long()));

With the following entries:

"one", "two", "three", "four", "five"

The output is:

Word: on -> 3
Word: tw -> 3
Word: th -> 5
Word: fo -> 4
Word: fi -> 4

Exactly-Once Processing Semantics (EOS)

There are times when we need to ensure that the consumer processes a message exactly once. Kafka introduced the ability to include messages in transactions, allowing EOS to be implemented using the Transactional API. Kafka Streams has supported the same feature since version 0.11.0.

To configure EOS in Kafka Streams, we’ll add the following property:

streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
  StreamsConfig.EXACTLY_ONCE);
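
Note that in Kafka Streams 3.0 and later, the EXACTLY_ONCE constant is deprecated in favor of exactly-once v2 (which requires brokers on version 2.5 or newer); the equivalent setting is:

streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG,
  StreamsConfig.EXACTLY_ONCE_V2);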

Interactive Queries

Interactive queries allow you to inspect the state of an application in distributed environments. This means being able to pull information from both local and remote state stores across numerous instances. Basically, we take all of the stores and combine them to get the full state of the application.

Let’s look at an example utilizing interactive queries. First, we’ll describe the processing topology, which in this case is the word count algorithm: 

KStream<String, String> textLines =
  builder.stream(TEXT_LINES_TOPIC, Consumed.with(Serdes.String(), Serdes.String()));

final KGroupedStream<String, String> groupedByWord = textLines
  .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
  .groupBy((key, word) -> word, Grouped.with(Serdes.String(), Serdes.String()));

Now, create a state store (key-value) for all the computed word counts:

groupedByWord
  .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("WordCountsStore")
  .withValueSerde(Serdes.Long()));

Then, we can query the key-value store:

ReadOnlyKeyValueStore<String, Long> keyValueStore =
  streams.store(StoreQueryParameters.fromNameAndType(
    "WordCountsStore", QueryableStoreTypes.keyValueStore()));

KeyValueIterator<String, Long> range = keyValueStore.all();
while (range.hasNext()) {
    KeyValue<String, Long> next = range.next();
    System.out.println("count for " + next.key + ": " + next.value);
}

The output is:

Count for and: 1
Count for filter: 2
Count for sentence: 1
Count for test: 4
Count for this: 1

Kafka Streams Advantages

Kafka’s cluster architecture makes it a fault-tolerant, highly scalable, and particularly elastic system, capable of handling hundreds of thousands of messages per second. It can be easily deployed in the cloud alongside container, virtual machine, and bare metal environments, and it adds value to both big and small use cases. Kafka Streams has exactly-once processing semantics, connects directly to Kafka, and doesn’t require a separate processing cluster.

Developers may use Kafka Streams on Linux, Mac, and Windows systems, writing standard Java or Scala applications. Kafka Streams is also fully integrated with Kafka Security, resulting in a safe, enterprise-trusted solution for processing sensitive data. The New York Times, Pinterest, Trivago, and other major brands and companies rely on Kafka Streams’ real-time data streaming capabilities.

Best of all, Kafka Streams, being an Apache Foundation project, is completely open source. That is, Kafka Streams provides all of the advantages of a proven open source technology that is fully supported and continually worked upon by Kafka’s very active open source community. As a real-time data streaming solution, using Kafka Streams in its 100% open source form shields enterprises from the hazards of vendor and technological lock-in that come with other proprietary and “open core” data-layer options.

Conclusion 

There are manifold features of Apache Kafka that can be harnessed with little more than your application code. It is secure, scalable, and cost-efficient, and ready for use in a wide variety of systems. In this article, you were introduced to Kafka Streams, a robust, horizontally scalable stream processing library, walked through its key concepts, and shown how to work with it in practice.

Being open-source software, Kafka is a great choice for enterprises and also holds great untapped commercial potential. Kafka Streams is easy to understand and implement for developers of all skill levels and has genuinely changed how streaming platforms and real-time events are built and processed.

Do you use Kafka? Share your experience of using Kafka Streams in the comment section below.

Aman Sharma
Freelance Technical Content Writer, Hevo Data

Driven by a problem-solving approach and guided by analytical thinking, Aman loves to help data practitioners solve problems related to data integration and analysis through his extensively researched content pieces.
