Spark Streaming and Kafka Integration: 5 Easy Steps

November 3rd, 2020

Are you looking to set up a Spark Streaming and Kafka Integration? You have landed on just the right page. Kafka has been used widely for an effective streaming strategy and an array of other features for efficient stream processing and transmission. Spark Streaming is an API that can be connected with a variety of sources including Kafka to deliver high scalability, throughput, fault-tolerance, and other benefits for a high-functioning stream processing mechanism. 

These features make the combination well suited to processing live data streams and routing them reliably. This article will help you explore the features of Spark Streaming and the significance of Spark Streaming and Kafka Integration. You will also read about the process to set up such an integration and some drawbacks that a manual Spark Streaming and Kafka Integration could pose.

What is Spark Streaming?

Spark Streaming is an extension of the core Apache Spark API. It allows you to ingest data from several sources such as Kafka, Kinesis, and TCP sockets, and process it using complex algorithms expressed with high-level functions such as map, reduce, join, and window. Its central abstraction is the discretized stream (DStream), which represents a continuous stream of data as a sequence of RDDs.

Spark Streaming receives a live input stream and splits the data into batches. These batches are processed by the Spark engine to produce the final batch result stream. This allows versatile integrations through different sources with Spark Streaming including Apache Kafka. These can be understood with the representation below:

Figure: Spark Streaming ecosystem diagram
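To make the batching idea concrete, here is a minimal plain-Java sketch. It does not use Spark at all; the MicroBatcher class and its toBatches method are hypothetical names, and the sketch only assumes that every event carries a non-negative timestamp in milliseconds. Each event is assigned to the batch whose interval contains its timestamp, which is roughly how a live stream is cut into small batches.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Plain-Java model of micro-batching for illustration only; this is not Spark code.
final class MicroBatcher {

    // Groups non-negative event timestamps (ms) into batches keyed by the batch start time.
    static Map<Long, List<Long>> toBatches(List<Long> timestampsMs, long batchIntervalMs) {
        Map<Long, List<Long>> batches = new TreeMap<>();
        for (long ts : timestampsMs) {
            // Round the timestamp down to the start of its batch interval
            long batchStart = (ts / batchIntervalMs) * batchIntervalMs;
            batches.computeIfAbsent(batchStart, k -> new ArrayList<>()).add(ts);
        }
        return batches;
    }

    public static void main(String[] args) {
        List<Long> events = Arrays.asList(100L, 950L, 1000L, 1999L, 2500L);
        // With a 1-second batch interval this prints:
        // {0=[100, 950], 1000=[1000, 1999], 2000=[2500]}
        System.out.println(toBatches(events, 1000L));
    }
}
```

In Spark Streaming itself, the batch interval is the batchDuration you pass when creating the StreamingContext, and each batch is handed to the Spark engine as an RDD.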

What is Apache Kafka?

Apache Kafka is a Popular Open-Source Distributed Stream Data Ingesting & Processing Platform. Providing an end-to-end solution to its users, Kafka can efficiently read & write streams of events in real-time with constant import/export of your data from other data systems. 

Its Reliability & Durability allows you to store streams of data securely for as long as you want. With its Best-in-Class performance, Low latency, Fault Tolerance, and High Throughput, Kafka can handle & process thousands of messages per second in Real-Time. 

Launched as an Open Source Messaging Queue System by LinkedIn in 2011, Kafka has now evolved into a Full-Fledged Event Streaming Platform.  It is an excellent tool for building Real-Time Streaming Data Pipelines and Applications that adapt to the Data Streams. You can easily Install Kafka on Mac, Windows & Linux OS. Adding to its Flexibility, Kafka works for both Online & Offline Message Consumption.

Key Features of Apache Kafka

Over the Years, Apache Kafka has grown into a one-stop solution for all the Stream-Processing needs. Some of its eye-catching features are: 

  • Scalability: Owing to its unique architecture, Kafka can easily handle scaling of all of its 4 elements, i.e. event producers, event processors, event consumers, and event connectors, without any downtime.
  • Time-Based Data Retention: Kafka offers a simple yet effective approach towards Fault Tolerance. It persistently writes and replicates all your data to the disk. You can also set the Retention Limit and recall the stored data based on those periods.  
  • Unbelievably Fast:  With the ability to handle TBs of messages, Kafka provides high throughput for both Publishing and Subscribing messages. It automatically Decouples Data Streams, thereby providing low latency.
  • Durability: Topics stored by Kafka maintain a Partitioned Structured Commit Log that continuously keeps track of all the records that are present as well as adds the new ones in Real-Time. Since the partitions are distributed and replicated across multiple servers, Kafka provides a durable Fault-Tolerant Ecosystem in cases of server failure.  
  • Integration Support: Catering to everyone’s needs, Kafka provides integration points via Connector API, allowing you to expand and grow. You can build integrations with third-party solutions, other messaging systems, and legacy applications.

However, manually building, monitoring, and maintaining data connectors between Kafka and all your sources can be a time-consuming task. A more efficient and economical solution is to use a Cloud-Based ETL Tool like Hevo Data. Hevo’s 100+ pre-built connectors allow you to seamlessly transfer data from a vast sea of data sources to a Data Warehouse or a destination of your choice.

Why is Spark Streaming and Kafka Integration Important?

There are several benefits of implementing Spark Kafka Integration:

  • By setting up the Spark Streaming and Kafka Integration, you can ensure minimum data loss through Spark Streaming while saving all the received Kafka data synchronously for an easy recovery.
  • Users can read messages from a single topic or multiple Kafka topics. 
  • Along with this level of flexibility, you can also access high scalability, throughput and fault-tolerance, and a range of other benefits by using Spark and Kafka in tandem.

The Spark Streaming and Kafka Integration can be understood with a data pipeline that functions in the methodology shown below:

Figure: Spark Streaming and Kafka Integration data pipeline

Replicate Kafka Data in Minutes using Hevo’s Data Pipelines

Hevo can be your go-to tool if you’re looking for Real-Time Data Streaming & Replication and a monopoly over all use cases of data transfer. It offers compatibility of integrations from 100+ Data Sources (including 40+ Free Data Sources) like Kafka into Redshift, Databricks, Snowflake, and many other databases and warehouse systems. Hevo provides support for Real-time Delivery of enriched and error free data, all without the hassle of writing code & no data loss. You can check out the Hevo Documentation to understand how it simplifies the process. To further streamline and prepare your data for analysis, you can process and enrich Raw Granular Data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!

With Hevo in place, you can reduce your Data Extraction, Cleaning, Preparation, and Enrichment time & effort manyfold! In addition, Hevo’s native integration with BI & Analytics Tools will empower you to mine your replicated data to get actionable insights. With Hevo as one of the best Kafka Replication tools, replication of data becomes easier.

Try our 14-day full access free trial today!

Key Kafka Spark APIs

To set up the Spark Streaming and Kafka Integration, there are three main Kafka Spark APIs:

1. StreamingContext API

This API acts as the main entry point for utilizing the Spark Streaming functionality. Using the methods provided by this API, you can create DStreams from various input sources. 

  • You can build it either by providing a Spark master URL and an appName, or from an org.apache.spark.SparkConf configuration, or even from an existing org.apache.spark.SparkContext.
  • You can access the associated SparkContext via context.sparkContext.
  • After you have built and transformed DStreams, the streaming computation can be started and stopped using context.start() and context.stop() methods respectively. 
  • You can use context.awaitTermination() to allow the current thread to wait for the termination of the context by stop() or by an exception.

2. SparkConf API

It represents the configuration for a Spark application. You can create a SparkConf object with new SparkConf(), which will load values from any spark.* Java system properties set in your application. Parameters that you set explicitly as key-value pairs on the SparkConf object take priority over system properties. The SparkConf class has the following methods:

  •  set(String key, String value): sets a configuration variable.
  •  remove(String key): removes a key from the configuration.
  •  setAppName(String name): sets the application name for your application.
  •  get(String key): returns the value configured for a key.
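The precedence rule described above can be illustrated with a small plain-Java model. MiniConf is a hypothetical stand-in written for this article, not the real SparkConf class; it only mimics the behavior described here: spark.* properties are copied at construction time, and values set explicitly afterwards override them.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.NoSuchElementException;
import java.util.Properties;

// Hypothetical stand-in for SparkConf, written only to illustrate precedence; not the Spark class.
final class MiniConf {
    private final Map<String, String> settings = new HashMap<>();

    // Copies every spark.* entry from the given properties, ignoring all other keys.
    MiniConf(Properties systemProps) {
        for (String name : systemProps.stringPropertyNames()) {
            if (name.startsWith("spark.")) {
                settings.put(name, systemProps.getProperty(name));
            }
        }
    }

    // Explicitly set values replace anything loaded from system properties.
    MiniConf set(String key, String value) {
        settings.put(key, value);
        return this;
    }

    MiniConf remove(String key) {
        settings.remove(key);
        return this;
    }

    // The application name is stored under the spark.app.name key.
    MiniConf setAppName(String name) {
        return set("spark.app.name", name);
    }

    // Returns the configured value, or fails if the key was never set.
    String get(String key) {
        String value = settings.get(key);
        if (value == null) {
            throw new NoSuchElementException(key);
        }
        return value;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("spark.app.name", "from-system-property");
        MiniConf conf = new MiniConf(props).setAppName("Spark Kafka Project");
        // The explicit value wins, so this prints: Spark Kafka Project
        System.out.println(conf.get("spark.app.name"));
    }
}
```

With the real API, the same chain would start from new SparkConf() and the resulting object would be passed on when creating the streaming context.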

3. KafkaUtils API

The KafkaUtils API allows you to connect Kafka clusters to Spark Streaming and set up the Spark Streaming and Kafka Integration. In the older receiver-based integration for Kafka 0.8, which is no longer available as of Spark 3.0, this API has an important method, createStream. It is used to create an input stream that pulls messages from the Kafka brokers through a receiver. It has the following parameters:

  • ssc: StreamingContext object.
  • zkQuorum: ZooKeeper quorum, given as a comma-separated list of host:port entries.
  • groupId: Group ID for this consumer.
  • topics: Map of topic names to the number of partitions to consume, each in its own thread.
  • storageLevel: Storage level used for storing the received objects.

The KafkaUtils API has another method, createDirectStream. It is used to create an input stream that fetches messages directly from the Kafka brokers without using a receiver. This stream can guarantee that each message from Kafka is included in transformations exactly once.

Understanding Spark Streaming and Kafka Integration Steps

Spark Streaming and Kafka Integration provides simple parallelism through a 1:1 correspondence between Kafka partitions and Spark partitions, along with access to offsets and metadata.

The connection to a Spark cluster is represented by a Streaming Context API which specifies the cluster URL, name of the app as well as the batch duration. This looks as follows:

public StreamingContext(String master, String appName, Duration batchDuration, 
   String sparkHome, scala.collection.Seq<String> jars, 
   scala.collection.Map<String,String> environment)

For Spark Streaming and Kafka Integration, KafkaUtils API is used to create an input stream to fetch messages from Kafka. This can be represented as:

public static
ReceiverInputDStream<scala.Tuple2<String,String>>
createStream(
      StreamingContext ssc, String zkQuorum, String groupId,
      scala.collection.immutable.Map<String,Object> topics,
      StorageLevel storageLevel)

A direct stream can also be created for an input stream to directly pull messages from Kafka. This can be implemented through the following code:

import java.util.*;
import org.apache.spark.SparkConf;
import org.apache.spark.TaskContext;
import org.apache.spark.api.java.*;
import org.apache.spark.api.java.function.*;
import org.apache.spark.streaming.api.java.*;
import org.apache.spark.streaming.kafka010.*;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import scala.Tuple2;
 
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "localhost:9092,anotherhost:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "use_a_separate_group_id_for_each_stream");
kafkaParams.put("auto.offset.reset", "latest");
kafkaParams.put("enable.auto.commit", false);
 
Collection<String> topics = Arrays.asList("topicA", "topicB");
 
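// streamingContext is assumed to be an existing JavaStreamingContext
JavaInputDStream<ConsumerRecord<String, String>> stream =
  KafkaUtils.createDirectStream(
    streamingContext,
    LocationStrategies.PreferConsistent(),
    ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams)
  );
 
// Map each record to a (key, value) pair and keep the resulting stream for further processing
JavaPairDStream<String, String> pairs =
  stream.mapToPair(record -> new Tuple2<>(record.key(), record.value()));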

With these commands to fetch data, you can follow some simple steps to initiate Spark Streaming and Kafka Integration:

Step 1: Build a Script

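For the Spark Streaming & Kafka Integration, you need to start out by building a script that specifies the application details and all library dependencies. With sbt, this is the “build.sbt” file; sbt uses it to download the dependencies required for compilation and packaging of the application.

Depending on your dependencies, your build file will look something like this. Note that this example targets Spark 1.6.0 and the older spark-streaming-kafka artifact, which provides createStream; the kafka010 classes used in the Java snippets in this article come from the spark-streaming-kafka-0-10 artifact, which is available from Spark 2.0 onward, so pick the versions that match the API you use: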

name := "Spark Kafka Project"
version := "1.0"
scalaVersion := "2.10.5"
libraryDependencies += "org.apache.spark" %% "spark-core" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming" % "1.6.0"
libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka" % "1.6.0"

Step 2: Create an RDD

Within this method, you can also create an RDD for batch processing of a fixed range of offsets. After importing the dependencies and creating the Kafka parameters, you define the offset ranges, which specify the topic, the partition, and the starting and ending offsets to read.

// Import dependencies and create kafka params as in Create Direct Stream above
 
OffsetRange[] offsetRanges = {
  // topic, partition, inclusive starting offset, exclusive ending offset
  OffsetRange.create("test", 0, 0, 100),
  OffsetRange.create("test", 1, 0, 100)
};
 
JavaRDD<ConsumerRecord<String, String>> rdd = KafkaUtils.createRDD(
  sparkContext,
  kafkaParams,
  offsetRanges,
  LocationStrategies.PreferConsistent()
);
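Because each offset range has an inclusive start and an exclusive end, consecutive ranges line up without gaps or overlaps. The following plain-Java sketch shows that arithmetic on its own; OffsetRanges, count, and split are hypothetical helper names made up for this illustration and are not part of the Spark API.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java illustration of [from, until) offset arithmetic; not part of the Spark API.
final class OffsetRanges {

    // Number of records in the half-open range [from, until).
    static long count(long from, long until) {
        return until - from;
    }

    // Splits [from, until) into consecutive half-open chunks of at most maxPerRange records.
    static List<long[]> split(long from, long until, long maxPerRange) {
        if (maxPerRange <= 0 || until < from) {
            throw new IllegalArgumentException("invalid range or chunk size");
        }
        List<long[]> ranges = new ArrayList<>();
        for (long start = from; start < until; start += maxPerRange) {
            long end = Math.min(start + maxPerRange, until);
            ranges.add(new long[] {start, end});
        }
        return ranges;
    }

    public static void main(String[] args) {
        // The range (0, 100) from the example above covers offsets 0..99, so this prints 100
        System.out.println(count(0, 100));
        // Prints [0, 100), [100, 200), [200, 250) on separate lines
        for (long[] r : split(0, 250, 100)) {
            System.out.println("[" + r[0] + ", " + r[1] + ")");
        }
    }
}
```

Each resulting pair could then be passed to OffsetRange.create together with the topic and partition, as in the snippet above.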

Step 3: Obtain and Store Offsets

The offsets need to be obtained and stored so that processing can resume from the right position later. For this, the offset range of each partition in a batch can be read as follows:

stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  rdd.foreachPartition(consumerRecords -> {
    OffsetRange o = offsetRanges[TaskContext.get().partitionId()];
    System.out.println(
      o.topic() + " " + o.partition() + " " + o.fromOffset() + " " + o.untilOffset());
  });
});

For data stores that support transactions, you can save the offsets in the same transaction as the results, so that the two stay in sync even if a batch fails and is retried. This avoids duplicated output. The general pattern looks as follows; the details depend on your data store:

// The details depend on your data store, but the general idea looks like this
// begin from the offsets committed to the database
// StoredOffset and selectOffsetsFromYourDatabase() are hypothetical placeholders for your own data access code
Map<TopicPartition, Long> fromOffsets = new HashMap<>();
for (StoredOffset row : selectOffsetsFromYourDatabase()) {
  fromOffsets.put(new TopicPartition(row.topic(), row.partition()), row.offset());
}
JavaInputDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(
  streamingContext,
  LocationStrategies.PreferConsistent(),
  ConsumerStrategies.<String, String>Assign(fromOffsets.keySet(), kafkaParams, fromOffsets)
);
stream.foreachRDD(rdd -> {
  OffsetRange[] offsetRanges = ((HasOffsetRanges) rdd.rdd()).offsetRanges();
  Object results = yourCalculation(rdd);
  // begin your transaction
  // update results
  // update offsets where the end of existing offsets matches the beginning of this batch of offsets
  // assert that offsets were updated correctly
  // end your transaction
});
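The commented steps inside the transaction can be sketched in plain Java with an in-memory store. OffsetLedger and its methods are hypothetical names, and a synchronized method stands in for a real database transaction, so treat this as an illustration of the check "the end of the existing offsets matches the beginning of this batch" rather than a production pattern.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory illustration of committing results and offsets together; a real
// implementation would do this inside a database transaction.
final class OffsetLedger {
    // "topic-partition" -> next offset to read (the exclusive end of the last applied batch)
    private final Map<String, Long> committed = new HashMap<>();
    private long total; // stand-in for the stored results

    // Applies the batch only if it begins exactly where the last applied batch ended.
    // Returns false for replayed or out-of-order batches, leaving results and offsets untouched.
    synchronized boolean commitBatch(String topic, int partition,
                                     long fromOffset, long untilOffset, long batchResult) {
        String key = topic + "-" + partition;
        long expectedStart = committed.getOrDefault(key, 0L);
        if (fromOffset != expectedStart) {
            return false;
        }
        total += batchResult;              // update results
        committed.put(key, untilOffset);   // update offsets in the same step
        return true;
    }

    synchronized long total() {
        return total;
    }

    synchronized long committedOffset(String topic, int partition) {
        return committed.getOrDefault(topic + "-" + partition, 0L);
    }

    public static void main(String[] args) {
        OffsetLedger ledger = new OffsetLedger();
        System.out.println(ledger.commitBatch("test", 0, 0, 100, 100));   // true: first batch
        System.out.println(ledger.commitBatch("test", 0, 0, 100, 100));   // false: replay is ignored
        System.out.println(ledger.commitBatch("test", 0, 100, 200, 100)); // true: next batch
        System.out.println(ledger.total());                               // 200
    }
}
```

Because a replayed batch is rejected, retrying after a failure does not count the same records twice, which is the property the transactional pattern is meant to provide.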

Step 4: Implementing SSL Spark Communication

Using the parameters below, you can enable SSL for the connection between Spark and the Kafka brokers. This encrypts the data exchanged between the two and helps protect the Spark Streaming and Kafka Integration against eavesdropping and data leaks in transit. Note that it does not secure the communication between the Spark nodes themselves, which has to be configured separately.

Map<String, Object> kafkaParams = new HashMap<String, Object>();
// the usual params, make sure to change the port in bootstrap.servers if 9092 is not TLS
kafkaParams.put("security.protocol", "SSL");
kafkaParams.put("ssl.truststore.location", "/some-directory/kafka.client.truststore.jks");
kafkaParams.put("ssl.truststore.password", "test1234");
kafkaParams.put("ssl.keystore.location", "/some-directory/kafka.client.keystore.jks");
kafkaParams.put("ssl.keystore.password", "test1234");
kafkaParams.put("ssl.key.password", "test1234");
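The passwords in the snippet above are sample values. One common way to avoid hard-coding them is to read them from the environment. The sketch below is one possible approach, not the only one: the SslParams class and the KAFKA_* environment variable names are assumptions made for this illustration, while the Kafka configuration keys are the same ones used above.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

// Builds the SSL-related Kafka parameters from environment-style settings.
// The KAFKA_* variable names are hypothetical; choose names that fit your deployment.
final class SslParams {

    static Map<String, Object> build(Map<String, String> env) {
        Map<String, Object> params = new HashMap<>();
        params.put("security.protocol", "SSL");
        params.put("ssl.truststore.location", require(env, "KAFKA_TRUSTSTORE_PATH"));
        params.put("ssl.truststore.password", require(env, "KAFKA_TRUSTSTORE_PASSWORD"));
        params.put("ssl.keystore.location", require(env, "KAFKA_KEYSTORE_PATH"));
        params.put("ssl.keystore.password", require(env, "KAFKA_KEYSTORE_PASSWORD"));
        params.put("ssl.key.password", require(env, "KAFKA_KEY_PASSWORD"));
        return params;
    }

    // Fails fast with a clear message when a required setting is missing or empty.
    private static String require(Map<String, String> env, String name) {
        String value = env.get(name);
        if (value == null || value.isEmpty()) {
            throw new IllegalStateException("Missing required setting: " + name);
        }
        return value;
    }

    public static void main(String[] args) {
        try {
            Map<String, Object> params = build(System.getenv());
            // Print only the keys so that no secret ends up in the logs
            System.out.println("Configured keys: " + new TreeSet<>(params.keySet()));
        } catch (IllegalStateException e) {
            System.err.println(e.getMessage());
        }
    }
}
```

The returned map can be merged into the usual kafkaParams with putAll before creating the direct stream.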

Step 5: Compile and Submit to Spark Console

Finally, to complete the Spark Streaming and Kafka Integration process, you can launch the Spark application by compiling and submitting the package. The sbt package command compiles and packages the application, which can then be submitted with spark-submit using the following command:

/usr/local/spark/bin/spark-submit \
  --packages org.apache.spark:spark-streaming-kafka_2.10:1.6.0 \
  --class "KafkaWordCount" \
  --master local[4] \
  target/scala-2.10/spark-kafka-project_2.10-1.0.jar \
  localhost:2181 <group name> <topic name> <number of threads>

The final application output for the Spark Streaming and Kafka Integration will be seen in the format shown below:

spark console messages ..
(Test,1)
(spark,1)
(another,1)
(message,1)
spark console message ..
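The (word,count) pairs in this output come from a word count over the received messages. As a rough plain-Java sketch of that computation, assuming the application splits each message on whitespace and counts the words of one batch (the BatchWordCount class and the sample message are made up for illustration and do not use Spark):

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Plain-Java illustration of a per-batch word count; not the Spark application itself.
final class BatchWordCount {

    // Counts how often each whitespace-separated word occurs across the messages of one batch.
    static Map<String, Integer> count(List<String> messages) {
        Map<String, Integer> counts = new LinkedHashMap<>();
        for (String message : messages) {
            for (String word : message.trim().split("\\s+")) {
                if (word.isEmpty()) {
                    continue; // skip blank messages
                }
                counts.merge(word, 1, Integer::sum);
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        List<String> batch = Arrays.asList("Test spark another message");
        // Prints (Test,1), (spark,1), (another,1), (message,1) on separate lines
        for (Map.Entry<String, Integer> e : count(batch).entrySet()) {
            System.out.println("(" + e.getKey() + "," + e.getValue() + ")");
        }
    }
}
```

In the Spark application, the same logic is expressed with transformations on the stream, and the counts are printed for every batch interval.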

Following these steps, you can successfully initiate Spark Streaming and Kafka Integration, and, thus, reap the twin benefits of both APIs for your real-time data streaming and stream processing requirements. 

Code examples adapted from:

https://www.tutorialspoint.com/apache_kafka/apache_kafka_integration_spark.htm

https://spark.apache.org/docs/2.2.0/streaming-kafka-0-10-integration.html

What makes Hevo’s Real-time Data Streaming & Replication Experience Best in Class?

Real-time Data Streaming and Replication can be a tiresome task without the right set of tools. Hevo’s Data Replication & Integration platform empowers you with everything you need to have a smooth Data Collection, Processing, and Replication experience. Our platform has the following in store for you!

  • Exceptional Security: A Fault-tolerant Architecture that ensures Zero Data Loss.
  • Built to Scale: Exceptional Horizontal Scalability with Minimal Latency for Modern-data Needs.
  • Built-in Connectors: Support for 100+ Data Sources, including Kafka, Databases, SaaS Platforms, Files & More. Native Webhooks & REST API Connector available for Custom Sources.
  • Data Transformations: Best-in-class & Native Support for Complex Data Transformation at fingertips. Code & No-code Flexibility ~ designed for everyone.
  • Smooth Schema Mapping: Fully-managed Automated Schema Management for incoming data with the desired destination.
  • Blazing-fast Setup: Straightforward interface for new customers to work on, with minimal setup time.

Limitations of Manual Spark Streaming and Kafka Integration

Despite a structured methodology and easy-to-implement steps, there are some drawbacks of relying on a manual Spark Streaming and Kafka Integration process:

  • To begin with, you need some technical background and skills to understand and implement the commands required to set up the Spark Streaming and Kafka Integration manually.
  • While the Spark Streaming and Kafka Integration can be established by running a script, you will still need to separately manage the Spark inter-node communication, which is not covered by the integration.
  • In the manual Spark Streaming and Kafka Integration, there is more room for error and inconsistency, given that every script run has to be monitored.
  • With growing data, the issues of scalability and effective specification of datasets also arise.

All these limitations of manual Spark Streaming and Kafka Integration can be effectively countered with some automation strategies and platforms to implement the same. 

Conclusion 

Thus, Spark Streaming and Kafka Integration can offer some pretty robust features for your data streaming requirements. If you’re looking to enhance the scalability, fault-tolerance, and other features for an optimized stream processing environment, this is a combination that you must implement. If you’re comfortable with code, you can follow the above-mentioned steps to carry out the Spark Streaming and Kafka Integration. These methods, however, can be challenging as they require a deep understanding of programming languages and other backend tools. This is where Hevo saves the day!

Hevo Data provides an Automated No-code Data Pipeline that empowers you to overcome the above-mentioned limitations. You can leverage Hevo to seamlessly set up Kafka ETL with Real-time Delivery of enriched and error free data, all without the hassle of writing code & no data loss. Hevo caters to 100+ Data Sources (including 40+ free sources) and can securely transfer data to Data Warehouses, Business Intelligence Tools, or any other destination of your choice in a hassle-free manner. Hevo allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in real-time.

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite firsthand. Do check out the pricing details to understand which plan fulfills all your business needs.

Tell us about your experience of learning about setting up the Spark Streaming and Kafka Integration! Share your thoughts with us in the comments section below.