Many large companies, such as Twitter, LinkedIn, and Airbnb, use Apache Kafka for multiple projects and communications. Such broad adoption naturally raises the question of what is driving this growth.

Kafka was developed in 2010 by a team at LinkedIn, originally to solve latency issues for the website and its infrastructure. It proved to be a credible solution for offline systems and was effective for the problem at hand. In 2011, Kafka was used as an enterprise messaging solution for fetching data reliably and moving it in real time or in batches.

Since then, it has been used more and more widely as a robust mechanism for relaying data. Read along to learn more about Apache Kafka and access a detailed guide for working with Kafka Streams.

What is Kafka Streams?


Kafka Streams is a robust, horizontally scalable Java library for building real-time applications and microservices that react to data events and perform complex analytics. It simplifies data processing and transformation within Kafka, making it ideal for building dynamic, event-driven solutions.

It offers a lightweight framework for building streaming services. You can embed it in your application code, and you’re good to go!

Here is what Kafka Streams brings to the table to resolve common streaming issues:

  • The processing load is spread uniformly across the application: each new instance you start takes over a share of the work from the running ones. An instance can easily be recreated, even on a different machine, which keeps processing uniform and fast.
  • Although some local state may persist on disk, you can create any number of instances of the same application to balance the processing load.
  • It handles out-of-order data with windowing using a DataFlow-like model.
  • It combines the concept of streams and tables to simplify the processing mechanism further.

Key Features of Kafka Streams

It offers powerful capabilities for building real-time applications and microservices:

  • Distributed State: Manages distributed state by persisting data locally and within Kafka cluster topics.
  • Data Processing: Transforms input Kafka topics into output topics or integrates with external databases and services.
  • Scalability: Handles large-scale data and allows scaling by adding instances.
  • Fault Tolerance: Ensures resilience against system failures.
  • Interactive Queries: Enables querying of local and remote state stores within applications.
  • Offset Management: Automatically tracks and commits record offsets to resume processing seamlessly.

Why is Kafka Streams Needed?

Ideally, stream processing platforms are required to provide integration with data storage platforms, both for stream persistence, and static table/data stream joins.

However, fault tolerance and scalability are severely limited in most frameworks, and the deployment, configuration, and network specifics cannot be controlled completely. Kafka Streams vs. Kafka is not a mutually exclusive choice: Kafka Streams is built on top of the Kafka clients and relies on Kafka brokers for data processing and storage.

You can overcome these stream processing challenges with Kafka Streams, which offers more robust options to accommodate these requirements. It comes with the following advantages:

  • It offers a fault-tolerant cluster architecture that is highly scalable, making it suitable for handling hundreds of thousands of messages every second.
  • It can be connected to Kafka directly and is also readily deployable on the cloud.
  • It can be used on Linux, Mac, and Windows operating systems by writing standard Java or Scala applications.
  • It handles sensitive data in a very secure and trusted way, as it is fully integrated with Kafka Security.

Kafka Streams Architecture

Here is the anatomy of an application that leverages the Streams API. It provides a logical view of a Kafka Streams application, which can contain multiple stream threads, each of which can in turn contain multiple stream tasks.

Figure: Kafka Streams architecture

A processor topology (or simply a topology) defines the stream processing logic of your application. It describes how input data is transformed into output data.

A topology is a graph of nodes or stream processors that are connected by edges (streams) or shared state stores. In this topology, you can access the following two special processors:

  • Sink Processor: A Sink Processor is a kind of stream processor that does not include downstream processors. Sink Processors send any received records from their upstream processors to a particular Kafka topic.
  • Source Processor: A Source Processor is a kind of stream processor that does not leverage any upstream processors. Instead, it produces an input stream to its topology from one or multiple Kafka topics by extracting records from these topics and forwarding them to its downstream processors.

A Stream Processing Application can be used to define one or more such topologies, although it is generally used to define one specific topology. Developers can define topologies either through the low-level processor API or through the DSL, which incrementally builds on top of the former.

Therefore, you can define processor topology as a logical abstraction for your Stream Processing code. The logical topology gets instantiated at runtime and is replicated within the application for parallel processing.
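To make the source, processor, and sink roles concrete, here is a minimal plain-Java sketch of the idea. It is not the Kafka Streams API: lists stand in for topics, and the names `TopologySketch`, `run`, `splitWords`, and `dropShortWords` are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

// Plain-Java model of a processor topology; no Kafka classes are involved.
final class TopologySketch {

    // Feeds the source records through each processor node in order and
    // returns whatever reaches the end, which plays the role of the sink.
    static List<String> run(List<String> sourceTopic,
                            List<Function<List<String>, List<String>>> processors) {
        List<String> inFlight = new ArrayList<>(sourceTopic); // source processor
        for (Function<List<String>, List<String>> processor : processors) {
            inFlight = processor.apply(inFlight);              // edge between two nodes
        }
        return inFlight;                                       // handed to the sink
    }

    // Example node: splits each line into lower-case words.
    static List<String> splitWords(List<String> lines) {
        List<String> words = new ArrayList<>();
        for (String line : lines) {
            for (String word : line.toLowerCase().split("\\W+")) {
                if (!word.isEmpty()) {
                    words.add(word);
                }
            }
        }
        return words;
    }

    // Example node: drops words shorter than four characters.
    static List<String> dropShortWords(List<String> words) {
        List<String> kept = new ArrayList<>();
        for (String word : words) {
            if (word.length() >= 4) {
                kept.add(word);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Function<List<String>, List<String>>> nodes = Arrays.asList(
                TopologySketch::splitWords, TopologySketch::dropShortWords);
        System.out.println(run(Arrays.asList("Kafka Streams is a library"), nodes));
    }
}
```

In a real application, the nodes would be defined through the DSL or the Processor API, and the runtime would instantiate several copies of this graph for parallel processing.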

Figure: Processor topology

Key Concepts of Kafka Streams

Kafka Streams is a popular client library used for processing and analyzing data present in Kafka. To understand it clearly, check out its following core stream processing concepts:

Time

An important principle of stream processing is the concept of time and how it is modeled and integrated. Here are a few time terms:

  • Event Time: This is the time when an event or record occurred and was then recorded “at the source”. For instance, you can consider the event time related to a GPS sensor of a car detecting a change in the location.
  • Processing Time: This is the time when a recorded event is processed (consumed) by a stream processing application. It can be milliseconds, hours, or even days later than the original event time. For instance, consider the time at which an analysis application reads and processes the geolocation data recorded by car sensors.
  • Ingest Time: This is the instant when an event is stored in a topic partition by the Kafka broker. Ingestion time is the moment the record is appended to the target topic by the Kafka broker, not the moment the record was created “at the source”. It also differs from processing time, as an event may be stored in the topic partition without having been processed yet.
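The difference between event time and processing time matters most when records arrive out of order. The sketch below is a plain-Java model, not Kafka code. It assigns events to fixed-size (tumbling) windows based on their event timestamps, so a late arrival still lands in the window it belongs to. The class name `EventTimeWindows` and the 5-second window size are assumptions made for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of event-time windowing; no Kafka classes are involved.
final class EventTimeWindows {

    // Start of the tumbling window that contains the given timestamp.
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - Math.floorMod(timestampMs, windowSizeMs);
    }

    // Groups event timestamps by window start, whatever order they arrive in.
    static Map<Long, List<Long>> assign(long[] eventTimesMs, long windowSizeMs) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long eventTime : eventTimesMs) {
            windows.computeIfAbsent(windowStart(eventTime, windowSizeMs),
                                    start -> new ArrayList<>()).add(eventTime);
        }
        return windows;
    }

    public static void main(String[] args) {
        // The event stamped 4_000 arrives last (late in processing time),
        // yet it is grouped by its event time with the event stamped 1_000.
        long[] arrivalOrder = {1_000L, 6_000L, 7_500L, 4_000L};
        System.out.println(assign(arrivalOrder, 5_000L));
    }
}
```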

SerDes

Every Kafka Streams application must provide SerDes (Serializer/Deserializer) for the data types of the record key and record value (e.g. java.lang.String) so that it can materialize the data as needed. SerDes information is required by operations such as stream(), table(), to(), through(), groupByKey(), and groupBy(). You can provide SerDes using either of the following methods:

  • Set the default SerDes via the StreamsConfig instance.
  • Explicitly specify SerDes when calling the corresponding API method, overriding the default.
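Conceptually, a SerDes is just a pair of functions that turn a typed value into bytes and back. The following plain-Java sketch shows that round trip for a String key and a long value. The class and method names are hypothetical and are not part of the Kafka API, which ships its own implementations through the Serdes class.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Plain-Java model of a serializer/deserializer pair; names are hypothetical.
final class SerdeSketch {

    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeString(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // A long is written as 8 bytes in big-endian order (ByteBuffer's default).
    static byte[] serializeLong(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long deserializeLong(byte[] bytes) {
        return ByteBuffer.wrap(bytes).getLong();
    }

    public static void main(String[] args) {
        byte[] key = serializeString("kafka");
        byte[] count = serializeLong(42L);
        System.out.println(deserializeString(key) + " -> " + deserializeLong(count));
    }
}
```

If the serializer used when writing and the deserializer used when reading disagree, the bytes are misinterpreted, which is why the default and per-operation SerDes settings above need to match the actual data types.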

DSL Operations

To define the stream processing topology, Kafka Streams provides a DSL (Domain Specific Language) that is built on top of the Streams Processor API. Recommended for beginners, the Kafka Streams DSL lets you perform all the basic stream processing operations:

  • The DSL has built-in abstractions for streams and tables in the form of KStream, KTable, and GlobalKTable. This first-class support for streams and tables is extremely important because most practical scenarios require a combination of both, not just streams or just databases/tables.
  • You can leverage the declarative functional programming style with stateless transformations (e.g. maps and filters) and stateful transformations such as aggregations (e.g. counting and reducing), joins (e.g. leftJoin), and windowing (e.g. session windows).

Scaling Kafka Streams

You can easily scale Kafka Streams applications by balancing load and state between instances in the same pipeline. Once the aggregated results are distributed among the nodes, Kafka Streams lets you find out which node hosts a given key, so that your application can collect data from the right node or send clients to the right node.

Interactive Queries

For applications that run as a large number of distributed instances, each with a locally managed state store, it is useful to be able to query the application externally. To this end, Kafka Streams provides interactive queries, an API that makes your application queryable from the outside.

Developers can query the local state store of an application instance, such as a local key-value store, a local window store, or a local user-defined state store. Alternatively, developers can add an RPC (Remote Procedure Call) layer to their application (for instance, a REST API), expose the application’s RPC endpoints, discover the application instances and their local state stores, and query the remote state stores to cover the entire app.

Stream Threading

In Kafka Streams, you can set the number of threads used for parallel processing within an application instance. Each thread independently executes one or more stream tasks. Threads do not share state, so no coordination between threads is required. Kafka Streams automatically distributes Kafka topic partitions among the stream threads.

Launching more stream threads or more instances of an application means replicating the topology and letting each copy process a different subset of Kafka partitions, effectively parallelizing the processing.
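As a rough illustration of how partitions bound parallelism, the sketch below spreads partition numbers over threads in round-robin fashion. This is a simplified plain-Java model with hypothetical names; the real assignment logic in Kafka Streams is more sophisticated (for example, it also takes existing local state into account).

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java model of spreading topic partitions over stream threads.
final class TaskAssignmentSketch {

    // Returns, for each thread, the list of partition numbers it would process.
    static List<List<Integer>> assign(int partitionCount, int threadCount) {
        List<List<Integer>> perThread = new ArrayList<>();
        for (int t = 0; t < threadCount; t++) {
            perThread.add(new ArrayList<>());
        }
        for (int p = 0; p < partitionCount; p++) {
            perThread.get(p % threadCount).add(p); // round-robin placement
        }
        return perThread;
    }

    public static void main(String[] args) {
        System.out.println(assign(6, 2)); // two threads, three partitions each
        System.out.println(assign(6, 3)); // three threads, two partitions each
        System.out.println(assign(2, 3)); // more threads than partitions: one stays idle
    }
}
```

The last call shows why adding threads or instances beyond the number of input partitions does not add throughput: there is no partition left for the extra thread to process.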

Stream-Table Duality

Kafka Streams supports both streams and tables, and each can be transformed into the other. This is the so-called stream-table duality. Tables are collections of evolving facts, in which each new event replaces the previous one for the same key, whereas streams consist of immutable information.

Streams carry the whole flow of data from the topic. Tables hold state by combining data from several streams. Consider a game of chess, as outlined in Kafka Data Modeling: the stream of successive moves is aggregated into a table holding the board state, and each move takes us from one state to the next.
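The chess analogy can be sketched in a few lines of plain Java. The model below is not Kafka code: a list of key/value events stands in for a stream, a map stands in for a table, and the piece names and squares are made up for illustration.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of the stream-table duality; no Kafka classes are involved.
final class DualitySketch {

    // Convenience factory for one immutable event (piece -> square).
    static Map.Entry<String, String> event(String key, String value) {
        return new SimpleEntry<>(key, value);
    }

    // Stream -> table: replay the events in order, keeping the latest value per key.
    static Map<String, String> toTable(List<Map.Entry<String, String>> stream) {
        Map<String, String> table = new LinkedHashMap<>();
        for (Map.Entry<String, String> e : stream) {
            table.put(e.getKey(), e.getValue());
        }
        return table;
    }

    // Table -> stream: emit one update record per row of the table.
    static List<Map.Entry<String, String>> toStream(Map<String, String> table) {
        List<Map.Entry<String, String>> stream = new ArrayList<>();
        for (Map.Entry<String, String> row : table.entrySet()) {
            stream.add(event(row.getKey(), row.getValue()));
        }
        return stream;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> moves = Arrays.asList(
                event("white-pawn-e", "e2"),
                event("black-pawn-e", "e7"),
                event("white-pawn-e", "e4")); // replaces the earlier e2 position
        System.out.println(toTable(moves));
    }
}
```

Replaying the stream always rebuilds the same table, and turning the table back into a stream yields a changelog from which the table can be rebuilt again.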

Figure: Stream-table duality

KStream, KTable and GlobalKTable

Two abstractions are offered for streams and tables. KStream handles the record stream. KTable, on the other hand, keeps track of the most recent state of a particular key in the changelog stream. Every data record indicates an update.

There is another abstraction for non-partitioned tables: GlobalKTable. We can use a GlobalKTable to broadcast information to all tasks or to perform joins without re-partitioning the input data.

We can read and deserialize a topic as a stream:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> textLines = 
  builder.stream(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

It is also possible to read a topic to track the latest words received as a table:

KTable<String, String> textLinesTable = 
  builder.table(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

Finally, we are able to read a topic using a global table:

GlobalKTable<String, String> textLinesGlobalTable = 
  builder.globalTable(inputTopic, Consumed.with(Serdes.String(), Serdes.String()));

What is Kafka Streams API?

The Kafka Streams API simplifies stream processing across various disparate topics. It provides distributed coordination, data parallelism, scalability, and fault tolerance.

This API leverages the concepts of partitions and tasks as logical units that are strongly linked to the topic partitions and interact with the cluster.

A distinct feature of the API is that the applications you build with it are just normal Java applications that can be packaged, deployed, and monitored like any other Java application.

Kafka Streams API: Use Cases

Here are a few handy examples that leverage the API to simplify operations:

  • The finance industry can build applications that aggregate data sources for real-time views of potential exposures. The API can also be leveraged for detecting and minimizing fraudulent transactions.
  • Travel companies can build applications with the API to help them make real-time decisions to find the most suitable pricing for individual customers. This allows them to cross-sell additional services and process reservations and bookings. Trivago is one example.
  • Retailers can leverage this API to decide in real-time on the next best offers, pricing, personalized promotions, and inventory management.
  • It can also be used by logistics companies to build applications to track their shipments reliably, quickly, and in real-time.
  • Manufacturing and automotive companies can easily build applications to ensure their production lines offer optimum performance while extracting meaningful real-time insights into their supply chains. The API can also be leveraged to monitor the telemetry data from linked cars to make a decision as to the need for a thorough inspection.

Working With Kafka Streams API

To start working with the API, you first need to add the kafka-streams package to your application. It is available from Maven:

<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-streams</artifactId>
  <version>1.1.0</version>
</dependency>

Next, create a file named streams.properties with the snippet below. Make sure that you replace the bootstrap.servers list with the IP addresses of your chosen cluster:

# Kafka broker IP addresses to connect to
bootstrap.servers=54.236.208.78:9092,54.88.137.23:9092,34.233.86.118:9092

# Name of our Streams application
application.id=wordcount

# Values and Keys will be Strings
default.value.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
default.key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde

# Commit at least every second instead of default 30 seconds
commit.interval.ms=1000

To leverage the Streams API with Instaclustr Kafka, you also need to provide the authentication credentials. Add the following snippet to your streams.properties file while making sure that the truststore location and password are correct:

ssl.enabled.protocols=TLSv1.2,TLSv1.1,TLSv1
ssl.truststore.location = truststore.jks
ssl.truststore.password = instaclustr
ssl.protocol=TLS
security.protocol=SASL_SSL
sasl.mechanism=SCRAM-SHA-256
sasl.jaas.config=org.apache.kafka.common.security.scram.ScramLoginModule required \
  username="ickafka" \
  password="64500f38930ddcabf1ca5b99930f9e25461e57ddcc422611cb54883b7b997edf";

To create the streams application, you need to load the properties mentioned earlier:

Properties props = new Properties();

try {
  props.load(new FileReader("streams.properties"));
} catch (IOException e) {
  e.printStackTrace();
}

Make a new input KStream object on the wordcount-input topic:

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> source = builder.stream("wordcount-input");

Make the word count KStream that will calculate the number of times every word occurs:

// Split on runs of non-word characters
final Pattern pattern = Pattern.compile("\\W+");

KStream<String, String> counts = source
      .flatMapValues(value -> Arrays.asList(pattern.split(value.toLowerCase())))
      // Re-key each record by the word itself so that equal words are grouped together
      .map((key, value) -> new KeyValue<>(value, value))
      .groupByKey()
      .count(Materialized.as("CountStore"))
      .mapValues(value -> Long.toString(value))
      .toStream();

You can then direct the output from the word count KStream to a topic named wordcount-output:

counts.to("wordcount-output");

Lastly, you can create and start the KafkaStreams object:

KafkaStreams streams = new KafkaStreams(builder.build(), props);
streams.start();

How to Use Kafka Streams?

Kafka Streams gives you the ability to perform powerful data processing operations on Kafka data in real-time. Follow this detailed guide to understand how it works along with some practical use for implementation.

What is a Stream? – Table and Stream Mechanism

A stream typically refers to a sequence of records transmitted across systems. In simple words, a stream is an unbounded sequence of events. You can think of it as things happening in the world, and all of these events are immutable.

A table is an accumulated representation of a stream, built by applying its events in order. You can think of a table as the current state of the world.

A bit more technically, a table is a materialized view of that stream of events with only the latest value for each key. Typically, a table serves as the stored state against which processing is triggered.

You can use tables for aggregation queries such as average, maximum, or minimum on your datasets. If you instead want the series of events, for example for a dashboard or analysis showing change over time, use streams.

Kafka Streams lets you compute values against tables as the underlying streams change. This makes triggered computation faster, and it works with any data source. This table and stream duality mechanism can be used for quick and easy real-time streaming in all kinds of applications.

A basic Streams application can be structured as follows:

StreamsBuilder builder = new StreamsBuilder();
 
builder.stream("raw-movies", Consumed.with(Serdes.Long(), Serdes.String()))
       .mapValues(Parser::parseMovie)
       .map((key, movie) -> new KeyValue<>(movie.getMovieId(), movie))
       .to("movies", Produced.with(Serdes.Long(), movieSerde));
 

Topic Partitioning

A topic is basically a stream of data: just as you have tables in databases, you have topics in Kafka. Topics are split into partitions. A partition is a part of the topic, and the data within a partition is ordered.

Partitioning takes the single topic log and breaks it into multiple logs each of which can live on a separate node in the Kafka cluster. You can have as many partitions per topic as you want.

Much of Kafka’s benefit comes from topic partitioning, where each message is stored in the appropriate partition so that data is spread evenly. It also ensures that data associated with the same key arrives in order.
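The effect of keyed partitioning can be modeled in plain Java as shown below. Note the assumption: Kafka's default partitioner hashes the serialized key bytes with murmur2, whereas this sketch uses String.hashCode() as a stand-in, so the partition numbers it prints will not match a real cluster. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of key-based partitioning; not Kafka's actual hash function.
final class KeyPartitionerSketch {

    // Maps a key to a partition in the range [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    // Groups keys by the partition they would be written to, preserving input order.
    static Map<Integer, List<String>> groupByPartition(List<String> keys, int numPartitions) {
        Map<Integer, List<String>> partitions = new TreeMap<>();
        for (String key : keys) {
            partitions.computeIfAbsent(partitionFor(key, numPartitions),
                                       p -> new ArrayList<>()).add(key);
        }
        return partitions;
    }

    public static void main(String[] args) {
        List<String> keys = new ArrayList<>();
        for (int i = 0; i < 6; i++) {
            keys.add("id" + i);
            keys.add("id" + i); // a second record with the same key
        }
        // Both records of each key end up in the same partition, in order.
        System.out.println(groupByPartition(keys, 3));
    }
}
```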

The partitioning concept is used by the KafkaProducer class: each record specifies the topic, a key, and the value to be transmitted, and records with the same key go to the same partition, as shown:

try (KafkaProducer<String, Payment> producer = new KafkaProducer<String, Payment>(props)) {
 
    for (long i = 0; i < 10; i++) {
        final String orderId = "id" + Long.toString(i);
        final Payment payment = new Payment(orderId, 1000.00d);
        final ProducerRecord<String, Payment> record = 
           new ProducerRecord<String, Payment>("transactions", 
                                        payment.getId().toString(), 
                                        payment);
        producer.send(record);
   }
} catch (final SerializationException e) { // thrown by send() when the key or value cannot be serialized
    e.printStackTrace();
}

Similarly, a KafkaConsumer can subscribe to one or more topics and read their records with the following code:

try (final KafkaConsumer<String, Payment> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(TOPIC));
 
    while (true) {
        // poll(Duration) replaces the deprecated poll(long) overload
        ConsumerRecords<String, Payment> records = consumer.poll(Duration.ofMillis(100));
        for (ConsumerRecord<String, Payment> record : records) {
            String key = record.key();
            Payment value = record.value();
            System.out.printf("key = %s, value = %s%n", key, value);
        }
    }
}

Kafka Connect

Kafka Connect provides an ecosystem of pluggable connectors that can be implemented to balance the data load moving across external systems. In simple words, Kafka Connect is used as a tool for connecting different input and output systems to Kafka.

Using Connect, you can replace custom integration code with JSON configuration alone.

A representation of these JSON key and value pairs can be seen as follows:

{
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics"                          : "my_topic",
    "connection.url"                  : "http://elasticsearch:9200",
    "type.name"                       : "_doc",
    "key.ignore"                      : "true",
    "schema.ignore"                   : "true"
}

ksqlDB for Stream Processing 

With ksqlDB, applications can submit stream processing jobs through a REST interface, which makes queries quicker to implement.

These jobs are defined in SQL and can be used from any language when building an application. As an alternative to implementing Kafka Streams features in Java, ksqlDB (formerly KSQL) can be used for stream processing applications.


The following ksqlDB statement defines a stream processing job that computes the average rating of each movie:

CREATE TABLE rated_movies AS
   SELECT  title,
           release_year,
           sum(rating) / count(rating) AS avg_rating
   FROM ratings
   INNER JOIN movies ON ratings.movie_id = movies.movie_id
   GROUP BY title,
            release_year;
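For readers more comfortable with Java than SQL, the logic of the statement above (an inner join followed by a grouped average) can be modeled in plain Java as follows. This is only a sketch of the computation, not what ksqlDB executes internally. It groups by title alone for brevity, and the class name, method name, and sample data are hypothetical.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Plain-Java model of "join ratings to movies, then average per title".
final class RatedMoviesSketch {

    // movies maps movie_id -> title; rating i belongs to ratedMovieIds[i].
    static Map<String, Double> averageRatings(Map<Long, String> movies,
                                              long[] ratedMovieIds,
                                              double[] ratingValues) {
        Map<String, double[]> sumAndCount = new LinkedHashMap<>();
        for (int i = 0; i < ratedMovieIds.length; i++) {
            String title = movies.get(ratedMovieIds[i]);
            if (title == null) {
                continue; // inner join: a rating without a matching movie is dropped
            }
            double[] acc = sumAndCount.computeIfAbsent(title, t -> new double[2]);
            acc[0] += ratingValues[i]; // sum(rating)
            acc[1] += 1;               // count(rating)
        }
        Map<String, Double> averages = new LinkedHashMap<>();
        for (Map.Entry<String, double[]> e : sumAndCount.entrySet()) {
            averages.put(e.getKey(), e.getValue()[0] / e.getValue()[1]);
        }
        return averages;
    }

    public static void main(String[] args) {
        Map<Long, String> movies = new HashMap<>();
        movies.put(1L, "Movie A");
        movies.put(2L, "Movie B");
        long[] ids = {1L, 1L, 2L, 3L};
        double[] ratings = {8.0, 9.0, 6.0, 5.0};
        System.out.println(averageRatings(movies, ids, ratings));
    }
}
```

The difference in ksqlDB is that the result table is kept up to date continuously as new ratings arrive, rather than being computed once over a fixed array.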

A robust Kafka Streams application brings together the components discussed above for better optimization, scalability, fault tolerance, and large-scale deployment efficiency. It has a structure like the one shown below:

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.Produced;
import org.apache.kafka.streams.state.KeyValueStore;

import java.util.Arrays;
import java.util.Properties;

public class WordCountApplication {

    public static void main(final String[] args) throws Exception {
        // Basic configuration: application ID, brokers, and default SerDes
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "wordcount-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-broker1:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> textLines = builder.stream("TextLinesTopic");
        KTable<String, Long> wordCounts = textLines
            // Split each line into lower-case words on runs of non-word characters
            .flatMapValues(textLine -> Arrays.asList(textLine.toLowerCase().split("\\W+")))
            // Group by the word itself, then count occurrences per word
            .groupBy((key, word) -> word)
            .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));
        wordCounts.toStream().to("WordsWithCountsTopic", Produced.with(Serdes.String(), Serdes.Long()));

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }

}

Code Snippets from Confluent and Kafka.

Difference between Kafka Streams and Kafka Consumer

Feature | Kafka Streams | Kafka Consumer
Purpose | Easy data processing and transformation within Kafka | Allows applications to process messages from topics
Stream Consumption | Single stream to consume and produce | Separation of responsibility between consumers and producers
Processing Capability | Capable of performing complex processing but doesn’t support batch processing | Supports only single processing but is capable of batch processing
Processing Model | Based on the concept of processor topologies, which are graphs of stream processors defining logic and flow of data streams | Stateless operations only
Operations Support | Supports stateless and stateful operations | Only supports stateless operations
Cluster Support | Interacts with a single Kafka cluster only | Offers the capability to write to several Kafka clusters

Connecting Kafka Streams to Confluent Cloud

Here are the steps you can follow to connect Kafka Streams to Confluent Cloud:

  • Step 1: Create a java.util.Properties instance.
  • Step 2: Next, configure your streams application. Kafka and Kafka Streams Configuration options need to be configured in the java.util.Properties instance before you use Streams.

    For this example, you need to configure the Confluent Cloud broker endpoints StreamsConfig.BOOTSTRAP_SERVERS_CONFIG along with SASL config SASL_JAAS_CONFIG. Here is the code snippet for the same:
import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.config.SaslConfigs;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
// Comma-separated list of the Confluent Cloud broker endpoints. For example:
// r0.great-app.confluent.aws.prod.cloud:9092,r1.great-app.confluent.aws.prod.cloud:9093,r2.great-app.confluent.aws.prod.cloud:9094
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "<broker-endpoint1, broker-endpoint2, broker-endpoint3>");
props.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
props.put(StreamsConfig.SECURITY_PROTOCOL_CONFIG, "SASL_SSL");
props.put(SaslConfigs.SASL_MECHANISM, "PLAIN");
props.put(SaslConfigs.SASL_JAAS_CONFIG, "org.apache.kafka.common.security.plain.PlainLoginModule required "
    + "username=\"<api-key>\" password=\"<api-secret>\";");

// Recommended performance/resilience settings
props.put(StreamsConfig.producerPrefix(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG), 2147483647);
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 9223372036854775807L);

// Any further settings
props.put(... , ...);

Difference Between Streams and Consumer APIs

Kafka Consumer API

The Kafka Consumer API enables applications to handle messages from topics. It offers the essential elements for interacting with these messages and has the following characteristics:

  • Separation of responsibilities between consumers and producers
  • Processing of a single message at a time
  • Support for batch processing 
  • Stateless support, which means that the client does not retain the previous state and processes each record in the stream separately
  • Writing an application requires a considerable amount of code
  • No use of threading or parallelism
  • Writing to multiple Kafka clusters is possible

Kafka Streams API

Kafka Streams simplifies stream processing from topics by utilizing Kafka client libraries. It offers data parallelism, distributed coordination, fault tolerance, and scalability. It processes messages as an unbounded, continuous, real-time flow of records. It has the following characteristics:

  • Consumes and produces in a single Kafka stream
  • Performs complex processing
  • Does not support batch processing
  • Supports both stateless and stateful operations
  • Lets you write an application in just a few lines of code
  • Handles threading and parallelism
  • Interacts with only one Kafka cluster
  • Stores and transports messages using stream partitions and tasks as logical units

Kafka Streams uses the concepts of partitions and tasks as logical units strongly linked to the topic partitions. Besides, it uses threads to parallelize processing within an application instance. Another important capability is state stores, which Kafka Streams uses to store and query data coming from the topics. Finally, the API interacts with the cluster, but it does not run directly on top of it.

In the coming sections, we’ll focus on four aspects that make the difference with respect to the basic Kafka clients: Stream-table duality, Domain Specific Language (DSL), Exactly-Once processing Semantics (EOS), and Interactive queries.

Dependencies

To use the examples, just add the Kafka Consumer API and Kafka Streams API dependencies to our pom.xml file:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.4.0</version>
</dependency>

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>3.4.0</version>
 </dependency>

Kafka Streams DSL

The Kafka Streams DSL follows a declarative, functional programming style. It is built on top of the Streams Processor API and includes the built-in abstractions for streams and tables described in the preceding section.

It also allows both stateless (map, filter) and stateful transformations (aggregations, joins, and windowing). Thus, stream processing procedures may be implemented using only a few lines of code.

Stateless Transformations

Stateless transformations do not require a state to operate. Similarly, the stream processor does not require a state store. Example operations include filter, map, flatMap, and groupBy.

Let’s look at how to convert the values read from the topic to upper case, filter them, and keep the result as a stream:


KStream<String, String> textLinesUpperCase =
  textLines
    .map((key, value) -> KeyValue.pair(value, value.toUpperCase()))
    .filter((key, value) -> value.contains("FILTER"));

Stateful Transformations

Stateful transformations rely on state to complete their processing. How a message is processed depends on the messages processed before it, whose results are kept in a state store. The state store is backed by a changelog topic, which allows you to restore any table or state store.

The word count algorithm is an example of a stateful transformation.


KTable<String, Long> wordCounts = textLines
  .flatMapValues(value -> Arrays.asList(value
    .toLowerCase(Locale.getDefault()).split("\\W+")))
  .groupBy((key, word) -> word)
    .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));

We will send these two strings to the topic:

String TEXT_EXAMPLE_1 = "test test and test";
String TEXT_EXAMPLE_2 = "test filter filter this sentence";

Output: 

Word: and -> 1
Word: test -> 4
Word: filter -> 2
Word: this -> 1
Word: sentence -> 1
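The role of the changelog in this word count can be sketched in plain Java. The model below is not Kafka code and uses hypothetical names: every update to the in-memory store is also appended to a list that stands in for the changelog topic, and replaying that list rebuilds the store after a simulated failure.

```java
import java.util.AbstractMap.SimpleEntry;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of a state store backed by a changelog.
final class ChangelogSketch {

    private final Map<String, Long> store = new HashMap<>();
    private final List<Map.Entry<String, Long>> changelog = new ArrayList<>();

    // Stateful step: the new count depends on the count stored so far.
    void count(String word) {
        long updated = store.merge(word, 1L, Long::sum);
        changelog.add(new SimpleEntry<>(word, updated)); // record every state change
    }

    Map<String, Long> snapshot() {
        return new HashMap<>(store);
    }

    List<Map.Entry<String, Long>> changelog() {
        return new ArrayList<>(changelog);
    }

    // Rebuilds a store by replaying the changelog; the last value per key wins.
    static Map<String, Long> restore(List<Map.Entry<String, Long>> changelog) {
        Map<String, Long> restored = new HashMap<>();
        for (Map.Entry<String, Long> change : changelog) {
            restored.put(change.getKey(), change.getValue());
        }
        return restored;
    }

    public static void main(String[] args) {
        ChangelogSketch counter = new ChangelogSketch();
        String text = "test test and test" + " " + "test filter filter this sentence";
        for (String word : text.split(" ")) {
            counter.count(word);
        }
        System.out.println(restore(counter.changelog()).equals(counter.snapshot()));
    }
}
```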

The DSL has a variety of transformation features. We can join or merge two input streams/tables that share the same key to create a new stream/table. We can also aggregate, or combine, numerous records from streams/tables into a single record in a new table. Finally, windowing can be used to group records that share the same key in join or aggregate operations.

As an example of a join with a 5-second window, the following code merges records grouped by key from two streams into one stream:

KStream<String, String> leftRightSource = leftSource.outerJoin(rightSource,
  (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
    JoinWindows.of(Duration.ofSeconds(5))).groupByKey()
      // Keep only the most recent joined value for each key
      .reduce(((previousValue, lastValue) -> lastValue))
  .toStream();

Suppose we send the value "left" with key=1 to the left stream and the value "right" with key=2 to the right stream. Because the keys differ, neither record finds a partner, and the results are as follows:

(key=1) -> (left=left, right=null)
(key=2) -> (left=null, right=right)
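The outer-join behavior can be illustrated with a simplified, batch-style sketch in plain Java. It is not how Kafka Streams implements joins: it ignores arrival order and emission timing, and the names `OuterJoinSketch`, `Rec`, and `outerJoin`, as well as the timestamps, are assumptions made for the example.

```java
import java.time.Duration;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Simplified, batch-style illustration of a windowed outer join (no Kafka dependency).
class OuterJoinSketch {

    // A keyed record with an event timestamp in milliseconds.
    static final class Rec {
        final String key;
        final String value;
        final long timestampMs;

        Rec(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    static String joined(String leftValue, String rightValue) {
        return "left=" + leftValue + ", right=" + rightValue;
    }

    // Pairs records that share a key and whose timestamps differ by at most `window`.
    // Records without a partner are still emitted, with null for the missing side.
    // Keeping one result per key loosely mimics groupByKey().reduce(...).
    static Map<String, String> outerJoin(List<Rec> left, List<Rec> right, Duration window) {
        Map<String, String> resultByKey = new LinkedHashMap<>();
        boolean[] rightMatched = new boolean[right.size()];
        for (Rec l : left) {
            boolean leftMatched = false;
            for (int i = 0; i < right.size(); i++) {
                Rec r = right.get(i);
                boolean sameKey = l.key.equals(r.key);
                boolean inWindow = Math.abs(l.timestampMs - r.timestampMs) <= window.toMillis();
                if (sameKey && inWindow) {
                    resultByKey.put(l.key, joined(l.value, r.value));
                    leftMatched = true;
                    rightMatched[i] = true;
                }
            }
            if (!leftMatched) {
                resultByKey.put(l.key, joined(l.value, null));
            }
        }
        for (int i = 0; i < right.size(); i++) {
            if (!rightMatched[i]) {
                Rec r = right.get(i);
                resultByKey.put(r.key, joined(null, r.value));
            }
        }
        return resultByKey;
    }

    public static void main(String[] args) {
        List<Rec> left = List.of(new Rec("1", "left", 0L));
        List<Rec> right = List.of(new Rec("2", "right", 1_000L));
        outerJoin(left, right, Duration.ofSeconds(5))
                .forEach((key, value) -> System.out.println("(key=" + key + ") -> (" + value + ")"));
    }
}
```

Had both records carried the same key and arrived within 5 seconds of each other, a single combined record with both sides filled in would have been produced instead.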

For the aggregation example, we’ll group words by their first two letters and, for each group, sum the lengths of the words in it:

KTable<String, Long> aggregated = input
  // Key each record by the first two letters of its value (Math.min guards one-letter values)
  .groupBy((key, value) -> (value != null && value.length() > 0)
    ? value.substring(0, Math.min(2, value.length())).toLowerCase() : "",
    Grouped.with(Serdes.String(), Serdes.String()))
  // Start each group at 0 and add the length of every value that falls into it
  .aggregate(() -> 0L, (aggKey, newValue, aggValue) -> aggValue + newValue.length(),
    Materialized.with(Serdes.String(), Serdes.Long()));

With the following entries:

"one", "two", "three", "four", "five"

The output is:

Word: on -> 3
Word: tw -> 3
Word: th -> 5
Word: fo -> 4
Word: fi -> 4
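These totals can be reproduced with a short plain-Java sketch that applies the same grouping and summing logic to an in-memory list. It is an illustration only, without the Kafka Streams library; the names `PrefixLengthSketch` and `aggregate` are hypothetical, and skipping null values is an assumption of this sketch.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Locale;
import java.util.Map;

// Plain-Java illustration of the prefix aggregation (no Kafka dependency).
class PrefixLengthSketch {

    // Groups words by their first two letters and sums the word lengths per group.
    static Map<String, Long> aggregate(List<String> words) {
        Map<String, Long> totals = new LinkedHashMap<>();
        for (String word : words) {
            if (word == null) {
                continue; // null values are skipped in this sketch
            }
            // Math.min guards against one-letter words; empty strings get the "" key.
            String key = word.isEmpty()
                    ? ""
                    : word.substring(0, Math.min(2, word.length())).toLowerCase(Locale.ROOT);
            totals.merge(key, (long) word.length(), Long::sum);
        }
        return totals;
    }

    public static void main(String[] args) {
        aggregate(List.of("one", "two", "three", "four", "five"))
                .forEach((prefix, total) -> System.out.println("Word: " + prefix + " -> " + total));
    }
}
```

Each of the five sample words has a distinct two-letter prefix, so every total is simply the length of a single word. Adding "thirty" to the input would raise the "th" total from 5 to 11.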

Kafka Streams Advantages

Kafka’s cluster architecture makes it a fault-tolerant, highly scalable, and elastic system, capable of handling hundreds of thousands of messages per second. It can be deployed in the cloud as well as in container, virtual machine, and bare-metal environments, adding value to both big and small use cases. Kafka Streams supports exactly-once processing semantics, connects directly to Kafka, and doesn’t require a separate processing cluster.
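Exactly-once processing has to be switched on, since the default guarantee is at-least-once. As a minimal sketch, the snippet below builds the `Properties` an application might pass to `KafkaStreams`, using plain string keys so that it compiles without the Kafka libraries. The application id and broker address are placeholder assumptions, and the `exactly_once_v2` value assumes Kafka Streams 3.0 or later with brokers on version 2.5 or later.

```java
import java.util.Properties;

// Builds a Kafka Streams configuration with exactly-once processing enabled.
class ExactlyOnceConfigSketch {

    static Properties streamsConfig() {
        Properties props = new Properties();
        props.put("application.id", "wordcount-example");     // hypothetical application name
        props.put("bootstrap.servers", "localhost:9092");     // assumed local broker address
        props.put("processing.guarantee", "exactly_once_v2"); // default is "at_least_once"
        return props;
    }

    public static void main(String[] args) {
        streamsConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

In application code, the same keys are available as constants on `StreamsConfig`, for example `StreamsConfig.PROCESSING_GUARANTEE_CONFIG`.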

Developers can use this solution on Linux, Mac, and Windows systems, as well as write typical Java or Scala apps. It is also fully integrated with Kafka Security, providing a safe and enterprise-trusted solution for processing sensitive data. Major companies like The New York Times, Pinterest, and Trivago employ its real-time data streaming capabilities.

Best of all, being an Apache Foundation project, it is completely open source. This provides all the advantages of a proven open-source technology, fully supported and continually worked on by Kafka’s active community. As a real-time data streaming solution, using this in its open-source form shields enterprises from the risks of vendor and technological lock-in that come with proprietary and “open core” data-layer options.

Conclusion 

Apache Kafka offers many features that can be harnessed with nothing more than application code. It is secure, scalable, and cost-efficient, and ready for use in a variety of systems. In this article, you were introduced to Kafka Streams, a library built on a robust, horizontally scalable messaging system, followed by a detailed guide on how to use it.

Being open-source software, Kafka is a great choice for enterprises and also holds great untapped commercial potential. Kafka Streams is easy for developers of all skill levels to understand and implement, and it has changed how streaming platforms handle real-time event processing.

Hevo simplifies data integration by providing a no-code platform that effortlessly connects to various data sources, including Kafka. Sign up for a 14-day free trial and experience the feature-rich Hevo suite firsthand.

FAQs

1. What are Streams in Kafka?

Streams in Kafka refer to a continuous flow of records within topics that can be processed in real-time or near-real-time.

2. What is the difference between Kafka Connect and Kafka Streams?

Kafka Connect is a tool for streaming data between Kafka and external systems, while Kafka Streams is a library for processing and analyzing data stored in Kafka.

3. What is the difference between Kafka Streams and Kafka Streams API?

Kafka Streams is the library that enables stream processing in Java applications, while the Kafka Streams API refers to the programming interface provided by this library for building applications.

Aman Sharma
Technical Content Writer, Hevo Data

Aman Deep Sharma is a data enthusiast with a flair for writing. He holds a B.Tech degree in Information Technology, and his expertise lies in making data analysis approachable and valuable for everyone, from beginners to seasoned professionals. Aman finds joy in breaking down complex topics related to data engineering and integration to help data practitioners solve their day-to-day problems.