This article builds a simple Apache Spark ETL pipeline with PySpark that loads JSON data into a PostgreSQL database. It covers an introduction to Spark, an introduction to ETL, and the three steps of the pipeline: extraction, transformation, and loading.
Hevo, A Simpler Alternative to Integrate your Data for Analysis
Hevo offers a faster way to move data from databases or SaaS applications into your data warehouse to be visualized in a BI tool. Hevo is fully automated and hence does not require you to code.
Check out some of the cool features of Hevo:
- Completely Automated: The Hevo platform can be set up in just a few minutes and requires minimal maintenance.
- Real-time Data Transfer: Hevo provides real-time data migration, so you always have analysis-ready data.
- 100% Complete & Accurate Data Transfer: Hevo’s robust infrastructure ensures reliable data transfer with zero data loss.
- Scalable Infrastructure: Hevo has in-built integrations for 100+ sources that can help you scale your data infrastructure as required.
- 24/7 Live Support: The Hevo team is available round the clock to support you through chat, email, and support calls.
- Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
- Live Monitoring: Hevo allows you to monitor the data flow so you can check where your data is at a particular point in time.
You can try Hevo for free by signing up for a 14-day free trial.
Introduction to Spark
Spark is an open-source analytics and data processing engine for working with large-scale, distributed datasets. Spark supports Java, Scala, R, and Python. Data scientists and developers use it to run ETL jobs quickly on large-scale data from IoT devices, sensors, and similar sources. Spark also has a Python DataFrame API that can read a JSON file into a DataFrame, inferring the schema automatically.
Spark can run on Hadoop, EC2, Kubernetes, in the cloud, or in its own standalone cluster mode. Spark ships with libraries such as Spark SQL and DataFrames, GraphX, Spark Streaming, and MLlib, which can be combined in the same application.
Introduction to ETL
ETL stands for extract, transform, load: the transfer and transformation of data from one system to another through data pipelines. Data is extracted from one or more sources, often to move it to a unified platform such as a data lake or a data warehouse for analytics and business intelligence.
Data spread across several sources usually has to be prepared before it is loaded, so that it arrives at the destination in a format that suits the organization’s business needs.
Extraction is the process of pulling data from the source or sources. It is often considered the most important part of the ETL process because it sets the stage for the stages that follow. Source data arrives in many different formats (such as Parquet, JSON, CSV, TSV, and XML) and from a wide variety of sources (SaaS tools, advertising platforms, flat files, NoSQL databases, RDBMSs, etc.).
Transformation is the stage that ensures the data is clean and in the correct format. When data is extracted from disparate systems, the datasets must be made compatible with one another. Cleaning is a large part of transformation, and the exact steps depend on specific business needs. Sometimes only some of the columns are relevant; at other times, tables or columns have to be joined. Duplicate rows might have to be eliminated, and null values may have to be dealt with. These and many other modifications are handled during the transformation stage.
Loading deals with moving the data into the final target. This does not have to be a data lake or a warehouse; it can even be a flat file. Where the data is loaded and how often depends on the business needs: hourly, daily, weekly, or monthly, for example.
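Before bringing Spark in, the three stages can be illustrated in plain Python. This is only a toy sketch under stated assumptions: the RAW text, the name and age fields, and the function names are all hypothetical, and the target is a flat CSV file rather than a database.

```python
import csv
import io
import json

# Hypothetical raw input: one JSON object per line, with a duplicate and a null.
RAW = (
    '{"name": "Ana", "age": 34}\n'
    '{"name": "Ana", "age": 34}\n'
    '{"name": "Ben", "age": null}\n'
)


def extract_records(text):
    """Extract: parse each non-empty line as a JSON object."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def clean_records(records):
    """Transform: drop rows with missing values and exact duplicates."""
    seen = set()
    cleaned = []
    for record in records:
        key = (record.get("name"), record.get("age"))
        if None in key or key in seen:
            continue
        seen.add(key)
        cleaned.append({"name": key[0], "age": key[1]})
    return cleaned


def load_csv(records, handle):
    """Load: write the cleaned rows to a flat file (any writable text handle)."""
    writer = csv.DictWriter(handle, fieldnames=["name", "age"])
    writer.writeheader()
    writer.writerows(records)


buffer = io.StringIO()
load_csv(clean_records(extract_records(RAW)), buffer)
```

The Spark pipeline in the rest of the article follows the same three-stage shape, with a DataFrame taking the place of the list of dictionaries.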
In this article, let us perform a simple Apache Spark ETL to load a JSON file into a PostgreSQL database. We will use PySpark, Spark’s Python API, to do this. Although Spark’s native Scala API can be faster for some workloads, many people are more comfortable with Python. We will write the code in a Jupyter Notebook.
Step 1: Extraction
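To make PySpark importable from a Jupyter Notebook, you can use the findspark package, which locates the Spark installation and adds it to the Python path. SparkContext is the object that manages the connection to the cluster. It connects to the cluster manager, which in turn allocates the resources that run the tasks. A SparkContext object reads data into an RDD (Resilient Distributed Dataset, Spark’s core data structure).
Spark can read JSON files, infer the schema automatically, and load the result as a DataFrame. It does this using Spark SQL. So instead of using the SparkContext object directly, let us use the SQLContext object. Since Spark 2.0, SparkSession is the unified entry point and exposes the same reader.
The steps are: import SQLContext, instantiate an SQLContext object, and use it to read the JSON file into a DataFrame object. By default, Spark expects each line of the file to be a separate, self-contained JSON object.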
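The extraction step can be sketched as follows. This is a minimal sketch, not the article's original code: it uses SparkSession in local mode rather than SQLContext, and the sample records, the name and age columns, and the function names are all hypothetical. The pyspark import sits inside the function so that the file helper also runs where Spark is not installed.

```python
import json

# Hypothetical sample records; the article does not show the real file.
SAMPLE_PEOPLE = [
    {"name": "Ana", "age": 34},
    {"name": "Ben", "age": 12},
    {"name": "Chen", "age": 18},
]


def write_json_lines(records, path):
    """Write one JSON object per line, the layout spark.read.json expects by default."""
    with open(path, "w", encoding="utf-8") as handle:
        for record in records:
            handle.write(json.dumps(record) + "\n")
    return path


def extract(json_path, app_name="json-to-postgres"):
    """Read a JSON Lines file into a DataFrame, letting Spark infer the schema."""
    # Imported lazily so write_json_lines works without a Spark installation.
    from pyspark.sql import SparkSession

    spark = (
        SparkSession.builder
        .master("local[*]")  # assumption: a single local machine, no cluster
        .appName(app_name)
        .getOrCreate()
    )
    return spark.read.json(str(json_path))
```

In a notebook, something like pop_data = extract(write_json_lines(SAMPLE_PEOPLE, "people.json")) followed by pop_data.printSchema() would show the column types Spark inferred. On an older Spark version, SQLContext(sc).read.json(path) plays the same role.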
Step 2: Transformation
Transformation involves several processes whose purpose is to clean and format the data to suit the needs of the business. You can remove rows with missing data, drop duplicates, combine columns to create new ones, filter out rows, and so on.
For example, you might filter the rows to keep only the adults (ages 18 and above). If the data needs no changes, you can skip this step.
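One way to express the adults filter in PySpark is sketched below. It assumes the DataFrame has a numeric age column, which the article does not confirm, and the function names are illustrative. The filter condition is built as a SQL expression string, which DataFrame.filter accepts.

```python
ADULT_AGE = 18  # threshold from the example: ages 18 and above


def adult_condition(min_age=ADULT_AGE, column="age"):
    """Build the SQL expression string passed to DataFrame.filter."""
    return f"{column} >= {int(min_age)}"


def transform(df, min_age=ADULT_AGE):
    """Drop exact duplicate rows and rows with a missing age, then keep adults."""
    return (
        df.dropDuplicates()          # remove rows that are identical in every column
        .dropna(subset=["age"])      # remove rows where the assumed age column is null
        .filter(adult_condition(min_age))
    )
```

Calling pop_data = transform(pop_data) would leave only rows whose age is 18 or more. Because Spark evaluates lazily, nothing is computed until an action such as show() or collect() is called on the result.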
Step 3: Loading
We have extracted the data from a JSON file and transformed it. Now let us load the pop_data DataFrame into the destination: a local PostgreSQL database, where it can be analyzed.
Psycopg2 is an open-source Python library that is widely used to communicate with a PostgreSQL server. The psycopg2.connect function opens a connection to the database. Once the connection is established, a cursor created from it is used to execute commands.
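A connection and a cursor might be set up along these lines. The database name, user, password, host, and port are placeholders to be replaced with your own settings, and the helper names are illustrative. The psycopg2 import is deferred into the function so the settings helper can be used on its own.

```python
def connection_settings(dbname="etl_demo", user="postgres", password="",
                        host="localhost", port=5432):
    """Collect the keyword arguments passed to psycopg2.connect (placeholder values)."""
    return {"dbname": dbname, "user": user, "password": password,
            "host": host, "port": port}


def open_connection(**overrides):
    """Open a connection to the PostgreSQL server described by the settings."""
    # Imported lazily so connection_settings works without psycopg2 installed.
    import psycopg2

    return psycopg2.connect(**connection_settings(**overrides))


def server_version(connection):
    """Run one command through a cursor and return the server's version string."""
    cursor = connection.cursor()
    try:
        cursor.execute("SELECT version()")
        return cursor.fetchone()[0]
    finally:
        cursor.close()
```

With a server running, connection = open_connection(password="...") followed by server_version(connection) is a quick way to confirm that the connection works before loading any data.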
First, we create a table in PostgreSQL that will be populated with the data. Then we use an INSERT statement to add the rows, passing them as a list of tuples.
When we query the table with a SELECT statement, the inserted rows are returned. By default, psycopg2 runs statements inside a transaction, so the changes in a transaction block either all take effect or none do. The connection.commit() method makes the changes permanent; until it is called, other connections do not see the inserted rows.
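The create, insert, commit, and select sequence could look like the sketch below, given an already open psycopg2 connection. The adults table, its name and age columns, and the function names are assumptions made for illustration. The rows can be Spark Row objects from pop_data.collect() or plain dictionaries, since both support lookup by column name.

```python
COLUMNS = ("name", "age")  # assumed columns of the pop_data DataFrame

CREATE_SQL = "CREATE TABLE IF NOT EXISTS adults (name TEXT, age INTEGER)"
INSERT_SQL = "INSERT INTO adults (name, age) VALUES (%s, %s)"
SELECT_SQL = "SELECT name, age FROM adults ORDER BY name"


def rows_to_tuples(rows, columns=COLUMNS):
    """Turn Row objects (or dicts) into the list of tuples the INSERT expects."""
    return [tuple(row[column] for column in columns) for row in rows]


def load(connection, rows):
    """Create the table, insert the rows, commit, and return the table contents."""
    cursor = connection.cursor()
    try:
        cursor.execute(CREATE_SQL)
        cursor.executemany(INSERT_SQL, rows_to_tuples(rows))
        connection.commit()  # make the inserts permanent
        cursor.execute(SELECT_SQL)
        return cursor.fetchall()
    except Exception:
        connection.rollback()  # leave the connection usable after a failure
        raise
    finally:
        cursor.close()
```

A call such as load(connection, pop_data.collect()) pulls every row to the driver first, which is only reasonable for small datasets. For larger ones, Spark's own JDBC writer (pop_data.write.jdbc) is likely the better route.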
This article outlined a basic Apache Spark ETL process using PySpark from a single source to a database. However, in reality, you will be dealing with multiple disparate sources. You will need several complex transformations performed on the fly. You might have to load data into a cloud data warehouse. All of this can get very complicated. It requires a lot of expertise at every level of the process. But Hevo Data can guarantee you smooth storage and processing.
Hevo is a no-code data pipeline with pre-built integrations for 100+ sources. You can connect your SaaS platforms, databases, and other sources to the data warehouse of your choice without writing any code or worrying about maintenance. If you are interested, you can try Hevo by signing up for the 14-day free trial.