A Data Streaming Pipeline is essentially a Messaging System that executes Data Streaming Operations. A Streaming Pipeline processes data in Real-Time, which eliminates the need to provision a database to hold unprocessed records.
You can use Kafka Streams with Quarkus to stream and process data. Kafka Streams is a Client Library that makes it easy to process and transform continuously changing Event Data Sets held in Kafka clusters, with support for high throughput and scalability. To make the Kafka Streams API easy to use, Quarkus provides extensions that let you run stream processing applications directly on top of Kafka.
In this article, you will learn how to effectively develop a Data Streaming Pipeline using Quarkus Kafka Streams. By the end, you will have a solid understanding of how to apply various Kafka concepts such as State Stores, Punctuators, Processors, Windows, Joins, and Interactive Queries to build a Data Streaming Pipeline.
What is Apache Kafka?
Kafka was originally developed at LinkedIn to address their need for Monitoring Activity Stream Data and Operational Metrics such as CPU, I/O usage, and request timings. Subsequently, in early 2011, it was Open-Sourced through the Apache Software Foundation. Apache Kafka is a Distributed Event Streaming framework that enables applications to efficiently manage large volumes of data. Its fault-tolerant, highly scalable architecture can easily manage billions of events. The Apache Kafka framework is a Java and Scala-based distributed Publish-Subscribe Messaging system that receives Data Streams from several sources and allows real-time analysis of Big Data streams.
Key Features of Apache Kafka
Apache Kafka provides the following features, including messaging-based communication and stream processing, to enable real-time data storage and analysis:
- Persistent messaging: To gain real value from big data, no information loss can be tolerated. Apache Kafka is built with O(1) disk structures that deliver constant-time performance even with very large volumes of stored messages (in the order of TBs).
- High Scalability: Kafka’s partitioned log model distributes data over multiple servers, allowing it to extend beyond the capabilities of a single server.
- Extensibility: Kafka integrates with plenty of other applications, so new capabilities can be added quickly. Check out how you can integrate Kafka with Hadoop, GitHub, and other connectors.
- Real-time Solutions: Messages created by producer threads should be instantly available to consumer threads. This characteristic is essential in event-based systems like Complex Event Processing (CEP).
- Log Compaction: Thanks to log compaction, Apache Kafka always keeps the latest known value for each record key. It preserves only the most recent version of a record while deleting previous copies with the same key. This aids in data replication across nodes and serves as a re-syncing tool for failed nodes. A minimal sketch of creating a compacted topic follows this list.
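For illustration, here is a minimal, hedged sketch of creating a log-compacted topic programmatically with the Kafka AdminClient. The topic name and broker address are assumptions made for this example only and are not used elsewhere in this article.
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.config.TopicConfig;

public class CompactedTopicExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // assumed broker address

        try (AdminClient admin = AdminClient.create(props)) {
            // "latest-values-topic" is a hypothetical topic name used only for this sketch
            NewTopic topic = new NewTopic("latest-values-topic", 1, (short) 1)
                    // keep only the most recent record per key
                    .configs(Map.of(TopicConfig.CLEANUP_POLICY_CONFIG, TopicConfig.CLEANUP_POLICY_COMPACT));
            admin.createTopics(Collections.singletonList(topic)).all().get();
        }
    }
}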
What is Quarkus?
Quarkus is a powerful Full-Stack, Kubernetes-Native Java Framework built for serverless application delivery. Designed for Java Virtual Machines and native compilation, it optimizes Java for containers and makes it an effective platform for Serverless, Cloud, and Kubernetes Ecosystems. By eliminating the need to bootstrap a complex environment, Quarkus lets you develop flexible applications with a clean API and little to no configuration.
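As a point of reference, the quarkus-kafka-streams extension typically wires a topology through a CDI producer method, and the extension then manages the Kafka Streams lifecycle for you (configured via quarkus.kafka-streams.* properties). The sketch below is a minimal, hedged illustration of that pattern; the topic names are placeholders, and the walkthrough later in this article instead creates and starts the KafkaStreams client manually from a REST endpoint.
import jakarta.enterprise.context.ApplicationScoped;  // javax.* on older Quarkus versions
import jakarta.enterprise.inject.Produces;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Produced;

@ApplicationScoped
public class TopologyProducer {

    // The quarkus-kafka-streams extension picks up this Topology bean and
    // starts/stops the Kafka Streams client for you.
    @Produces
    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        // Placeholder pass-through topology: copy records from one topic to another
        builder.stream("input-topic", Consumed.with(Serdes.String(), Serdes.String()))
               .to("output-topic", Produced.with(Serdes.String(), Serdes.String()));
        return builder.build();
    }
}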
What are Inner and Outer Joins in Kafka?
When working with Relational Databases, you have most likely come across the concept of joins, where you combine data from two separate tables. Kafka Streams has a similar concept that lets you join streaming records from two separate Kafka Topics. A solid understanding of how inner and outer joins work in Kafka Streams will help you identify the best way to implement your data flow pipeline.
Inner Join
Let’s imagine that you have two distinct data streams arriving in two separate Kafka Topics. You can refer to one as the right topic and the other as the left topic. Now assume that a record arrives in the right topic with the same key as, but a different value from, another record arriving in the left topic, and that the two records arrive at different times. By performing an inner join on the records that share the same key, Kafka Streams creates a new data stream containing the combined record.
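Here is a minimal, hedged sketch of what such an inner join looks like in Kafka Streams code, using the same String key/value types and the left/right topic names used later in this article; the output topic name is a placeholder for this example.
import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class InnerJoinSketch {

    public Topology buildTopology() {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> left = builder.stream("left-stream-topic",
                Consumed.with(Serdes.String(), Serdes.String()));
        KStream<String, String> right = builder.stream("right-stream-topic",
                Consumed.with(Serdes.String(), Serdes.String()));

        // Inner join: a combined record is emitted only when both topics provide a value
        // for the same key within the 5-second join window.
        left.join(right,
                (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
                JoinWindows.of(Duration.ofSeconds(5)))
            .to("inner-join-output-topic", Produced.with(Serdes.String(), Serdes.String())); // placeholder topic

        return builder.build();
    }
}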
Outer Join
With an outer join, when the first record arrives in the right stream, the join operation immediately creates a new record, with null standing in for the missing left value. When the left stream's record arrives later within the join window, another record is created with the values from both the right and left records. Unless you explicitly deduplicate the output, for example with the groupByKey and reduce operations used later in this article, the outer join can therefore emit multiple records for the same key.
With these concepts out of the way, let’s now see how you can build a data streaming pipeline using Quarkus Kafka Streams.
Hevo Data, a No-code Data Pipeline, is your one-stop-shop solution for all your Apache Kafka ETL needs! Hevo offers a built-in and robust native integration with Apache Kafka and Kafka Confluent Cloud to help you replicate data in a matter of minutes! You can seamlessly load data from Apache Kafka straight to your Desired Database, Data Warehouse, or any other destination of your choice. With Hevo in place, you can not only replicate data from 100+ Data Sources (Including 40+ Free Sources) but also enrich & transform it into an analysis-ready form without having to write a single line of code! In addition, Hevo’s fault-tolerant architecture ensures that the data is handled securely and consistently with zero data loss.
Get Started with Hevo for Free
Check out what makes Hevo amazing:
- Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
- Schema Management: Hevo takes away the tedious task of schema management; it automatically detects the schema of incoming data and maps it to the destination schema.
- Connectors: Hevo supports 100+ Integrations to SaaS platforms such as WordPress, Apache Kafka, Confluent Cloud, FTP/SFTP, Files, Databases, BI tools, and Native REST API & Webhooks Connectors. It supports various destinations, including Data Warehouses such as Google BigQuery, Amazon Redshift, Snowflake, and Firebolt; Amazon S3 Data Lakes; and Databases such as Databricks, MySQL, SQL Server, TokuDB, DynamoDB, and PostgreSQL, to name a few.
- Minimal Learning: With its simple and interactive UI, Hevo is extremely easy for new customers to work with and perform operations on.
- Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
- Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
- Extensive Customer Base: Over 1000 Data-Driven organizations from 40+ Countries trust Hevo for their Data Integration needs.
- Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-Day Free Trial!
Prerequisites
- Ensure that you have Docker and the Kafka CLI installed in your local environment.
How to build Data Streaming Pipelines using Quarkus Kafka Streams?
To create a Data Streaming Pipeline using Quarkus Kafka Streams, follow the simple steps given below:
Quarkus Kafka Streams Step 1: Run a Kafka Cluster
You can follow the simple sub-steps given below to run a Kafka Cluster:
- Step 1: Run the following commands to clone the kafka-docker repository:
$ git clone https://github.com/wurstmeister/kafka-docker.git
$ cd kafka-docker
- Step 2: Next, open the docker-compose-single-broker.yml file and edit the following line:
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
to
KAFKA_ADVERTISED_HOST_NAME: localhost
- Step 3: After editing and saving your changes, run the following command:
$ docker-compose -f docker-compose-single-broker.yml up -d
Quarkus Kafka Streams Step 2: Create Kafka Topics
In this step, you will create the following topics by running the commands given below in the Kafka CLI:
- right-stream-topic
- left-stream-topic
- stream-stream-outerjoin
- processed-topic
$ cd <folder location where kafka cli binaries are located>
$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic left-stream-topic
$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic right-stream-topic
$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic stream-stream-outerjoin
$ ./kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic processed-topic
Quarkus Kafka Streams Step 3: Perform an Outer Join
To begin, create a class called KafkaStreaming in your Quarkus application with a method named startStreamStreamOuterJoin():
@ApplicationScoped
@Path("/startstream")
public class KafkaStreaming {

    private KafkaStreams streamsOuterJoin;

    private final String LEFT_STREAM_TOPIC = "left-stream-topic";
    private final String RIGHT_STREAM_TOPIC = "right-stream-topic";
    private final String OUTER_JOIN_STREAM_OUT_TOPIC = "stream-stream-outerjoin";
    private final String PROCESSED_STREAM_OUT_TOPIC = "processed-topic";
    private final String KAFKA_APP_ID = "outerjoin";
    private final String KAFKA_SERVER_NAME = "localhost:9092";
    // Name of the state store added in Step 5; any identifier works as long as it is used consistently
    private final String STORE_NAME = "state-store";

    // JAX-RS endpoint (Quarkus): calling GET /startstream builds and starts the topology
    @GET
    public void startStreamStreamOuterJoin() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, KAFKA_APP_ID);
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_SERVER_NAME);
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> leftSource = builder.stream(LEFT_STREAM_TOPIC);
        KStream<String, String> rightSource = builder.stream(RIGHT_STREAM_TOPIC);

        // Add the state store code later – To be done in Step 5

        // Outer join the two streams within a 5-second window, keep only the latest
        // value per key, and write the result to the output topic
        leftSource.outerJoin(rightSource,
                (leftValue, rightValue) -> "left=" + leftValue + ", right=" + rightValue,
                JoinWindows.of(Duration.ofSeconds(5)))
            .groupByKey()
            .reduce(((key, lastValue) -> lastValue))
            .toStream()
            .to(OUTER_JOIN_STREAM_OUT_TOPIC);

        final Topology topology = builder.build();

        // Add stream processing code later – To be done in Step 4

        streamsOuterJoin = new KafkaStreams(topology, props);
        streamsOuterJoin.start();
    }
}
When doing a join, you are essentially creating a new value that combines data from the two topics. If a record with a given key is missing from one of the two topics, the joined value uses the string null in place of the missing side, for example left=null, right=<value from the right topic>.
Quarkus Kafka Streams Step 4: Add a Stream Processor
Kafka Streams provides a low-level Processor API that you can use to process the records received on the outer join topic, and you will need to write some custom logic for this. To begin, define the processor, DataProcessor, and then include it in the KafkaStreaming class:
public class DataProcessor implements Processor<String, String> {

    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        if (value.contains("null")) {
            // Add code for processing missing records – To be done in Step 5
        } else {
            processRecord(key, value);
            context.forward(key, value);
        }
        context.commit();
    }

    @Override
    public void close() {
    }

    private void processRecord(String key, String value) {
        System.out.println("==== Record Processed ==== key: " + key + " and value: " + value);
    }
}
This class processes the joined records: when a value does not contain the string null (that is, both sides of the join are present), the record is forwarded to the sink topic (i.e. processed-topic).
In the KafkaStreaming class, you now piece together the parts that define the source topic and add both the processor and the sink. Add the following code snippet to the commented-out section from Step 3 (“Add stream processing code later – To be done in Step 4”):
// Read data from the OUTER_JOIN_STREAM_OUT_TOPIC topic
topology.addSource("Source", OUTER_JOIN_STREAM_OUT_TOPIC);

// Add a stream processor (the state store from Step 5 will be attached to "StateProcessor")
topology.addProcessor("StateProcessor",
        new ProcessorSupplier<String, String>() {
            public Processor<String, String> get() {
                return new DataProcessor();
            }
        },
        "Source");

topology.addSink("Sink", PROCESSED_STREAM_OUT_TOPIC, "StateProcessor");
Quarkus Kafka Streams Step 5: Add a StateStore and Punctuator
In the DataProcessor class, you are currently only processing records that contain both the left-stream and right-stream values. You also need to process the records that have only one of the two values, but you must introduce a delay before doing so: the missing value may simply arrive in a later time window, and you don't want to process records prematurely.
For this, you will use a Kafka State Store to hold the incomplete records (state stores are also commonly used for rolling aggregates and de-duplication), and a Punctuator to periodically process the records in the store that still have missing values. To add the state store to the KafkaStreaming class, place the following code snippet where you see the comment “Add the state store code later – To be done in Step 5”; note that the topology.addStateStore(...) line must come after the processor has been registered in Step 4:
// Build the state store that will hold the incomplete records
Map<String, String> changelogConfig = new HashMap<>();
StoreBuilder<KeyValueStore<String, String>> stateStore = Stores.keyValueStoreBuilder(
        Stores.persistentKeyValueStore(STORE_NAME),
        Serdes.String(),
        Serdes.String())
    .withLoggingEnabled(changelogConfig);

// Attach the store to the processor. This line must come after
// topology.addProcessor("StateProcessor", ...) from Step 4, e.g. just before topology.addSink(...)
topology.addStateStore(stateStore, "StateProcessor");
With this snippet, you have defined a Kafka state store that stores record keys and values, with changelog logging enabled. Now edit the process() function so that it places any record with a null value in the state store. Put this code snippet in your DataProcessor class where you'll find the comment “Add code for processing missing records – To be done in Step 5”:
if (value.contains("null")) {
    if (kvStore.get(key) != null) {
        // The other half of the join arrived first: combine the two values,
        // log the completed record, and forward it downstream
        String newvalue = value.concat(" ").concat(kvStore.get(key));
        processRecord(key, newvalue);
        // remove the entry from the state store now that both sides have been seen
        kvStore.delete(key);
        context.forward(key, newvalue);
    } else {
        // Only one side has arrived so far: add it to the state store
        // so the punctuator can process it later
        System.out.println("Incomplete value: " + value + " detected. Putting into statestore for later processing");
        kvStore.put(key, value);
    }
}
The next thing you need to do is to add the punctuator to the processor you have just defined. To achieve this, update the DataProcessor’s init() method to look as follows:
// Must match the store name registered in the KafkaStreaming topology (Step 5)
private static final String STORE_NAME = "state-store";

private KeyValueStore<String, String> kvStore;

@Override
public void init(ProcessorContext context) {
    this.context = context;
    kvStore = (KeyValueStore<String, String>) context.getStateStore(STORE_NAME);

    // schedule a punctuate() call at 50-second intervals, based on wall-clock time
    this.context.schedule(Duration.ofSeconds(50), PunctuationType.WALL_CLOCK_TIME,
        new Punctuator() {
            @Override
            public void punctuate(long timestamp) {
                System.out.println("Scheduled punctuator called at " + timestamp);
                KeyValueIterator<String, String> iter = kvStore.all();
                while (iter.hasNext()) {
                    KeyValue<String, String> entry = iter.next();
                    System.out.println(" Processed key: " + entry.key + " and value: " + entry.value + " and sending to processed-topic topic");
                    context.forward(entry.key, entry.value);
                    // putting null removes the entry from the state store
                    kvStore.put(entry.key, null);
                }
                iter.close();
                // commit the progress
                context.commit();
            }
        }
    );
}
You’re done building the streaming pipeline. To get the entire source code, clone this Git repository.
Quarkus Kafka Streams Step 6: Run the Quarkus Streaming Application
Open your terminal and run the following commands to start the streaming application. Since quarkus:dev runs in the foreground, issue the curl request from a separate terminal once the application is up:
$ cd data-streaming-kafka-quarkus/quarkus-kafka-streaming
$ ./mvnw compile quarkus:dev
$ curl localhost:8080/startstream
Quarkus Kafka Streams Step 7: Run the Quarkus Producer Application
In a new terminal, execute the following commands:
$ cd data-streaming-kafka-quarkus/quarkus-kafka-producer
$ ./mvnw compile quarkus:dev
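The producer application's code lives in the repository you cloned and isn't reproduced here. Purely as a hedged illustration of what it sends, producing records with matching keys to the two source topics with a plain KafkaProducer might look like the following sketch (the topic names are the ones used in this article; the class name, keys, and payloads are made up for this example):
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class JoinInputProducer {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // Same key on both topics: the outer join emits a combined record
            producer.send(new ProducerRecord<>("left-stream-topic", "key-1", "left-payload-1"));
            producer.send(new ProducerRecord<>("right-stream-topic", "key-1", "right-payload-1"));

            // Key present only on the left topic: the joined value contains "null",
            // so the record is parked in the state store until the punctuator fires
            producer.send(new ProducerRecord<>("left-stream-topic", "key-2", "left-payload-2"));
        }
    }
}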
Quarkus Kafka Streams Step 8: Watch the processed-topic topic
$ cd <folder location where kafka cli binaries are located>
$ ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic processed-topic --property print.key=true --property print.timestamp=true
Quarkus Kafka Streams Step 9: Simulate Various Use Cases
- Use Case 1: Send a few records to the right-stream-topic as well as the left-stream-topic.
The left-stream-topic is going to be missing some records. The punctuator is going to process the records that are held in the state store.
$ curl localhost:8082/sendfewrecords
- Use Case 2: Send one record to left-stream-topic only.
$ curl localhost:8082/sendoneleftrecord
To view the records that are held in the Kafka State Store but haven't been processed yet, run the following command (a sketch of how such an endpoint can be built with Interactive Queries follows this list):
$ curl localhost:8080/storedata
- Use Case 3: Send 100000 records to the two Kafka topics.
$ curl localhost:8082/sendmanyrecords
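The /storedata endpoint above relies on Kafka Streams Interactive Queries, one of the concepts mentioned at the beginning of this article; its actual implementation is in the cloned repository. As a rough, hedged sketch only, querying the state store from a Quarkus REST endpoint could look like the code below. The class name and the wiring of the running KafkaStreams instance are assumptions, and the store name must match the STORE_NAME used earlier.
import jakarta.ws.rs.GET;          // javax.* on older Quarkus versions
import jakarta.ws.rs.Path;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StoreQueryParameters;
import org.apache.kafka.streams.state.KeyValueIterator;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;

@Path("/storedata")
public class StoreDataResource {

    // Assumed to be the running KafkaStreams instance started by KafkaStreaming,
    // e.g. exposed as a CDI bean or passed in from that class
    private final KafkaStreams streams;

    public StoreDataResource(KafkaStreams streams) {
        this.streams = streams;
    }

    @GET
    public String storeData() {
        // Interactive query: read the contents of the local state store
        ReadOnlyKeyValueStore<String, String> store = streams.store(
                StoreQueryParameters.fromNameAndType("state-store",   // must match STORE_NAME
                        QueryableStoreTypes.<String, String>keyValueStore()));

        StringBuilder sb = new StringBuilder();
        try (KeyValueIterator<String, String> it = store.all()) {
            it.forEachRemaining(kv -> sb.append(kv.key).append(" -> ").append(kv.value).append("\n"));
        }
        return sb.toString();
    }
}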
Conclusion
In this article, you have learned how to effectively develop and run a Data Streaming Pipeline using Quarkus Kafka Streams. You can now use the data streaming pipeline that you have built to process records in Real-Time, store data without having to rely on a database or cache, and build a modern application using an event-driven architecture.
For a more in-depth and complete analysis of your business performance and financial health, it is important to consolidate data from Apache Kafka and all the other applications used across your business. However, extracting this complex data through ever-changing Data Connectors would require you to invest a portion of your Engineering Bandwidth to Integrate, Clean, Transform & Load data into your Data Warehouse or a destination of your choice. A more effortless & economical choice is exploring a Cloud-Based ETL Tool like Hevo Data.
Visit our Website to Explore Hevo
Hevo Data, a No-code Data Pipeline can seamlessly transfer data from a vast sea of sources such as Apache Kafka & Kafka Confluent Cloud to a Data Warehouse or a Destination of your choice to be visualised in a BI Tool. It is a reliable, completely automated, and secure service that doesn’t require you to write any code!
If you are using Apache Kafka & Kafka Confluent Cloud as your Message Streaming Platform and searching for a Stress-Free Alternative to Manual Data Integration, then Hevo can effortlessly automate this for you. Hevo, with its strong integration with 100+ sources & BI tools(Including 40+ Free Sources), allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.
Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Do check out the pricing details to understand which plan fulfills all your business needs.
Tell us about your experience of building Data Streaming Pipelines using Quarkus Kafka Streams! Share your thoughts with us in the comments section below.