When dealing with Big Data, Kafka Messaging can give a considerable communication and scalability advantage over traditional communication techniques. Apache Kafka provides a unique ability to Publish, Subscribe, Store, and Process information in real-time by combining messaging and streaming features. Kafka is leveraged by 60% of Fortune 500 organizations for a variety of purposes.

Kafka Streams makes stream processing from Kafka Topics a lot easier. It enables Data Parallelism, Distributed Coordination, Fault Tolerance, and Scalability by building on top of the Kafka client libraries. It treats messages as an unbounded, continuous, real-time stream of data. Kafka Streams provides two abstractions: KStreams and KTables. A KStream handles the stream of records, whereas a KTable keeps track of the most recent state of each key in the changelog stream.

This article will walk you through KStreams in detail. You will gain a basic understanding of KStreams and KTables, along with insights into the topology of KStreams. In addition, you will learn about various Kafka Event Processing operations and methods supported by KStreams. Moreover, this article will guide you through the basic steps to get started with KStreams. Before diving into KStreams, let’s understand what Apache Kafka is and why it is so popular.

Introduction to Apache Kafka

Apache Kafka is a Distributed Event Streaming platform that allows applications to manage large volumes of data efficiently. Its fault-tolerant, highly scalable design can handle billions of events with ease. The Apache Kafka framework is a distributed Publish-Subscribe Messaging system written in Java and Scala that receives Data Streams from several sources, allowing you to analyze Big Data streams in real-time. It can scale up and down quickly with minimal downtime. Kafka’s fault tolerance, achieved by replicating data across servers, has increased its appeal worldwide.

Key Features of Apache Kafka

Apache Kafka is one of the most popular open-source stream-processing platforms, offering high throughput, low latency, and fault tolerance. It can process thousands of messages each second. Let’s have a look at some of its powerful features:

  • Fault-Tolerant & Durable: Kafka protects data from server failure by distributing partitions and replicating them across several servers. If a server fails, a replica on another server can take over, so the data remains available.
  • High Scalability: Kafka’s partitioned log model distributes data over several servers, allowing it to scale beyond a single server’s capability.
  • Low Latency: Because Kafka decouples the producers that write data streams from the consumers that read them, it achieves very low latency and high throughput.
  • Extensibility: Many other applications provide integrations with Kafka, which makes it possible to add more features quickly. Check out how you can integrate Kafka with Amazon Redshift, Cassandra, and Spark.
  • Metrics and Monitoring: Kafka is a popular tool for tracking operational data. It helps you to seamlessly gather data from several platforms and consolidate it into centralized feeds with metrics. To read more about how you can analyze your data in Kafka, refer to Real-time Reporting with Kafka Analytics. 

Introduction to KStreams & KTable

A KStream is an abstraction of a record stream. Here, each data record represents a self-contained unit of data in the unbounded data set. In other words, data records in a record stream are always interpreted as an “INSERT”. The existing records are not replaced by the new ones having the same key. This approach is widely applied in credit card transactions, page view events, or server log entries.

For example, suppose the following 2 records are streamed: (“Hevo”, 1) –> (“Hevo”, 3), and you wish to sum the values per user. The streaming application will return 4 for Hevo, because the second data record is not considered an update to the prior one.

On the other hand, a KTable is an abstraction of a changelog stream that keeps only the most recent value for each key. Let’s say you put the following information in the table:

{
  "name": "Hevo",
  "city": "Bangalore",
  "country": "India"
}

Now suppose you wish to update your location. So, you insert the new info as shown below:

{
  "name": "Hevo",
  "city": "San Francisco",
  "country": "US"
}

Rather than treating this as a new block of info, the preceding fact in the KTable will now be updated to display the new values.

A KTable operates mostly like a traditional database table. The only distinction is that every KTable entry is treated as an UPSERT (Insert or Update). This implies that if the KTable has an earlier version of the data, it will be UPDATED with the most recent values. If no previous version is available, the fact will be INSERTED into the KTable. A KStream, as you saw, supports only INSERT.

It’s worth noting that the value null has a specific meaning in KTables. When you submit a key-value pair with a null value to a KTable, it is treated as a DELETE command, and the fact is removed from the KTable. Unless you intend to delete a record, make sure your application does not submit null values to a KTable; otherwise, the data you have saved for that key will be lost.
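The difference between the two interpretations can be sketched in plain Java. This is only an illustrative model, not the Kafka Streams API: the class StreamVsTableSketch, its Rec type, and both method names are hypothetical, and skipping null values in the stream-style sum is an assumption made for this sketch.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Plain-Java model of record-stream vs. changelog semantics (hypothetical names,
// not part of the Kafka Streams API).
class StreamVsTableSketch {

    // A key-value record; a null value models a delete marker for the table view.
    static final class Rec {
        final String key;
        final Integer value;

        Rec(String key, Integer value) {
            this.key = key;
            this.value = value;
        }
    }

    // KStream-like view: every record is an independent INSERT, so all values are summed.
    static Map<String, Integer> sumAsRecordStream(List<Rec> records) {
        Map<String, Integer> sums = new HashMap<>();
        for (Rec r : records) {
            if (r.value != null) { // assumption: null values are ignored when summing
                sums.merge(r.key, r.value, Integer::sum);
            }
        }
        return sums;
    }

    // KTable-like view: each record is an UPSERT, and a null value is a DELETE.
    static Map<String, Integer> latestAsTable(List<Rec> records) {
        Map<String, Integer> table = new HashMap<>();
        for (Rec r : records) {
            if (r.value == null) {
                table.remove(r.key);
            } else {
                table.put(r.key, r.value);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<Rec> records = Arrays.asList(new Rec("Hevo", 1), new Rec("Hevo", 3));
        System.out.println(sumAsRecordStream(records)); // {Hevo=4}
        System.out.println(latestAsTable(records));     // {Hevo=3}
    }
}
```

Feeding the same two records through both views gives 4 for the stream-style sum and 3 for the table-style lookup, which mirrors the (“Hevo”, 1) –> (“Hevo”, 3) example.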

A KTable additionally allows you to look up the current values of data records by key. This table-lookup feature is offered through both Join Operations and Interactive Queries.

Understanding Topology of KStreams

KStreams - Kafka Streams Topology

Kafka Streams is a library that is used to create applications and manage Event Processing operations. The topology of a Kafka Streams application, like that of other stream processing systems, determines where to read data from, how to process it, and where to save the results. It consists mainly of 3 types of nodes: Source, Processor, and Sink. These are connected by edges that represent the streams, as shown above.

KStreams - KTable and KStream Topology

KStreams and KTables are the 2 main abstractions of data streams. Both are represented as key-value (K, V) pairs. A KTable may be viewed as a KStream with only the most recent value for each key, and a KStream can be viewed as a stream of modifications (a changelog) to a KTable, as shown above.

Understanding Kafka Event Processing Operations Supported by KStreams

Numerous transformation features for event processing are supported by KStreams and the Kafka Streams DSL (Domain Specific Language). The Streams Processor API provides the foundation for the Kafka Streams DSL. The majority of data processing tasks can be stated in a few lines of DSL code. Some of the operations supported by KStreams are as follows:

1) Aggregation

Aggregations are key-based operations, which means that they always operate on records that share the same key. They can be performed on both windowed and non-windowed data. After records have been grouped by key using groupByKey or groupBy and are represented as a KGroupedStream or a KGroupedTable, they can be aggregated using an operation such as “reduce”.
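The idea of grouping by key and then reducing can be modeled in a few lines of plain Java. This is a minimal sketch and not the Kafka Streams API: GroupAndReduceSketch, kv, and reduceByKey are hypothetical names, and the input is a finite list rather than an unbounded stream.

```java
import java.util.AbstractMap;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BinaryOperator;

// Plain-Java model of "group by key, then reduce" (hypothetical names,
// not part of the Kafka Streams API).
class GroupAndReduceSketch {

    // Hypothetical helper that builds an immutable key-value pair.
    static <K, V> Map.Entry<K, V> kv(K key, V value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Folds all values that share a key into one running result per key.
    static <K, V> Map<K, V> reduceByKey(List<Map.Entry<K, V>> records, BinaryOperator<V> reducer) {
        Map<K, V> results = new LinkedHashMap<>();
        for (Map.Entry<K, V> record : records) {
            // The first value for a key is stored as-is; later values are combined with it.
            results.merge(record.getKey(), record.getValue(), reducer);
        }
        return results;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, Integer>> pageViews = Arrays.asList(
                kv("alice", 2), kv("bob", 5), kv("alice", 7));
        System.out.println(reduceByKey(pageViews, Integer::sum)); // {alice=9, bob=5}
        System.out.println(reduceByKey(pageViews, Integer::max)); // {alice=7, bob=5}
    }
}
```

The reducer only ever sees values of the same key, which is why the records have to be grouped on the key before any aggregation is applied.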

2) Joins

Streams and tables can be joined as well. Many stream processing applications are coded as streaming joins. The figure below summarizes the join operations supported by KStreams and KTables:

KStreams - Joins

3) Windowing

KStreams - Tumbling Window Operation

Windowing allows you to control how records with the same key are grouped into so-called windows for stateful operations like aggregations and joins. The DSL can handle the following window types:

  • Tumbling time window
  • Hopping time window
  • Sliding time window
  • Session window
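To make the first two window types more concrete, here is a small plain-Java sketch of how a record timestamp could be mapped to time windows. It is not the Kafka Streams API: WindowAssignmentSketch and its methods are hypothetical, and the sketch assumes non-negative millisecond timestamps and windows aligned to timestamp 0. Sliding and session windows depend on the other records in the stream and are not modeled here.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of time-window assignment (hypothetical names,
// not part of the Kafka Streams API).
class WindowAssignmentSketch {

    // Tumbling windows do not overlap, so each timestamp falls into exactly one window.
    static long tumblingWindowStart(long timestampMs, long sizeMs) {
        return timestampMs - (timestampMs % sizeMs);
    }

    // Hopping windows overlap when advanceMs < sizeMs, so a timestamp can fall into several.
    // Returns the start of every window [start, start + sizeMs) that contains the timestamp.
    static List<Long> hoppingWindowStarts(long timestampMs, long sizeMs, long advanceMs) {
        List<Long> starts = new ArrayList<>();
        long latestStart = timestampMs - (timestampMs % advanceMs);
        for (long start = latestStart; start >= 0 && start + sizeMs > timestampMs; start -= advanceMs) {
            starts.add(start);
        }
        Collections.reverse(starts); // report the earliest window first
        return starts;
    }

    public static void main(String[] args) {
        System.out.println(tumblingWindowStart(7_000L, 5_000L));          // 5000
        System.out.println(hoppingWindowStarts(7_000L, 10_000L, 5_000L)); // [0, 5000]
    }
}
```

With the advance equal to the window size, the hopping case collapses to a single window, which is the tumbling case.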

4) Interactive Query Processing

In distributed systems, interactive queries allow you to query the state of the application. This refers to the ability to retrieve data not only from local state stores but also from remote state stores on other application instances. By combining all of the stores, you can obtain the application’s full current state.

To read about the above operations in more detail, refer to the KStreams Documentation.

Steps to Build Kafka Streams using KStreams

Now that you have a basic understanding of KStreams, let’s try to stream data using KStreams API. Follow the steps below to seamlessly stream your data:

Step 1: Set Up the Environment

Before you proceed further, make sure you have started your Zookeeper service and Kafka Broker. If you haven’t done this, enter the following commands, each in its own terminal, to start these services:

kafka_2.11-0.10.2.0$ bin/zookeeper-server-start.sh config/zookeeper.properties
kafka_2.11-0.10.2.0$ bin/kafka-server-start.sh config/server.properties

Next, create a Kafka Topic by entering the following commands:

kafka_2.11-0.10.2.0$ bin/kafka-topics.sh --create --topic names --replication-factor 1 --partitions 1 --zookeeper localhost:2181
kafka_2.11-0.10.2.0$ bin/kafka-topics.sh --create --topic hellostream --replication-factor 1 --partitions 1 --zookeeper localhost:2181

Step 2: Create Application to Stream Data

After you have started your services and created the Kafka Topics, follow the instructions below to stream your data using KStreams:

  • Create an SBT file in your IDE and name it build.sbt. You can edit the file as per the following format:
name := "KafkaStreams"

version := "1.0"

scalaVersion := "2.11.8"

organization := "com.orgname"

val kafkaStreamsVersion = "0.10.2.0"

val kafkaDependencies = Seq(
  "org.apache.kafka" % "kafka-streams" % kafkaStreamsVersion)

// Must match the object that contains the application code (HelloKStreams).
val main = "com.orgname.kafka.streams.HelloKStreams"
mainClass in (Compile, run) := Some(main)
mainClass in (Compile, packageBin) := Some(main)

lazy val root = (project in file(".")).
  settings(
    libraryDependencies ++= kafkaDependencies
  )
  • Next, create an object called HelloKStreams in a file named HelloKStreams.scala. Then, create a Properties object with the following properties:
    • Kafka Broker Url
    • Key SerDe(Serializer and Deserializer)
    • Value SerDe
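// Imports used by the snippets in this step
import java.util.Properties
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.{KafkaStreams, StreamsConfig}
import org.apache.kafka.streams.kstream.{KStream, KStreamBuilder, ValueMapper}

val settings = new Properties
settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "hello-kstreams")
settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")
settings.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
settings.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)
settings.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass.getName)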
  • Build a Stream Builder and use it to create a KStream that reads from the ‘names‘ topic. Enter the following code:
val kstreamBuilder = new KStreamBuilder
val rawStream: KStream[String, String] = kstreamBuilder.stream("names")
  • Next, using the mapValues method, map each value in the raw stream. The ValueMapper passed to mapValues prepends the word “hello” to each name read from the “names” topic. Enter the following code:
val helloStream: KStream[String, String] = rawStream.mapValues(new ValueMapper[String, String]{
  override def apply(value: String): String = s"hello $value"
})
  • Finally, you can write the result to a new topic and start processing. If the first 2 parameters of the “to” method are omitted, Kafka Streams will use the default serializers set in the Properties object.
helloStream.to(Serdes.String, Serdes.String, "hellostream")
val streams = new KafkaStreams(kstreamBuilder, settings)
streams.start

Step 3: Test your Application

After you have created the above Scala script using KStreams, it’s time to test your application. Follow the steps below:

  • Firstly, build the application using “sbt assembly” (this requires the sbt-assembly plugin to be enabled in your project). Then run the jar file using the following command:
java -cp target/scala-2.11/KafkaStreams-assembly-1.0.jar com.orgname.kafka.streams.HelloKStreams
  • Next, open a terminal and start the kafka-console-producer to send some names to the “names” topic:
bin/kafka-console-producer.sh --broker-list localhost:9092 --topic names
hevo
  • Now, open another terminal and start the kafka-console-consumer to listen to the “hellostream” topic:
bin/kafka-console-consumer.sh --topic hellostream --bootstrap-server localhost:9092 --from-beginning
hello hevo

Great job! You have now learned the basic steps involved in building a simple stream processing application using KStreams.

Understanding Key Methods of KStreams

The Kafka Streams DSL provides a high-level API for easily developing Kafka Streams applications. The high-level API provides robust methods to cover most stream-processing needs right out of the box. This allows you to quickly build a powerful stream-processing application. The KStream object, which represents the streaming key/value pair records, is at the center of this high-level API.

Let’s take a look at some of the KStreams Methods. Here, K denotes the type of keys and V denotes the type of values.

1) filter

This method helps you to create a new KStream with all of this stream’s records that satisfy the provided predicate. All records that do not meet the predicate’s requirements are removed.

Method Syntax:

KStream<K,V> filter(Predicate<? super K,? super V> predicate)

Parameters:

  • predicate: It is a filter that is applied to each record.

Returns:

  • It returns a KStream that contains the records which fulfill the Predicate’s requirement.

2) selectKey

This method sets a new key for each input record. As a result, a <K,V> input record can be turned into a <K’,V> output record using the KeyValueMapper.

Method Syntax:

<KR> KStream<KR,V> selectKey(KeyValueMapper<? super K,? super V,? extends KR> mapper)

Type Parameters:

  • KR: It represents the new key type of the result stream.

Parameters:

  • mapper: It is the KeyValueMapper that helps to create the new key for each input record.

Returns:

  • It returns a KStream consisting of records with a new key with an unmodified value.

3) mapValues

This method transforms each input record’s value into a new value for the output record. As a result, a <K,V> input record can be turned into a <K,V’> output record using the ValueMapper.

Method Syntax:

<VR> KStream<K,VR> mapValues(ValueMapper<? super V,? extends VR> mapper)

Type Parameters:

  • VR: It represents the type of value of the result stream.

Parameters:

  • mapper: It is the ValueMapper that helps to create a new output value.

Returns:

  • It returns a KStream consisting of records with unmodified keys and new values.
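The behavior of filter, selectKey, and mapValues described above can be modeled on an in-memory list of key-value pairs. The following is a plain-Java sketch and not the Kafka Streams API: StatelessOpsSketch and its helper kv are hypothetical, and the three static methods only mimic the documented semantics of the KStream methods with the same names.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;
import java.util.function.BiPredicate;
import java.util.function.Function;

// Plain-Java model of three stateless operations (hypothetical names,
// not part of the Kafka Streams API).
class StatelessOpsSketch {

    // Hypothetical helper that builds an immutable key-value pair.
    static <K, V> Map.Entry<K, V> kv(K key, V value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Keeps only the records that satisfy the predicate.
    static <K, V> List<Map.Entry<K, V>> filter(List<Map.Entry<K, V>> in, BiPredicate<K, V> predicate) {
        List<Map.Entry<K, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : in) {
            if (predicate.test(r.getKey(), r.getValue())) {
                out.add(r);
            }
        }
        return out;
    }

    // Replaces the key of every record and leaves the value untouched.
    static <K, V, KR> List<Map.Entry<KR, V>> selectKey(List<Map.Entry<K, V>> in, BiFunction<K, V, KR> mapper) {
        List<Map.Entry<KR, V>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : in) {
            out.add(kv(mapper.apply(r.getKey(), r.getValue()), r.getValue()));
        }
        return out;
    }

    // Replaces the value of every record and leaves the key untouched.
    static <K, V, VR> List<Map.Entry<K, VR>> mapValues(List<Map.Entry<K, V>> in, Function<V, VR> mapper) {
        List<Map.Entry<K, VR>> out = new ArrayList<>();
        for (Map.Entry<K, V> r : in) {
            out.add(kv(r.getKey(), mapper.apply(r.getValue())));
        }
        return out;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> names = Arrays.asList(
                kv("id-1", "hevo"), kv("id-2", ""), kv("id-3", "kafka"));
        List<Map.Entry<String, String>> nonEmpty = filter(names, (k, v) -> !v.isEmpty());
        List<Map.Entry<Integer, String>> byLength = selectKey(nonEmpty, (k, v) -> v.length());
        List<Map.Entry<Integer, String>> greeted = mapValues(byLength, v -> "hello " + v);
        System.out.println(greeted); // [4=hello hevo, 5=hello kafka]
    }
}
```

Note how the key type changes from String to Integer after selectKey, while mapValues leaves the keys exactly as they were.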

4) to

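With the help of the given Produced instance, this method materializes the stream to a topic. The topic must be created explicitly before it is used. Note that this Produced-based signature comes from newer Kafka Streams releases, whereas the 0.10.2.0 example above passes the Serdes directly to “to”.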

Method Syntax:

void to(String topic, Produced<K,V> produced)

Parameters:

  • topic: It represents the topic name.
  • produced: It represents the options to deploy when producing to the topic.

5) transform

This method transforms each record of the input stream into 0 or 1 records in the output stream. Each input record is passed through a Transformer (supplied by the specified TransformerSupplier), which returns 0 or 1 output records. As a result, a <K,V> input record can be turned into a <K’,V’> output record.

Method Syntax:

<K1,V1> KStream<K1,V1> transform(TransformerSupplier<? super K,? super V,KeyValue<K1,V1>> transformerSupplier, String... stateStoreNames)

Type Parameters:

  • K1: It represents the key type of the new stream.
  • V1: It represents the value type of the new stream.

Parameters:

  • transformerSupplier: It is an instance of TransformerSupplier that generates a Transformer.
  • stateStoreNames: It contains the state store names used by the processor.

Returns:

  • It returns a KStream that contains more or fewer records than the input, with new keys and values (possibly of different types).
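A common use of a stateful, zero-or-one transformation is dropping repeated keys. The plain-Java sketch below models that idea only; it is not the Kafka Streams Transformer interface. DedupTransformSketch, kv, and run are hypothetical names, and an in-memory Set stands in for a named state store.

```java
import java.util.AbstractMap;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;

// Plain-Java model of a stateful "0 or 1 output per input" transformation
// (hypothetical names, not part of the Kafka Streams API).
class DedupTransformSketch {

    // Stands in for a state store that remembers which keys were already seen.
    private final Set<String> seenKeys = new HashSet<>();

    // Hypothetical helper that builds an immutable key-value pair.
    static <K, V> Map.Entry<K, V> kv(K key, V value) {
        return new AbstractMap.SimpleImmutableEntry<>(key, value);
    }

    // Emits one record with a new key and a new value type the first time a key is seen,
    // and nothing for later records with the same key.
    Optional<Map.Entry<String, Integer>> transform(String key, String value) {
        if (!seenKeys.add(key)) {
            return Optional.empty();
        }
        Map.Entry<String, Integer> output = kv("first-" + key, value.length());
        return Optional.of(output);
    }

    // Runs every input record through one transformer instance and collects the outputs.
    static List<Map.Entry<String, Integer>> run(List<Map.Entry<String, String>> input) {
        DedupTransformSketch transformer = new DedupTransformSketch();
        List<Map.Entry<String, Integer>> output = new ArrayList<>();
        for (Map.Entry<String, String> record : input) {
            transformer.transform(record.getKey(), record.getValue()).ifPresent(output::add);
        }
        return output;
    }

    public static void main(String[] args) {
        List<Map.Entry<String, String>> input = Arrays.asList(
                kv("a", "hevo"), kv("b", "kafka"), kv("a", "again"));
        System.out.println(run(input)); // [first-a=4, first-b=5]
    }
}
```

The third input record produces no output because its key was already recorded, so three inputs yield two outputs.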

6) groupByKey

This method groups the records by their current key into a KGroupedStream, while retaining the original values and using the default serializers and deserializers. A stream must be grouped on the record key before an aggregation operator can be applied to the data.

Method Syntax:

KGroupedStream<K,V> groupByKey()

Returns:

  • It returns a KGroupedStream containing the grouped records of the original KStream.

7) join

This method joins the records of this stream with another KStream’s records using a windowed inner equi-join with default serializers and deserializers. The join is computed on the key of the records, with the join attribute thisKStream.key == otherKStream.key. Moreover, 2 records are only joined if their timestamps are close to each other, as defined by the JoinWindows.

The given ValueJoiner will be invoked for each pair of records that fulfill both join predicates to compute a value for the result record. The output record’s key is the same as the key used to join the 2 input records. If the key or value of an input record is null, the record will be skipped during the join operation, and no output record will be added to the resultant KStream.

Method Syntax:

<VO,VR> KStream<K,VR> join(KStream<K,VO> otherStream, ValueJoiner<? super V,? super VO,? extends VR> joiner, JoinWindows windows)

Type Parameters:

  • VO: It represents the value type of the other stream.
  • VR: It represents the value type of the result stream.

Parameters:

  • otherStream:  It represents the KStream to be joined with this stream.
  • joiner: It is a ValueJoiner that computes the join result for a pair of matching records.
  • windows: It lists the specification of the JoinWindows.

Returns:

  • It returns a KStream containing one join record for each pair of matching records, with the shared key and the value computed by the joiner.
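The two join conditions (equal keys and timestamps that are close to each other) can be illustrated with a plain-Java sketch over two finite lists. This is not the Kafka Streams API: WindowedJoinSketch, its Event type, and innerJoin are hypothetical, and treating the time difference as an inclusive bound is an assumption of this sketch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

// Plain-Java model of a windowed inner equi-join (hypothetical names,
// not part of the Kafka Streams API).
class WindowedJoinSketch {

    // A timestamped key-value record.
    static final class Event {
        final String key;
        final String value;
        final long timestampMs;

        Event(String key, String value, long timestampMs) {
            this.key = key;
            this.value = value;
            this.timestampMs = timestampMs;
        }
    }

    // Joins every left/right pair with equal keys whose timestamps differ by at most maxDiffMs.
    // Records with a null key or null value are skipped, as described for the join above.
    static List<String> innerJoin(List<Event> left, List<Event> right,
                                  long maxDiffMs, BinaryOperator<String> joiner) {
        List<String> joined = new ArrayList<>();
        for (Event l : left) {
            if (l.key == null || l.value == null) {
                continue;
            }
            for (Event r : right) {
                if (r.key == null || r.value == null) {
                    continue;
                }
                boolean sameKey = l.key.equals(r.key);
                boolean closeInTime = Math.abs(l.timestampMs - r.timestampMs) <= maxDiffMs;
                if (sameKey && closeInTime) {
                    // The output keeps the shared key; the joiner computes the new value.
                    joined.add(l.key + "=" + joiner.apply(l.value, r.value));
                }
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        List<Event> clicks = Arrays.asList(new Event("a", "click", 1_000L), new Event("b", "click", 1_000L));
        List<Event> views = Arrays.asList(
                new Event("a", "view", 1_500L),  // same key, 500 ms apart: joined
                new Event("a", "view", 9_000L),  // same key, too far apart: not joined
                new Event("b", null, 1_000L));   // null value: skipped
        System.out.println(innerJoin(clicks, views, 1_000L, (l, r) -> l + "+" + r)); // [a=click+view]
    }
}
```

Only one of the three candidate pairs satisfies both conditions, so the result contains a single join record.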

To read about other methods supported by KStreams, visit KStreams (kafka 2.3.0 API) documentation.

Conclusion

This article helped you gain a detailed understanding of KStreams. You explored the key features of Apache Kafka and learned the basic topology of KStreams and KTables. In addition, you discovered various Kafka Event Streaming operations and methods supported by KStreams. You also saw the basic code structure needed to stream data by leveraging KStreams.

However, streaming data from various sources to Apache Kafka or vice versa can be quite challenging and cumbersome. If you are facing these challenges and are looking for some solutions, then check out a simpler alternative like Hevo.

Hevo Data is a No-Code Data Pipeline that offers a faster way to move data from 100+ Data Sources including Apache Kafka, Kafka Confluent Cloud, and other 40+ Free Sources, into your Data Warehouse to be visualized in a BI tool. You can use Hevo Pipelines to replicate the data from your Apache Kafka Source or Kafka Confluent Cloud to the Destination system. Hevo is fully automated and hence does not require you to code. 

Want to take Hevo for a spin?

SIGN UP and experience the feature-rich Hevo suite first hand. You can also have a look at the unbeatable pricing that will help you choose the right plan for your business needs.

Have you streamed data using KStreams? If yes, feel free to share your experience with us in the comments section below!

Former Research Analyst, Hevo Data

Shubnoor is a Data Analyst with extensive expertise in market research and in crafting marketing strategies for the data industry. At Hevo, she specialized in developing connector integrations and product requirement documentation for multiple SaaS sources.