Are you looking to set up Spark Streaming and Kafka Integration? You have landed on just the right page. Kafka is widely used for building effective streaming strategies and offers an array of features for efficient stream processing and transmission.

Spark Streaming is an API that can be connected with a variety of sources including Kafka to deliver high scalability, throughput, fault-tolerance, and other benefits for a high-functioning stream processing mechanism. 

Together, these features make it easier to process live data streams and channel them reliably. This article will help you explore the features of Spark Streaming and the significance of Spark Streaming and Kafka Integration. You will also read about the process of setting up such an integration and some drawbacks that a manual Spark Streaming and Kafka Integration could pose.

What is Spark Streaming?

  • Spark Streaming is an extension of the core Spark API. It allows you to extract data from several sources such as Kafka, Kinesis, and TCP sockets, and process it using complex algorithms expressed with high-level functions such as map, reduce, join, and window.
  • It provides an enhanced level of data abstraction through a discretized stream (DStream), which represents a continuous stream of data.
  • Spark Streaming receives a live input stream and splits the data into batches. These batches are processed by the Spark engine to produce the final stream of results, also in batches, as the minimal sketch after this list illustrates.
  • This design allows versatile integrations between Spark Streaming and a variety of sources, including Apache Kafka.
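
The batch model is easiest to see in a minimal sketch. The following fragment is illustrative only, assuming a local Spark installation, a text stream on localhost:9999, and a 5-second batch interval; it simply counts the records in each batch:

import org.apache.spark.SparkConf;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Two local threads: one for the receiver, one for batch processing
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("DStreamSketch");

// Split the live input stream into 5-second batches
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(5));

// Each batch of lines becomes an RDD that the Spark engine processes
JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);
JavaDStream<Long> counts = lines.count();
counts.print();

jssc.start();
jssc.awaitTermination();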

What is Apache Kafka?

  • Apache Kafka is a popular open-source distributed platform for ingesting and processing streaming data. Providing an end-to-end solution, Kafka can efficiently read and write streams of events in real time while continuously importing and exporting data from other data systems (see the small producer sketch after this list).
  • Its reliability and durability allow you to store streams of data securely for as long as you want. With its best-in-class performance, low latency, fault tolerance, and high throughput, Kafka can handle and process thousands of messages per second in real time.
  • Launched as an open-source messaging queue by LinkedIn in 2011, Kafka has since evolved into a full-fledged event streaming platform.
  • It is an excellent tool for building real-time streaming data pipelines and applications that adapt to data streams. You can easily install Kafka on Mac, Windows, and Linux, and it works for both online and offline message consumption.
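
To make the idea of writing event streams concrete, here is a minimal, hypothetical Java producer; the broker address, topic name, keys, and values below are placeholders, and the kafka-clients library is assumed to be on the classpath:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");                 // placeholder broker address
props.put("key.serializer", StringSerializer.class.getName());
props.put("value.serializer", StringSerializer.class.getName());

// Each record is appended to the "events" topic and persisted by the brokers
try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
  producer.send(new ProducerRecord<>("events", "user-1", "page_view"));
  producer.send(new ProducerRecord<>("events", "user-2", "click"));
}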

Why is Spark Streaming and Kafka Integration Important?

There are several benefits of implementing Spark Kafka Integration:

  • By setting up the Spark Streaming and Kafka Integration, you can minimize data loss, since all the data received from Kafka can be saved synchronously for easy recovery.
  • Users can read messages from a single Kafka topic or from multiple Kafka topics.
  • Along with this flexibility, you also gain high scalability, throughput, fault tolerance, and a range of other benefits by using Spark and Kafka in tandem.

The Spark Streaming and Kafka Integration can be understood as a data pipeline that works as shown in the diagram below:

Spark Streaming and Kafka Integration: Data Pipeline

Key Kafka Spark APIs

To set up the Spark Streaming and Kafka Integration, there are three main Kafka Spark APIs:

1. StreamingContext API

This API acts as the main entry point for utilizing the Spark Streaming functionality. Using the methods provided by this API, you can create DStreams from various input sources. 

  • You can build it either by providing a Spark master URL and an appName, from an org.apache.spark.SparkConf configuration, or from an existing org.apache.spark.SparkContext.
  • You can access the associated SparkContext via context.sparkContext.
  • After you have built and transformed DStreams, the streaming computation can be started and stopped using the context.start() and context.stop() methods, respectively.
  • You can use context.awaitTermination() to make the current thread wait for the context to be terminated, either by stop() or by an exception. A minimal lifecycle sketch follows this list.
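
Here is that lifecycle in Java, using the JavaStreamingContext equivalent; the master URL, app name, and batch interval are illustrative only:

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

// Build the streaming context from a SparkConf and a batch duration
SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("StreamingLifecycle");
JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));

// The associated SparkContext is available from the streaming context
JavaSparkContext sc = jssc.sparkContext();

// ... define DStreams and transformations here ...

jssc.start();             // start the streaming computation
jssc.awaitTermination();  // block until stop() is called or an exception occurs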

2. SparkConf API

It represents the configuration for a Spark application. You can create a SparkConf object with new SparkConf(), which will load values from any spark.* Java system properties set in your application. You can also set Spark parameters as key-value pairs, and these take priority over system properties. The SparkConf class has the following methods (a short usage sketch follows the list):

  • set(String key, String value): set a configuration variable.
  • remove(String key): remove a key from the configuration.
  • setAppName(String name): set the application name.
  • get(String key): get the value for a key.
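
For example, a configuration for the streaming application could be assembled as follows; the app name, master URL, and the spark.streaming.backpressure.enabled property are illustrative choices:

import org.apache.spark.SparkConf;

// Values set here take priority over any spark.* Java system properties
SparkConf conf = new SparkConf()
  .setAppName("SparkKafkaIntegration")                      // setAppName(name)
  .setMaster("local[2]")
  .set("spark.streaming.backpressure.enabled", "true");     // set(key, value)

String appName = conf.get("spark.app.name");                // get(key)
conf.remove("spark.streaming.backpressure.enabled");        // remove(key)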

3. KafkaUtils API

The KafkaUtils API allows you to connect Kafka clusters to Spark Streaming and set up the Spark Streaming and Kafka Integration. This API has an important method, createStream, which creates an input stream that pulls messages from the Kafka brokers. It has the following parameters:

  • ssc: StreamingContext object. 
  • zkQuorum: ZooKeeper quorum.
  • groupId: Group ID for this consumer.
  • topics: Map of topics to the number of consumer threads used for each topic.
  • storageLevel: Storage level for storing the received objects.

The KafkaUtils API has another method, createDirectStream, which creates an input stream that fetches messages directly from the Kafka brokers without using a receiver. This stream can guarantee that each message from Kafka is included in the transformations exactly once.

Understanding Spark Streaming and Kafka Integration Steps

Spark Streaming and Kafka Integration provides one-to-one parallelism between Kafka partitions and Spark partitions, along with access to Kafka metadata and offsets.

The connection to a Spark cluster is represented by a StreamingContext, which specifies the cluster URL, the name of the app, and the batch duration. Its constructor looks as follows:

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)

For Spark Streaming and Kafka Integration, the KafkaUtils API is used to create an input stream that fetches messages from Kafka. Its createStream method has the following signature (a usage sketch follows):

public static ReceiverInputDStream<scala.Tuple2<String,String>> createStream(
      StreamingContext ssc, String zkQuorum, String groupId,
      scala.collection.immutable.Map<String,Object> topics,
      StorageLevel storageLevel)
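
As a usage sketch of this receiver-based API in Java, assuming the older spark-streaming-kafka (Kafka 0.8) artifact on the classpath and a JavaStreamingContext named jssc from earlier; the ZooKeeper address, group ID, and topic map are placeholders:

import java.util.HashMap;
import java.util.Map;
import org.apache.spark.streaming.api.java.JavaPairReceiverInputDStream;
import org.apache.spark.streaming.kafka.KafkaUtils;

// topic -> number of receiver threads to use for that topic
Map<String, Integer> topicMap = new HashMap<>();
topicMap.put("topicA", 1);

// Receiver-based stream of (key, message) pairs, with offsets tracked through ZooKeeper
JavaPairReceiverInputDStream<String, String> messages =
  KafkaUtils.createStream(jssc, "localhost:2181", "spark-consumer-group", topicMap);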

A direct stream can also be created so that the input stream pulls messages directly from Kafka without a receiver. This can be implemented with the following code:

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
 
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
 
Collection<String> topics = Arrays.asList("topicA", "topicB");
 
JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );
 
stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
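
The mapToPair call above only defines a transformation; continuing the same snippet (and reusing its imports and the streamingContext variable), an output operation and a started context are still needed before anything runs. A minimal continuation could look like this:

// Assign the transformed stream and give it an output operation
JavaPairDStream<String, String> pairs =
  stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));
pairs.print();  // print a sample of each batch to the driver console

// Nothing is processed until the streaming context is started
streamingContext.start();
streamingContext.awaitTermination();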

With these commands to fetch data, you can follow some simple steps to initiate Spark Streaming and Kafka Integration:

Step 1: Build a Script

For the Spark Streaming and Kafka Integration, you need to start by creating a build script that specifies the application details and all library dependencies. A build.sbt file is used for this; sbt reads it to download everything required for compiling and packaging the application.

Depending on your dependencies, your build.sbt will look something like the example below (this build targets the receiver-based spark-streaming-kafka API; the direct-stream examples that use the kafka010 package instead require the spark-streaming-kafka-0-10 artifact and a matching Spark 2.x version):

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Step 2: Create an RDD

In this step, you create an RDD for a defined range of offsets, reusing the imports and Kafka parameters from the direct stream shown above. The offset ranges specify the topic, the partition, and the starting and ending offsets to read; a short continuation after the code shows how to consume the resulting RDD.

// Import dependencies and create kafka params as in Create Direct Stream above
 
OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};
 
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);
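
Continuing from the snippet above (and reusing its imports), this is one way you might consume the resulting batch RDD; it is a sketch only, not part of the required steps:

// Extract just the message values from the fetched consumer records
JavaRDD<String> values = rdd.map(ConsumerRecord::value);
System.out.println("Fetched " + values.count() + " records from the given offset ranges");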

Step 3: Obtain and Store Offsets

The offset values need to be obtained and stored so that processing can resume from the right position after a restart. The offset range for each partition can be accessed as follows:

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  rdd.foreachPartition(consumerRecords -> {
    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
    System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
  });
});
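
If you would rather have Kafka itself track the consumer's progress, the same offset ranges can be committed back to Kafka after the output completes. Here is a sketch using the CanCommitOffsets interface from the kafka010 package, continuing the direct-stream snippet above with enable.auto.commit left false:

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();

  // ... write the results for this batch to your sink here ...

  // commit the offsets back to Kafka once the output has completed
  ((CanCommitOffsets) stream.inputDStream()).commitAsync(offsetRanges);
});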

For exactly-once, transaction-based processing, you can store the offsets in the same transaction as your output data so that results and offsets are never duplicated. The general idea looks as follows, although the details depend on the data store you are dealing with:

// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (resultSet : selectOffsetsFromYourDatabase) {   // pseudocode: iterate over the offsets stored in your database
  fromOffsets.put(new TopicPartition(resultSet.string("topic"), resultSet.int("partition")), resultSet.long("offset"));
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  Object results = yourCalculation(rdd);
  // begin your transaction
  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly
  // end your transaction
});

Step 4: Implementing SSL Spark Communication

Using the code below, you can configure an SSL/TLS connection between your Spark Streaming consumers and the Kafka brokers. This helps ensure that the Spark Streaming and Kafka Integration is secure and protected against third-party infiltration, data leaks, or losses incurred in transit.

Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");

Step 5: Compile and Submit to Spark Console

Finally, to complete the Spark Streaming and Kafka Integration process, you can launch the Spark application by compiling and submitting the package. Run the sbt package command to compile and package the application JAR, which can then be submitted with spark-submit using the following command:

/usr/local/spark/bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
  --class "KafkaWordCount" --master local[4] \
  target/scala-2.10/spark-kafka-project_2.10-1.0.jar localhost:2181 <group name> <topic name> <number of threads>

The final application output for the Spark Streaming and Kafka Integration will be seen in the format shown below:

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..

Following these steps, you can successfully set up Spark Streaming and Kafka Integration and reap the benefits of both tools for your real-time data streaming and stream processing requirements.

Limitations of Manual Spark Streaming and Kafka Integration

Despite a structured methodology and easy-to-implement steps, there are some drawbacks of relying on a manual Spark Streaming and Kafka Integration process:

  • To begin with, you need some technical background and skills to understand and implement the commands necessary for setting up the Spark Streaming and Kafka Integration manually.
  • While the Spark Streaming and Kafka Integration can be established by running a script, you will still need to manage the Spark inter-node communication individually, as it is not integrated automatically.
  • Manual Spark Streaming and Kafka Integration also leaves more room for error and inconsistency, since the commands have to be monitored every time a script is run.
  • With growing data, the issues of scalability and effective specification of datasets also arise.

All these limitations of manual Kafka and Spark Streaming Integration can be effectively countered with automation strategies and platforms built for this purpose.

Conclusion 

  • Thus, Spark Streaming and Kafka Integration can offer some pretty robust features for your data streaming requirements.
  • If you’re looking to enhance the scalability, fault-tolerance, and other features for an optimized stream processing environment, this is a combination that you must implement.
  • Learn how to set up Spark real-time streaming with our step-by-step guide to implementing real-time data processing.
  • If you’re comfortable with code, you can follow the above-mentioned steps to carry out the Kafka to Spark Streaming Integration.
  • These methods, however, can be challenging as they require a deep understanding of programming languages and other backend tools. This is where Hevo saves the day!

Tell us about your experience of learning about setting up the Spark Streaming and Kafka Integration! Share your thoughts with us in the comments section below.

Aman Sharma
Technical Content Writer, Hevo Data

Aman Deep Sharma is a data enthusiast with a flair for writing. He holds a B.Tech degree in Information Technology, and his expertise lies in making data analysis approachable and valuable for everyone, from beginners to seasoned professionals. Aman finds joy in breaking down complex topics related to data engineering and integration to help data practitioners solve their day-to-day problems.