In today’s data-driven world, a colossal amount of data is generated from sensors, IoT devices, social networks, online transactions, and more. To harness the power of this continuously generated data, organizations must constantly monitor and analyze it using various data analysis approaches.

Consequently, there is a growing demand for large-scale, real-time stream processing engines that can analyze live data streams.

To process and analyze real-time data streams, users need a highly scalable, robust, and fault-tolerant data streaming engine. One such highly reliable engine is Apache Spark, which allows users to implement large-scale data processing operations.

In this article, you will learn about Apache Spark, real-time streaming, and how to set up Spark Real-time Streaming. 

Prerequisites

Fundamental knowledge of data streaming.

What is Apache Spark?


Apache Spark, which became a top-level Apache project in 2014, is an open-source, unified data processing engine that allows you to implement large-scale data streaming operations. In other words, Apache Spark is a highly reliable data processing framework that lets you efficiently run real-time processing over very large data sets, distributing the work across several computers, either on its own or in conjunction with other distributed computing technologies.

Since Apache Spark is a multi-language data processing engine, it allows you to customize and reuse code across many workloads, including batch processing, interactive querying, machine learning, graph processing, and real-time analytics.

Understanding Real-time Data Streaming


Real-time data streaming is a method of continuously gathering real-time data from multiple external data sources in the form of data streams. In addition, real-time data streaming is vital for dealing with enormous amounts of live data, which is fetched from a variety of sources, including online transactions, system logs, sensors, in-game player actions, and much more. 

While implementing the real-time streaming process, the data stream is treated as an unbounded table: adding new records to the stream is analogous to appending rows to the table, so both batch and streaming data can be handled as tables. Users specify the query they want to perform, along with the input and output locations. The streaming engine then runs the query incrementally, retaining just enough state to recover from failures and keep the results in external storage consistent.
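The snippet below is a minimal, illustrative sketch of this unbounded-table model in PySpark Structured Streaming, assuming a local Spark session and a hypothetical text source on localhost:9999.

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("UnboundedTableSketch").getOrCreate()

# Every line arriving on the socket becomes a new row appended to this
# "unbounded table"; the same DataFrame API used for batch data applies.
lines = (
    spark.readStream
        .format("socket")
        .option("host", "localhost")
        .option("port", 9999)
        .load()
)

# An incremental query over the stream: Spark keeps just enough state to
# update this running count as new rows arrive.
line_count = lines.groupBy().count()

query = (
    line_count.writeStream
        .outputMode("complete")   # rewrite the full result on every update
        .format("console")        # print the result to the console
        .start()
)
query.awaitTermination()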

How to Set Up Spark Real-time Streaming?

In the following steps, we will set up and implement Spark Real-time Streaming. Since Spark Streaming supports receiving data streams over TCP connections, you can create a simple streaming application that receives text data streams on a given port. After receiving the data streams, the application performs basic text cleaning procedures such as whitespace removal, stop word removal, and lemmatization, and then prints the cleaned text on the screen.

  • Initially, for Spark Real-time Streaming, you have to create a StreamingContext, which serves as the primary entry point for any streaming application. It is created by invoking the StreamingContext class from the “pyspark.streaming” package.
  • Execute the following command to import the necessary packages.
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
  • We can define the batch duration while constructing a StreamingContext. For example, here, we provided the batch duration as 3 seconds.
sc = SparkContext(appName = "Text Cleaning")
strc = StreamingContext(sc, 3)
  • After creating the StreamingContext, we can begin receiving data in the form of DStreams via the TCP protocol on a specified port. For example, in our case, the hostname is “localhost,” and the port is 8084.
text_data = strc.socketTextStream("localhost", 8084)
  • After receiving data streams, we can implement data cleaning operations by creating a custom text cleaning function.
import re
import nltk
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer

# Download the required NLTK resources once before starting the stream.
nltk.download('stopwords')
nltk.download('wordnet')
nltk.download('omw-1.4')  # needed by the lemmatizer in some NLTK versions

stop_words = set(stopwords.words('english'))
lemmatizer = WordNetLemmatizer()

def clean_text(sentence):
    sentence = sentence.lower()
    sentence = re.sub(r"http\S+", "", sentence)   # remove links/URLs first
    sentence = re.sub(r"\W", " ", sentence)       # replace non-word characters with spaces
    sentence = re.sub(r"\s+", " ", sentence)      # collapse extra whitespace
    sentence = ' '.join(word for word in sentence.split() if word not in stop_words)
    sentence = [lemmatizer.lemmatize(token, "v") for token in sentence.split()]
    sentence = " ".join(sentence)
    return sentence.strip()
  • The custom text cleaning function converts the input text to lowercase, removes links/URLs, non-word characters, and excess whitespace, filters out stop words, and then lemmatizes the remaining tokens using the NLTK library.
  • After implementing the cleaning function, we are all set to start the streaming operations, as shown in the sketch below.
  • To begin the Spark Real-time Streaming process and keep receiving real-time streaming data, call the start() method on the StreamingContext object, i.e., “strc.”
  • The data will be streamed until the awaitTermination() method receives a termination instruction, such as Ctrl + C or Ctrl + Z.
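A minimal sketch of how streaming.py could tie these pieces together, assuming the clean_text function defined above (the mapping and printing logic here is illustrative, not spelled out in the original steps):

# Clean every incoming line and print the result of each 3-second batch.
cleaned_data = text_data.map(clean_text)
cleaned_data.pprint()

strc.start()             # start receiving and processing the stream
strc.awaitTermination()  # keep running until interrupted (e.g., Ctrl + C)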
  • Now you have to send text data from the data server to the Spark streaming server.
  • In order to transfer the text data from the data server to the Spark streaming server, we must run the “nc” command of the Netcat utility. Netcat is an application that allows you to read from and write to network connections over TCP or UDP.
  • Execute the following command to send text data from the data server to the Spark streaming server.
nc -lk 8084
  • In the above command, -l permits “nc” to listen for incoming connections rather than connecting to a remote host, while -k enables “nc” to keep listening for new connections after the current one is finished.
  • Now, execute the following command to run the streaming application and clean the text data received on port 8084.
spark-submit streaming.py localhost 8084
  • After executing the above command, any text typed into the Netcat terminal is cleaned, and the cleaned content is printed in the other terminal every 3 seconds, which is the batch duration provided while creating the StreamingContext.
  • By following the above steps, you successfully implemented Spark Real-time Streaming. However, you can also set up Spark Real-time Streaming using Databricks.

Setting up Spark Real-time Streaming Using Databricks

In order to implement Spark Real-time Streaming in Databricks, you have to follow four steps: load sample data, initialize a stream, start a streaming job, and query the stream.

  • The most straightforward way to get started with Structured Streaming is to use a sample Databricks dataset from the /databricks-datasets folder within the Databricks workspace. To build a Structured Streaming application, Databricks provides sample event data as files in /databricks-datasets/structured-streaming/events/. The contents of the directory resemble the image below.
  • You can list the sample files by executing the following command.
%fs ls /databricks-datasets/structured-streaming/events/
Spark real-time streaming: output of all the events
  • In the Databricks directory, each file contains JSON records with two fields: time and action. A sample of the JSON format is given below.
{"time":1469501675,"action":"Open"}
{"time":1469501678,"action":"Close"}
{"time":1469501680,"action":"Open"}
{"time":1469501685,"action":"Open"}
{"time":1469501686,"action":"Open"}
{"time":1469501689,"action":"Open"}
{"time":1469501691,"action":"Open"}
{"time":1469501694,"action":"Open"}
{"time":1469501696,"action":"Close"}
{"time":1469501702,"action":"Open"}
{"time":1469501703,"action":"Open"}
{"time":1469501704,"action":"Open"}
  • After loading the sample files, you have to initialize the streaming operation in Databricks. Since the sample data is essentially a static collection of files, you can generate and initialize a stream by reading one file at a time, in the chronological order in which they were generated in the Databricks directory.
  • Execute the following code to initialize the streaming operation.
from pyspark.sql.types import StructType, StructField, TimestampType, StringType
from pyspark.sql.functions import window

inputPath = "/databricks-datasets/structured-streaming/events/"

# Define the schema to speed up processing
jsonSchema = StructType([ StructField("time", TimestampType(), True), StructField("action", StringType(), True) ])

streamingInputDF = (
  spark
    .readStream
    .schema(jsonSchema)               # apply the schema defined above
    .option("maxFilesPerTrigger", 1)  # treat the files as a stream by reading one file per trigger
    .json(inputPath)
)

# Count events per action within 1-hour windows
streamingCountsDF = (
  streamingInputDF
    .groupBy(
      streamingInputDF.action,
      window(streamingInputDF.time, "1 hour"))
    .count()
)
  • After executing the above code, you successfully initialized the streaming operation. Now you are all set to start the real-time streaming job.
  • You can begin the process by defining and launching a sink. Since we want to query the stream counts interactively, the complete set of 1-hour counts is kept in an in-memory table.
  • To start the Spark Real-time Streaming, execute the following code.
query = (
  streamingCountsDF
    .writeStream
    .format("memory")         # store the counts in an in-memory table
    .queryName("counts")      # name of the in-memory table
    .outputMode("complete")   # rewrite all the counts on every update
    .start()
)
  • In the above code, query is a handle to the streaming query named counts, which runs in the background. The query continuously picks up new files and updates the windowed counts; you can inspect it as shown below.
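The following is a hedged sketch (not part of the original walkthrough) of how the handle can be inspected from another notebook cell using the standard StreamingQuery API:

print(query.status)        # whether the query is active and what it is currently doing
print(query.lastProgress)  # metrics for the most recent micro-batch
# query.stop()             # uncomment to stop the streaming query when you are finished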
  • After starting the query, you will see the status of the Spark Real-time Streaming job, as shown below.
Spark real-time streaming: status of spark real-time streaming
  • If you expand “counts” in the output, you will see a dashboard with the number of records processed, batch statistics, and the current state of the aggregation, as shown below.
Spark real-time streaming: statistics of all the data
  • After successfully initiating the Spark Real-time Streaming process, you can also query the stream interactively to perform specific aggregation actions.
  • For example, to query the previously generated stream values, execute the following command.
%sql select action, date_format(window.end, "MMM-dd HH:mm") as time, count from counts order by time, action

After executing the above-given code, you will get the following output.

Bar graphs of all the actions at a specified date
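If you prefer to run the same query from a Python notebook cell instead of a %sql cell, an equivalent sketch against the in-memory counts table would be (assuming the spark session that Databricks provides):

spark.sql("""
  select action, date_format(window.end, 'MMM-dd HH:mm') as time, count
  from counts
  order by time, action
""").show()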
  • If you run the query again, the output will differ, since the counts are updated each time the query is executed to reflect the continuously changing data stream input.
  • The images below show the outputs of the same query executed at different times.
Spark real-time streaming: running the same query again with different output
Spark real-time streaming: running the query a third time with greater output

By following the above-mentioned steps, you successfully implemented Spark Real-time Streaming using Databricks.

Conclusion

In this article, you learned about Apache Spark, real-time streaming, and how to set up Spark Real-time Streaming. This article mainly focused on setting up the streaming process using Apache Spark in the local server and Databricks.

However, you can also set up and configure a real-time data streaming process on AWS, thereby creating Spark streaming apps to publish and process clickstream events. 

Companies rely on data from a variety of trusted sources because of the benefits they provide, but transferring that data into a data warehouse is often a hectic task. An automated data pipeline helps solve this issue, and this is where Hevo comes into the picture. Hevo Data is a No-code Data Pipeline with 150+ pre-built integrations that you can choose from.

Hevo can help you integrate data from 150+ sources and load it into a destination to analyze real-time data. It makes your life easier and data migration hassle-free. It is user-friendly, reliable, and secure.

SIGN UP for a 14-day free trial and see the difference!

Share your experience of learning about Spark Real-time Streaming in the comments section below.
