Businesses collect, extract, and store data in several data formats such as CSV (Comma-Separated Values), Parquet, JSON, and Avro. Reading, manipulating, and loading this data into your desired destination requires a robust Data Processing system. Apache Spark has become one of the most widely used Open-Source unified analytics engines that can effectively analyze Petabytes of data.
With Built-in parallelism and Fault Tolerance, you can easily design a Data Ingestion Framework using Spark. Apache Spark provides a flexible interface to write applications in several languages such as Java, Python, Scala, and R.
In this article, you will learn about 2 different methods to effectively design a Data Ingestion Framework using Spark.
What is Apache Spark?
Apache Spark is an Open-Source, lightning-fast Distributed Data Processing System for Big Data and Machine Learning. It was originally developed in 2009 and officially launched in 2014. Attracting big enterprises such as Netflix, eBay, and Yahoo, Apache Spark processes and analyzes Petabytes of data on clusters of over 8,000 nodes. Utilizing Memory Caching and Optimal Query Execution, Spark can take on multiple workloads such as Batch Processing, Interactive Queries, Real-Time Analytics, Machine Learning, and Graph Processing.
Spark was designed to overcome the challenges developers faced with MapReduce, the disk-based computational engine at the core of early Hadoop clusters. Unlike MapReduce, Spark eliminates the computationally expensive intermediate disk writes by retaining the working dataset in memory until the job is completed. It has become a favorite among developers for its efficient code, allowing them to write applications in Scala, Python, Java, and R. With Built-in Parallelism and Fault Tolerance, Spark has helped businesses deliver on some of the most cutting-edge Big Data and AI use cases.
Looking for the best ETL tools to connect your data sources? Rest assured, Hevo’s no-code platform helps streamline your ETL process. Try Hevo and equip your team to:
- Integrate data from 150+ sources (60+ free sources).
- Utilize drag-and-drop and custom Python script features to transform your data.
What Data Formats are supported by Apache Spark?
Apache Spark supports various data formats such as Parquet, Avro, CSV, and JSON. For this article, the following data formats are considered for the Data Ingestion Framework using Spark:
1. Parquet
Parquet is a columnar storage format built so that queries read only the columns they actually use and skip the columns that are not requested. This format significantly reduces file size because compression algorithms usually work better on data with low information entropy, which is typically found within a single column. Parquet files are binary files that also carry metadata about their contents, so Spark can rely on this metadata to determine column names, compression/encoding, data types, and even some basic statistical characteristics. Hence, Spark SQL queries become more efficient since Spark does not need to read/parse the entire file. This type of columnar storage format can be used in Apache Hadoop.
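For example, a minimal sketch of this column pruning in PySpark, assuming an active SparkSession named spark and using an illustrative path and column names that are not part of the steps below, might look like this:
# Sketch only: selecting a subset of columns lets Spark read just those
# columns' data and metadata from the Parquet file instead of the whole table.
events = spark.read.parquet("v3io://folders/users/example-parquet-table")
events.select("event_type", "event_time").show(5)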
2. CSV
CSV files (comma-separated values) are plain text files that are used to exchange tabular data between systems. Each line in the file is a row of a table where a comma separates the fields.
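For example, a small hypothetical CSV file with a header row might look like this:
id,origin_city,destination_city,speed
1,Boston,New York,65.5
2,Chicago,Detroit,58.0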
3. NoSQL
A NoSQL table is a collection of items (objects) and their attributes. These "items" correspond to rows in the NoSQL database, and "attributes" correspond to columns in the NoSQL database. All items in the platform share a common attribute that acts as an item's name and primary key. The value of this attribute is unique to each item within a given NoSQL table, which makes it possible to uniquely identify a particular item and to effectively shard the table.
You can build a Data Ingestion Framework using the Spark Datasets API, or the platform's NoSQL Web API, to add, extract, and delete NoSQL table items. You can get more functionality out of your Spark Datasets by using the platform's Spark API extensions or NoSQL Web API.
How to Create a Data Ingestion Framework using Spark?
To effectively design a Data Ingestion Framework using Spark, you can follow either of the 2 simple methods given below:
Method 1: Using a Web Notebook
One of the general approaches for creating a Data Ingestion Framework using Spark is via a web notebook like Jupyter Notebook to perform interactive data analysis. You can create a web notebook with notes that define Spark jobs to interact with the data and run the jobs from the web notebook. You can write your code in any supported language such as Scala or Python. However, for this example of creating a Data Ingestion Framework using Spark, only Python has been considered.
You can start designing your Data Ingestion Framework using Spark by following the easy steps given below:
Step 1: Selecting a Programming Language and Creating a Spark Session
According to your preference, create a Scala or Python notebook. Note that Scala Jupyter Notebooks are not supported in Version 3.2.3 of the platform. You can now use the code given below in a Jupyter Notebook cell to import the required libraries and start a new Spark session:
import sys
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
spark = SparkSession.builder.appName("Data Ingestion Framework using Spark").getOrCreate()
Eventually, you can enter the following command to stop the Spark session and release its resources:
spark.stop()
Step 2: Reading the Data
In the second stage of your Data Ingestion Framework using Spark, you must start reading data from your Parquet files, CSV files, and NoSQL tables. You can go through the following Python commands for the data format you want to read:
Reading Parquet Data
For performing the reading operation for Parquet data in your Data Ingestion Framework using Spark, consider the following Python example, which reads the /users/new-parquet-table Parquet database table from the "folders" container into a PF DataFrame variable:
PF = spark.read.parquet("v3io://folders/users/new-parquet-table")
Reading CSV Data
You can read both CSV files and CSV directories. While creating a Data Ingestion Framework using Spark DataFrames to read CSV data, you have to let Spark know the schema of the data. To achieve this, you can either define the schema in the code itself or allow Spark to infer the schema by using an option like csv(..., inferSchema="true") in Python.
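For example, a minimal sketch of the schema-inference approach, reusing the illustrative SchoolBus.csv path from the explicit-schema example that follows, might look like this:
# Sketch only: Spark scans the file once and guesses each column's data type
CF_inferred = spark.read.option("header", "false") \
    .option("inferSchema", "true") \
    .csv("v3io://folders/users/SchoolBus.csv")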
Alternatively, to define the schema explicitly, consider the example shown below, which reads the /users/SchoolBus.csv CSV file from the "folders" container into a CF DataFrame variable:
schema = StructType([
    StructField("start_time", LongType(), True),
    StructField("end_time", LongType(), True),
    StructField("student_count", LongType(), True),
    StructField("route_distance", DoubleType(), True),
    StructField("stops_count", LongType(), True),
    StructField("teacher_count", LongType(), True)
])
CF = spark.read.schema(schema) \
    .option("header", "false") \
    .option("delimiter", "|") \
    .csv("v3io://folders/users/SchoolBus.csv")
Note that the header and delimiter parameters are optional here.
Reading NoSQL Data
When reading NoSQL data in your Data Ingestion Framework using Spark via the NoSQL Spark DataFrame, the schema of the table structure is automatically identified and extracted. However, if you want to read NoSQL data that was written to a table in another way, you have to explicitly define the table schema. You can do this in the read operation code itself, or you can let the platform infer the schema by using the inferSchema option (option("inferSchema", "true")).
In this example of the reading operation in the Data Ingestion Framework using Spark, a /users/buses NoSQL table is read from the "folders" container into an NF DataFrame variable:
schema = StructType([
    StructField("id", StringType(), False),
    StructField("origin_city", StringType(), True),
    StructField("destination_city", StringType(), True),
    StructField("speed", DoubleType(), True),
    StructField("passenger_count", DoubleType(), True),
    StructField("trip_time", LongType(), True)
])
NF = spark.read.schema(schema) \
    .format("io.iguaz.v3io.spark.sql.kv") \
    .load("v3io://folders/users/buses")
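Alternatively, if you prefer not to define the table schema by hand, a minimal sketch of reading the same illustrative table with the inferSchema option mentioned above might look like this:
NF_inferred = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
    .option("inferSchema", "true") \
    .load("v3io://folders/users/buses")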
Step 3: Writing the Data
While designing your Data Ingestion Framework using Spark, you can also perform the Write Operation for the following data formats:
Writing Parquet Data
For performing the writing operation in the Data Ingestion Framework using Spark, consider the following code, which writes the data in the PF DataFrame variable to a /users/new-parquet-table Parquet database table in the "folders" container:
PF.write.parquet("v3io://folders/users/new-parquet-table")
Writing CSV Data
For designing a Data Ingestion Framework using Spark for CSV files, consider this example, which writes the data in the CF DataFrame variable to /users/new-csv-data CSV data in the "folders" container:
CF.write.option("header", "true").option("delimiter", ",") \
    .csv("v3io://folders/users/new-csv-data")
Writing NoSQL Data
For designing a Data Ingestion Framework using Spark for NoSQL data, consider the example given below, which writes the data in the NF DataFrame variable to a /users/new-nosql-table NoSQL table in the "folders" container. The key option names the primary-key attribute, which is id in the schema above:
NF.write.format("io.iguaz.v3io.spark.sql.kv") \
    .option("key", "id").save("v3io://folders/users/new-nosql-table")
Step 4: Running SQL Data Queries
After you have completed the reading operation stage of the Data Ingestion Framework using Spark, you can start executing SQL data queries. Consider the example given below, which creates a temporary newTable SQL view for the data in the QF DataFrame variable and runs an SQL query on this view:
QF.createOrReplaceTempView("newTable")
spark.sql("select column1, count(1) as count from newTable where column2='xxx' group by column1").show()
This completes the process of creating a Data Ingestion Framework using Spark via a web notebook like Jupyter Notebooks.
Method 2: Using Databricks
You can also write a Generic Data Ingestion Framework using Spark via Databricks. You can achieve this by following the simple steps given below:
- Step 1: You can start by creating a cluster and a notebook. For this example, a Python notebook named Generic_Ingestion_Notebook is created.
- Step 2: Now, create the following 5 parameters for your Notebook.
- InputPath: This can be a path of your cloud location.
- InputFile: The name of your source file.
- TargetTable: The target table where we want to load our data.
- TargetPath: Target Cloud path where we want to load our data.
- LoadType: "Snowflake" (load into a Snowflake table) or "File" (write to the target path).
# Creating input parameters (widgets)
# These values will be passed at run time
dbutils.widgets.text("InputPath", "/tmp/path", "Enter InputPath")
dbutils.widgets.text("InputFile", "new.csv", "Enter InputFileName")
dbutils.widgets.text("TargetTable", "emp", "Snowflake TableName")
dbutils.widgets.text("TargetPath", "/tmp/target", "Enter OutputPath")
dbutils.widgets.text("LoadType", "Snowflake", "Enter LoadType")
- Step 3: In this step of designing a Data Ingestion Framework using Spark, you can start extracting the file type.
# Getting the Type of file and printing the File name
import pathlib
path = getArgument("InputPath") + "/" + getArgument("InputFile")
File_Extension = pathlib.Path(path).suffix
print("Processing the", File_Extension, "file")
print("FileName:- ", getArgument("InputFile"))
Output:
Processing the .csv file
FileName: departuredelays.csv
- Step 4: Extract the extension of the file, i.e., csv, json, or parquet, from the filename.
# Extracting the extension of the file, i.e. csv, json, or parquet, from the filename
filename = path.split("/")[-1].split(".")[-1]
filename
Output:
'csv'
- Step 5: Create a function to extract the DataFrame based on the File Format.
# Function to get the DataFrame based on the file format
def getDf(path, File_Extension):
    inputDF = None
    if File_Extension == '.csv':
        inputDF = spark.read.option("header", True).csv(path)
    elif File_Extension == '.parquet':
        inputDF = spark.read.parquet(path)
    elif File_Extension == '.json':
        inputDF = spark.read.json(path)
    else:
        print("File Format Not Supported")
    return inputDF
- Step 6: Call the function and get the required data frame.
# Calling the function and getting the DataFrame
df = getDf(path, File_Extension)
display(df)
Output:
- Step 7: You can store and print the schema for better clarity.
# Getting the Schema of the file
inputSchema = df.schema
print("***********Schema of the Source Data***************")
[field for field in inputSchema]
- Step 8: Using the commands given below, you can also record the number of records in the processed file.
# Getting the total record count of Input file
countDF = df.count()
print("Number of records in", getArgument("InputFile"), ": ", countDF)
Output:
- Step 9: Now, you can save this dataframe either in any table or any file system provided that you have valid connection credentials. For this example of designing the Data Ingestion Framework using Spark, the data is being loaded into the Snowflake Cloud Data Warehouse.
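The write step below passes an options dictionary to the Snowflake Spark connector, which is not defined in the snippets shown in this article. A minimal sketch of what it might contain is given below; all values are placeholders, and the option names are the standard Snowflake connector connection options:
# Placeholder connection options for the Snowflake Spark connector
options = {
    "sfUrl": "<your_account>.snowflakecomputing.com",
    "sfUser": "<your_username>",
    "sfPassword": "<your_password>",
    "sfDatabase": "<your_database>",
    "sfSchema": "<your_schema>",
    "sfWarehouse": "<your_warehouse>"
}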
loadType = getArgument("LoadType")

if loadType == "Snowflake":
    # Write the dataset to a Snowflake table
    tableName = getArgument("TargetTable")
    df.write.format("snowflake").options(**options) \
        .option("dbtable", tableName).mode("overwrite").save()
    print(tableName, "table loaded successfully in Snowflake!!!")
elif loadType == "File":
    # Write the dataset to DBFS
    targetPath = getArgument("TargetPath") + "/out_" + getArgument("InputFile")
    df.write.mode("overwrite").parquet(targetPath)
    print("file loaded successfully in DBFS!!!")
else:
    print("load not supported")
Output:
- Step 10: You can now build another notebook, Run_Notebook, and use the %run magic command to run your Generic_Ingestion_Notebook for different file formats, as shown in the sketch below.
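For instance, a minimal sketch of one such Run_Notebook cell, assuming Generic_Ingestion_Notebook sits in the same folder and using illustrative widget values, might look like this:
%run ./Generic_Ingestion_Notebook $InputPath="/FileStore/tables" $InputFile="departuredelays.csv" $TargetTable="emp" $TargetPath="/tmp/target" $LoadType="Snowflake"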
As an example, the following image shows the output if you execute one of the cells. This concludes the process of creating a Data Ingestion Framework using Spark via Databricks.
Conclusion
In this article, you’ve learned how to create a Data Ingestion Framework using Spark through two methods: Web Notebooks like Jupyter for data manipulation with Python or Scala, and Databricks for design. However, both methods require significant engineering resources to develop and maintain pipelines, leading to bottlenecks as business teams wait on data from engineering teams.
To streamline this process, consider Hevo Data, a beginner-friendly, no-code data integration platform. Hevo allows real-time data ingestion from over 100 sources into your chosen Data Warehouse or BI tool without writing any code. With pre-built integrations and support for various file formats, Hevo automates the data integration process for CRMs, HR, and marketing applications, transforming and enriching your data quickly and efficiently.
FAQ on Data Ingestion Framework using Spark
Can Spark be used for data ingestion?
Yes, Spark can be used for data ingestion, ETL (Extract, Transform, Load), and various other data processing tasks.
What is framework for ingesting data?
Common frameworks for ingesting data include Apache NiFi, Apache Kafka, and AWS Glue, alongside Apache Spark itself.
Can Spark be used for ETL?
Yes, Spark can be used for ETL (Extract, Transform, Load).
What is the main use of Spark?
1. Batch and Stream Processing
2. Data Analytics
3. Big Data Processing
Is data ingestion same as ETL?
No, data ingestion and ETL are not the same. Data ingestion focuses on moving raw data into a system, while ETL also transforms the data before loading it into the destination.
Sanchit Agarwal is an Engineer turned Data Analyst with a passion for data, software architecture and AI. He leverages his diverse technical background and 2+ years of experience to write content. He has penned over 200 articles on data integration and infrastructures, driven by a desire to empower data practitioners with practical solutions for their everyday challenges.