Data Ingestion Framework using Spark: Workflows Simplified 101

on Apache Spark, Data Ingestion, NoSQL, Python • May 17th, 2022 • Write for Hevo

Data Ingestion Framework using Spark - Featured Image

Businesses collect, extract & store data in several data formats such as CSV(Comma Separated Values), Parquet, JSON, Avro, etc. Reading, manipulating & loading this data into your desired destination requires a smooth Data processing system. Apache Spark has become one of the widely used Open-Source unified analytics engines that can effectively analyze Petabytes of data.

With Built-in parallelism and Fault Tolerance, you can easily design a Data Ingestion Framework using Spark. Apache Spark provides a flexible interface to write applications in several languages such as Java, Python, Scala, and R.     

In this article, you will learn about 2 different methods to effectively design a Data Ingestion Framework using Spark. 

Table of Contents

What is Apache Spark?

Data Ingestion Framework using Spark - Apache Spark Logo
Image Source

Apache Spark is an Open-Source, lightning-fast Distributed Data Processing System for Big Data and Machine Learning. It was originally developed back in 2009 and was officially launched in 2014. Attracting big enterprises such as Netflix, eBay, Yahoo, etc, Apache Spark processes and analyses Petabytes of data on clusters of over 8000 nodes. Utilizing Memory Caching and Optimal Query Execution, Spark can take on multiple workloads such as Batch Processing, Interactive Queries, Real-Time Analytics, Machine Learning, and Graph Processing.

Spark was made to overcome the challenges faced by developers with MapReduce, the disk-based computational engine at the core of early Hadoop clusters. Unlike MapReduce, Spark reduces all the intermediate computationally expensive steps by retaining the working dataset in memory until the job is completed. It has become a favorite among developers for its efficient code allowing them to write applications in Scala, Python, Java, and R. With Built-in parallelism and Fault Tolerance, Spark has assisted businesses to deliver on some of the cutting edge Big Data and AI use cases.

Simplify ETL Using Hevo’s No-Code Data Pipeline

Hevo Data, a Fully-managed Data Pipeline platform, can help you automate, simplify & enrich your data replication process in a few clicks. With Hevo’s wide variety of connectors and blazing-fast Data Pipelines, you can extract & load data from 100+ Data Sources straight into Data Warehouses, or any Databases. To further streamline and prepare your data for analysis, you can process and enrich raw granular data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!

GET STARTED WITH HEVO FOR FREE

Hevo is the fastest, easiest, and most reliable data replication platform that will save your engineering bandwidth and time multifold. Try our 14-day full access free trial today to experience an entirely automated hassle-free Data Replication!

What Data Formats are supported by Apache Spark?

Apache supports various data formats such as Parquet, Avro, CSV, JSON, etc. For this article, the following data formats are considered for the data ingestion framework using Spark:

Parquet

Parquet is a columnar storage format built to select only data from the columns you actually use and skip columns that are not requested. This format significantly reduces the size of the file as compression algorithms usually work better with data with low entropy of information, found generally in columns. Since Parquet files are binary files, they contain only the metadata about their contents. This allows Spark to simply rely on metadata to find column names, compression/encoding, data types, and even some basic statistical characteristics. Hence, without the need for reading/parsing the Parquet files, Spark SQL queries are more efficient. This type of columnar storage format can be used in Apache Hadoop.

CSV

CSV files (comma-separated values) are plain text files that are used to exchange tabular data between systems. Each line in the file is a row of a table where a comma separates the fields. 

NoSQL

A NoSQL table is a collection of items (objects) and their attributes. These “Objects” correspond to rows in the NoSQL database, and “Attributes” correspond to columns in the NoSQL database. There is a common attribute for all the items in the platform that acts as an item’s name and primary key. However, the value of this attribute is unique to each item within a given NoSQL table. This also helps in uniquely identifying a particular element in a table and effectively shard the table element.

You can build a Data INgestion framework using Spark Datasets API Reference, or the platform’s NoSQL Web API Reference, to add, extract, and delete NoSQL table items. You can get more functions out of your Spark Datasets by using the platform’s Spark API extensions or NoSQL Web API. 

How to Create a Data Ingestion Framework using Spark? 

To effectively design a Data Ingestion Framework using Spark, you can follow either of the 2 simple methods given below:

Method 1: Using a Web Notebook

One of the general approaches for creating a Data Ingestion Framework using Spark is via a web notebook like Jupyter Notebook to perform interactive data analysis. You can create a web notebook with notes that define Spark jobs to interact with the data and run the jobs from the web notebook. You can write your code in any supported language such as Scala or Python. However, for this example of creating a Data Ingestion Framework using Spark, only Python has been considered.

Here’s What Makes Hevo’s Data Ingestion Solution Unique!

Providing a high-quality ETL solution can be a difficult task if you have a large volume of data. Hevo’s automated, No-code platform empowers you with everything you need to have for a smooth data replication experience.

Check out what makes Hevo amazing:

  • Fully Managed: Hevo requires no management and maintenance as it is a fully automated platform.
  • Data Transformation: Hevo provides a simple interface to perfect, modify, and enrich the data you want to transfer.
  • Faster Insight Generation: Hevo offers near real-time data replication so you have access to real-time insight generation and faster decision making. 
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Scalable Infrastructure: Hevo has in-built integrations for 100+ sources (with 40+ free sources) that can help you scale your data infrastructure as required.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-day free trial!

You can start designing your Data Ingestion Framework using Spark by following the easy steps given below:

Step 1: Selecting a Programming Language and Creating a Spark Session

According to your preference, create a scala or a Python Notebook. Note that Scala Jupyter Notebooks are not supported in Version 3.2.3 of the platform. You can now use the code given below in your Jupyter Notebook Cell to import the required libraries and start a new Spark session:

import sys
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("Data Ingestion Framework using Spark").getOrCreate()

Eventually, you can enter the following command to stop the Spark session and release its resources:

spark.stop()

Step 2: Reading the Data

In the second stage of your Data Ingestion Framework using Spark, you must start reading data from your Parquet files, CSV files, and NoSQL tables. You can go through the following python commands for the data format file you want to read:

Reading Parquet Data

For performing the reading operation for Parquet Data in your Data Ingestion Framework using Spark, consider the following python example reads a /users/new-parquet-table Parquet database table from the “folders” container into a PF DataFrame variable.

PF = spark.read.parquet("v3io://folders/users/new-parquet-table")
Reading CSV Data

You can read both CSV files and CSV directories. While creating a Data Ingestion Framework using Spark DataFrame to read CSV data, you have to let Spark know the schema of the data. To achieve this, you can either define the schema in the code itself or allow Spark to infer the schema by using a command like csv(…, inferSchema=”true”) in Python.

For instance, consider the example shown below that reads a /users/SchoolBus.csv CSV file from the “folders” container into a CF DataFrame variable.

schema = StructType([
    StructField("start_time", LongType(), True),
    StructField("end_time", LongType(), True),
    StructField("student_count", LongType(), True),
    StructField("route_distance", DoubleType(), True),
    StructField("stops_count", LongType(), True),
    StructField("teacher_count", LongType(), True)
])
CF = spark.read.schema(schema) 
    .option("header", "false") 
    .option("delimiter", "|") 
    .csv("v3io://folders/users/SchoolBus.csv")

Note that the header and delimiter parameters are optional here.

Reading NoSQL Data

In the case of reading NoSQL data in your Data Ingestion Framework using Spark DataFrame via the NoSQL Spark DataFrame, the schema of the table structure is automatically determined and extracted. Although, if you want to read NoSQL data that was written to a table in another way, you have to explicitly define the table schema. This can be done in the read operation code itself or you can let the platform infer the schema by using the inferSchema option (option(“inferSchema”, “true”)).

In this example of the reading operation in the Data Ingestion Framework using Spark, a /users/buses NoSQL table from the “folders” container into an NF DataFrame variable.

schema = StructType([
    StructField("id", StringType(), False),
    StructField("origin_city", StringType(), True),
    StructField("destination_city", StringType(), True),
    StructField("speed", DoubleType(), True),
    StructField("passenger_count", DoubleType(), True),
    StructField("trip_time", LongType(), True)
])
NF = spark.read.schema(schema) 
    .format("io.iguaz.v3io.spark.sql.kv") 
    .load("v3io://folders/users/buses")

Step 3: Writing the Data

While designing your Data Ingestion Framework using Spark, you can also perform the Write Operation for the following data formats:

Writing Parquet Data

For performing the writing operation in the Data Ingestion Framework using spark, consider the following code that converts the data related to the PF DataFrame variable into a /users/new-parquet-table Parquet database table in the “folders” container.

PF.write.parquet("v3io://folders/users/new-parquet-table")
Writing CSV Data

For designing a Data Ingestion Framework using Spark for CSV files, consider this example that converts the data related to the CF DataFrame variable into /users/new-csv-data CSV data in the “folders” container.

PF.write.option("header", "true").option("delimiter", ",") 
.csv("v3io://folders/users/new-csv-data")
Writing NoSQL Data

For designing a Data Ingestion Framework using Spark for NoSQL Data, consider the example given below that converts the data related to the NF DataFrame variable into a /users/new-nosql-table NoSQL table in the “folders” container.

NF.write.format("io.iguaz.v3io.spark.sql.kv") 
    .option("key", "ID").save("v3io://folders/users/new-nosql-table")

Step 4: Running SQL Data Queries

After you have completed the reading operation stage of the Data Ingestion Framework using Spark, you can start executing SQL Data Queries. Consider the example given below  that builds a temporary newTable SQL table for the database related to the QF DataFrame variable and executes an SQL query on this table: 

QF.createOrReplaceTempView("newTable")
spark.sql("select column1, 
    count(1) as count from myTable where column2='xxx' group by column1") 
    .show()

This completes the process of creating a Data Ingestion Framework using Spark via a web notebook like Jupyter Notebooks.

Method 2: Using Databricks

You can also write a Generic Data Ingestion Framework using Spark via Databricks. You can achieve this by following the simple steps given below:

  • Step 1: You can start by creating a cluster and a notebook. For this example, a python notebook named Generic_Ingestion_Notebook is created.
  • Step 2: Now, create the following 5 parameters for your Notebook.
    • InputPath: This can be a path of your cloud location.
    • InputFile: The name of your source file.
    • TargetTable: The target table where we want to load our data.
    • TargetPath: Target Cloud path where we want to load our data.
    • LoadType: Table or File.
# Creating input parameters (widgets) 
# These value will be passed during run time
dbutils.widgets.text("InputPath", "/tmp/path", "Enter InputPath") dbutils.widgets.text("InputFile", "new.csv", "Enter InputFileName") dbutils.widgets.text("Target Table", "emp", "Snowflake TableName")
dbutils.widgets.text("TargetPath", "/tmp/target", "EnterOutputPath")  dbutils.widgets.text("LoadType", "Snowflake", "Enter LoadType")
Data Ingestion Framework using Spark - generic ingestion notebook input parameters via UI
Image Source
  • Step 3:  In this step of designing a Data Ingestion Framework using Spark, you can start extracting the file type.
# Getting the Type of file and printing the File name 
import pathlib 
path = getArgument("InputPath") + "/" + getArgument ("InputFile")
File Extension = pathlib.Path(path). Suffix
print("Processing the", File_Extension, "file")
print("FileName:- ", getArgument("InputFile"))

Output: 

Processing the .csv file 
FileName: departuredelays.csv
  • Step 4: Extract the extension of the file. i.e (.csv) , (.json), (.parquet) from the filename.
#Extracting the extension of the file. i.e (.csv), (.json), (.parquet) 2 filename = path.split("/")(-1].split(".")[-1] 3 filename

Output: 

'csv'
  • Step 5: Create a function to extract the DataFrame based on the File Format.
#Function to get the DataFrame based on the File Format 
def getDf(path, File_Extension) : 
if File_Extension == '.csv':
        inputDF = (spark.read.option("header", True).csv (path))
elif File_Extension == '.parquet':
        inputDF = (spark.read.parquet (path)) 
elif File_Extension == '.json':
        inputDF = (spark.read.json (path)) 
else:
        print("File Format Not Supported") 
return inputDF
  • Step 6: Call the function and get the required data frame.
#Calling the function and getting the Datafram
df = getDf(path, File_Extension)
display (df)

Output: 

Data Ingestion Framework using Spark - DataFrame Output
Image Source
  • Step 7: You can store and print the schema for better clarity.
# Getting the Schema of the file
inputSchema = df.schema 
print("***********Schema of the Source Data***************")
[field for field in inputSchema]
Data Ingestion Framework using Spark - Schema of the Source Data Output
Image Source
  • Step 8: Using the commands given below you can also record the count of the processed file.
# Getting the total record count of Input file
countDF = df.count() 
print("Number of records in", getArgument("InputFile"), ": ", countDF)

Output: 

Data Ingestion Framework using Spark - Number of Records Output
Image Source
  • Step 9: Now, you can save this dataframe either in any table or any file system provided that you have valid connection credentials. For this example of designing the Data Ingestion Framework using Spark, the data is being loaded into the Snowflake Cloud Data Warehouse.
loadType = getArgument("LoadType")
# write the dataset to Snowflake table.
if loadType == "Snowflake":
         tableName = getArgument ("Targettable") 
         df.write.format("snowflake").options (**options).option("dbtable", 
         tableName).mode("overwrite").save() print(tableName, "table loaded 
         successfully in Snowflake!!!")
elif loadType == "File":
         targetPath = getArgument("TargetPath") + "/out_" + 
         getArgument("InputFile")
         # write the dataset to DBFS. 
         df.write.mode("overwrite").parquet (targetPath)
         print("file loaded successfully in DBFS!!!")
else: 
         print("load not supported")

Output:

Data Ingestion Framework using Spark - Data loaded into Snowflake output
Image Source
  • Step 10: You can now build another notebook – Run_Notebook and use the magic function %run to run your Generic_Ingestion_Notebook for different file formats as shown below.
Data Ingestion Framework using Spark - Running Notebook with different file formats
Image Source

 As an example, the following image shows the output if you execute one of the cells. This concludes the process of creating a Data Ingestion Framework using Spark via Databricks.

Data Ingestion Framework using Spark - Running Notebook for a CSV file
Image Source

Conclusion

In this article, you have learned how to effectively create a Data Ingestion Framework using Spark via 2 different methods. The most common way is to use a Web Notebook like Jupyter Notebook to add, read, retrieve or delete data from your files. Using Python or Scala, you can easily read and write data with support for several data formats such as CSV, Parquet, Avro, and NoSQL. You can also use Databricks to design a Data Ingestion Framework using Spark. 

Though, both of these methods for creating a Data Ingestion Framework using Spark require you to invest a considerable portion of your engineering bandwidth for creating, developing, managing, and maintaining pipelines. As the number of data sources grows, this becomes a resource-intensive and time-consuming task that needs constant monitoring of the ever-evolving Data Connectors. Hence, these create potential bottlenecks as the business teams have to wait on the Engineering teams for providing the data. You can streamline this process by opting for a Beginner Friendly Cloud Based No-Code Data Integration Platform like Hevo Data!

Visit our Website to Explore Hevo

Hevo Data, a No-code Data Pipeline can Ingest Data in Real-Time from a vast sea of 100+ sources to a Data Warehouse, BI Tool, or a Destination of your choice. Adding to its flexibility, Hevo provides several Data Ingestion Modes such as Change Tracking, Table, Binary Logging, Custom SQL, Oplog, etc. It is a reliable, completely automated, and secure service that doesn’t require you to write any code!  

Using pre-built integrations with sources like FTP/SFTP, Hevo allows you to load CSV, TSV, JSON, and XML files to your desired destination. You can also upload Excel Files present on your Google Drive to a Data Warehouse of your choice.

If you are using CRMs, Sales, HR, and Marketing applications and searching for a no-fuss alternative to Manual Data Integration, then Hevo can effortlessly automate this for you. Hevo, with its strong integration with 100+ sources and BI tools(Including 40+ Free Sources), allows you to not only export & load data but also transform & enrich your data & make it analysis-ready in a jiffy.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Do check out the pricing details to understand which plan fulfills all your business needs.

Share your experience of learning about different methods to create a Data Ingestion Framework using Spark! Let us know in the comments section below!

No-code Data Pipeline For Your Data Warehouse