ETL (Extract, Transform, Load) is a crucial process for data integration and analysis, enabling businesses to gather insights from diverse data sources. Python, with its simplicity and powerful libraries, has become a go-to choice for building ETL pipelines. From data extraction using APIs to transforming and loading data into databases, Python offers unmatched flexibility and efficiency.
This blog explores how to set up ETL pipelines using Python, highlights key libraries like pandas and Airflow, and discusses the limitations of manual setups. Discover how ETL Python can streamline your data workflows and optimize your ETL processes.
What is Python?
- Python is an Interactive, Interpreted, Object-Oriented programming language that incorporates Exceptions, Modules, Dynamic Typing, Dynamic Binding, Classes, High-level Dynamic Data Types, etc.
- It can also be used to make system calls to almost all well-known Operating Systems.
What is ETL?
- ETL stands for Extract, Transform and Load. It can be defined as the process that allows businesses to create a Single Source of Truth for all Online Analytical Processing.
- Businesses use multiple platforms to perform their day-to-day operations.
What are the Steps to Setup ETL Using Python Script?
Leverage Hevo Data’s capability to perform Python transformations during ETL to streamline your workflow and enhance data integration. With Python scripting, simplify complex data processing tasks and customize your ETL pipelines effortlessly. Hevo offers:
Thousands of customers worldwide trust Hevo for their data ingestion needs. Join them and experience seamless data transformation and migration.
Get Started with Hevo at Zero-Cost!
Step 1: Installing Required Modules
Read about python to Microsoft SQL Server connection using pyodbc.
The Pip command can be used to install the required modules. These Modules can be installed by running the following commands in the Command Prompt:
pip install mysql-connector-python
pip install pyodbc
pip install fdb
Step 2: Setting Up ETL Directory
The following files should be created in your project directory in order to set up ETL Using Python:
- db_credentials.py: Should have all data that is required to establish connections with all databases. For example, Database Password, Port Number, etc.
- sql_queries.py: Should have all the commonly used database queries to extract, and load data in String format.
- etl.py: Perform all necessary operations to connect to the database and run the required queries.
- main.py: Responsible for maintaining the flow of the operations and performing the necessary operations in a specific order.
Step 2A – db_credentials.py
This file is required for setting up all Source and Target Database Connection Strings in the process to set up ETL using Python. This file should have all the required information to access the appropriate database in a list format so that it can be iterated easily when required.
The format for this file is as follows:
datawarehouse_name = 'your_datawarehouse_name'
# sql-server (target db, datawarehouse)
datawarehouse_db_config = {
'Trusted_Connection': 'yes',
'driver': '{SQL Server}',
'server': 'datawarehouse_sql_server',
'database': '{}'.format(datawarehouse_name),
'user': 'your_db_username',
'password': 'your_db_password',
'autocommit': True,
}
# sql-server (source db)
sqlserver_db_config = [
{
'Trusted_Connection': 'yes',
'driver': '{SQL Server}',
'server': 'your_sql_server',
'database': 'db1',
'user': 'your_db_username',
'password': 'your_db_password',
'autocommit': True,
}
]
# mysql (source db)
mysql_db_config = [
{
'user': 'your_user_1',
'password': 'your_password_1',
'host': 'db_connection_string_1',
'database': 'db_1',
},
{
'user': 'your_user_2',
'password': 'your_password_2',
'host': 'db_connection_string_2',
'database': 'db_2',
},
]
# firebird (source db)
fdb_db_config = [
{
'dsn': "/your/path/to/source.db",
'user': "your_username",
'password': "your_password",
}
]
Step 2B – sql_queries.py
This file contains queries that can be used to perform the required operations to extract data from the Source Databases and load it into the Target Database in the process to set up ETL data pipeline using Python.
The format for this file is as follows:
# example queries, will be different across different db platform
firebird_extract = ('''
SELECT fbd_column_1, fbd_column_2, fbd_column_3
FROM fbd_table;
''')
firebird_insert = ('''
INSERT INTO table (column_1, column_2, column_3)
VALUES (?, ?, ?)
''')
firebird_extract_2 = ('''
SELECT fbd_column_1, fbd_column_2, fbd_column_3
FROM fbd_table_2;
''')
firebird_insert_2 = ('''
INSERT INTO table_2 (column_1, column_2, column_3)
VALUES (?, ?, ?)
''')
sqlserver_extract = ('''
SELECT sqlserver_column_1, sqlserver_column_2, sqlserver_column_3
FROM sqlserver_table
''')
sqlserver_insert = ('''
INSERT INTO table (column_1, column_2, column_3)
VALUES (?, ?, ?)
''')
mysql_extract = ('''
SELECT mysql_column_1, mysql_column_2, mysql_column_3
FROM mysql_table
''')
mysql_insert = ('''
INSERT INTO table (column_1, column_2, column_3)
VALUES (?, ?, ?)
''')
# exporting queries
class SqlQuery:
def __init__(self, extract_query, load_query):
self.extract_query = extract_query
self.load_query = load_query
# create instances for SqlQuery class
fbd_query = SqlQuery(firebird_extract, firebird_insert)
fbd_query_2 = SqlQuery(firebird_extract_2, firebird_insert_2)
sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery(mysql_extract, mysql_insert)
# store as list for iteration
fbd_queries = [fbdquery, fbd_query_2]
sqlserver_queries = [sqlserver_query]
mysql_queries = [mysql_query]
Integrate Salesforce to BigQuery
Integrate HubSpot to Databricks
Integrate Amazon DocumentDB to Snowflake
Step 2C – etl.py
This file should contain all the code that helps establish connections among the correct databases and run the required queries in order to set up ETL using Python.
The format for this file is as follows:
# python modules
import mysql.connector
import pyodbc
import fdb
def etl(query, source_cnx, target_cnx):
# extract data from source db
source_cursor = source_cnx.cursor()
source_cursor.execute(query.extract_query)
data = source_cursor.fetchall()
source_cursor.close()
# load data into warehouse db
if data:
target_cursor = target_cnx.cursor()
target_cursor.execute("USE {}".format(datawarehouse_name))
target_cursor.executemany(query.load_query, data)
print('data loaded to warehouse db')
target_cursor.close()
else:
print('data is empty')
def etl_process(queries, target_cnx, source_db_config, db_platform):
# establish source db connection
if db_platform == 'mysql':
source_cnx = mysql.connector.connect(**source_db_config)
elif db_platform == 'sqlserver':
source_cnx = pyodbc.connect(**source_db_config)
elif db_platform == 'firebird':
source_cnx = fdb.connect(**source_db_config)
else:
return 'Error! unrecognised db platform'
# loop through sql queries
for query in queries:
etl(query, source_cnx, target_cnx)
# close the source db connection
source_cnx.close()
Step 2D – main.py
This code in this file is responsible for iterating through credentials to connect with the database and perform the required ETL Using Python operations.
The format for this file is as follows:
# variables
from db_credentials import datawarehouse_db_config, sqlserver_db_config, mysql_db_config, fbd_db_config
from sql_queries import fbd_queries, sqlserver_queries, mysql_queries
# methods
from etl import etl_process
def main():
print('starting etl')
# establish connection for target database (sql-server)
target_cnx = pyodbc.connect(**datawarehouse_db_config)
# loop through credentials
# mysql
for config in mysql_db_config:
try:
print("loading db: " + config['database'])
etl_process(mysql_queries, target_cnx, config, 'mysql')
except Exception as error:
print("etl for {} has error".format(config['database']))
print('error message: {}'.format(error))
continue
# sql-server
for config in sqlserver_db_config:
try:
print("loading db: " + config['database'])
etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
except Exception as error:
print("etl for {} has error".format(config['database']))
print('error message: {}'.format(error))
continue
# firebird
for config in fbd_db_config:
try:
print("loading db: " + config['database'])
etl_process(fbd_queries, target_cnx, config, 'firebird')
except Exception as error:
print("etl for {} has error".format(config['database']))
print('error message: {}'.format(error))
continue
target_cnx.close()
if __name__ == "__main__":
main()
What Is an Example of Python ETL?
Step 1: Import the modules and functions
In this Python ETL example, first, you need to import the required modules and functions.
import glob
import pandas as pd
import xml.etree.ElementTree as ET
from datetime import datetime
- The dealership_data file contains CSV, JSON, and XML files for used car data. The features incorporated here are car_model, year_of_manufacture, price, and fuel. So you need to extract the file from the raw data, transform it into a target file, and then load it into the output.
Download the source file:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip
Extracting the zip file:
nzip datasource.zip -d dealership_data
Setting the path for Target files:
tmpfile = "dealership_temp.tmp" # store all extracted data
logfile = "dealership_logfile.txt" # all event logs will be stored
targetfile = "dealership_transformed_data.csv" # transformed data is stored
Step 2: Extract
The Extract
function in this Python ETL example is used to extract a huge amount of data in batches. This data is extracted from numerous sources.
CSV Extract Function:
def extract_from_csv(file_to_process):
dataframe = pd.read_csv(file_to_process)
return dataframe
JSON Extract Function
def extract_from_json(file_to_process):
dataframe = pd.read_json(file_to_process,lines=True)
return dataframe
XML Extract Function
def extract_from_xml(file_to_process):
dataframe = pd.DataFrame(columns=['car_model','year_of_manufacture','price', 'fuel'])
tree = ET.parse(file_to_process)
root = tree.getroot()
for person in root:
car_model = person.find("car_model").text
year_of_manufacture = int(person.find("year_of_manufacture").text)
price = float(person.find("price").text)
fuel = person.find("fuel").text
dataframe = dataframe.append({"car_model":car_model, "year_of_manufacture":year_of_manufacture, "price":price, "fuel":fuel}, ignore_index=True)
return dataframe
Calling Extract Function()
def extract():
extracted_data = pd.DataFrame(columns=['car_model','year_of_manufacture','price', 'fuel'])
#for csv files
for csvfile in glob.glob("dealership_data/*.csv"):
extracted_data = extracted_data.append(extract_from_csv(csvfile), ignore_index=True)
#for json files
for jsonfile in glob.glob("dealership_data/*.json"):
extracted_data = extracted_data.append(extract_from_json(jsonfile), ignore_index=True)
#for xml files
for xmlfile in glob.glob("dealership_data/*.xml"):
extracted_data = extracted_data.append(extract_from_xml(xmlfile), ignore_index=True)
return extracted_data
Step 3: Transform
Using the transform
function you can convert the data in any format as per your needs.
def transform(data):
data['price'] = round(data.price, 2)
return data
Step 4: Loading and Logging
In this step, the data is loaded to the destination file. A logging entry needs to be established before loading. Usually, the data can be loaded into:
- Data warehouses
- Databases
- Flat files
- Data lakes
A logging entry needs to be established before loading.
load function()
def load(targetfile,data_to_load):
data_to_load.to_csv(targetfile)
log function()
def log(message):
timestamp_format = '%H:%M:%S-%h-%d-%Y'
#Hour-Minute-Second-MonthName-Day-Year
now = datetime.now() # get current timestamp
timestamp = now.strftime(timestamp_format)
with open("dealership_logfile.txt","a") as f: f.write(timestamp + ',' + message + 'n')
Step 5: Running ETL Process
The log indicates that you have started the ETL Python process.
log("ETL Job Started")
The log indicates that you have started and ended the Extract phase.
log("Extract phase Started")
extracted_data = extract()
log("Extract phase Ended")
The log indicates that you have started and ended the Transform phase.
log(“Transform phase Started”)
transformed_data = transform(extracted_data)
log("Transform phase Ended")
The log indicates that you have started and ended the Load phase.
log("Load phase Started")
load(targetfile,transformed_data)
log("Load phase Ended")
The log indicates that the ETL Python process has ended.
log("ETL Job Ended")
Streamline ETL in minutes!
No credit card required
Why Should You Use Python for ETL Processes?
- With Python’s programming capabilities, you can create ETL Python pipelines that manage data and transform it in accordance with business requirements.
- Because of Python’s versatility, data engineers and developers can write a program for almost any ETL python process, including data aggregation. Python can efficiently handle the crucial parts of ETL operations, like indexing dictionaries and data structures.
- You can filter the null values from your data using the pre-built math module in Python. In addition, multiple Python ETL tools are built with externally defined functions and libraries and pure Python codes.
- These tools support other Python libraries for extracting, loading, and transforming tables of data from multiple sources into data warehouses.
How to Streamline Your ETL Process with These Python Libraries
1. pandas
Used for data manipulation and analysis in the ETL python process, especially during the transform stage.
import pandas as pd
# Load data from CSV
df = pd.read_csv('data.csv')
# Transform data
df['new_column'] = df['existing_column'] * 2
# Save transformed data
df.to_csv('transformed_data.csv', index=False)
2. sqlalchemy
Facilitates interaction with SQL databases, commonly used for extracting and loading data.
from sqlalchemy import create_engine
# Create a database connection
engine = create_engine('mysql+pymysql://user:password@localhost/db_name')
# Load data into a pandas DataFrame
df = pd.read_sql('SELECT * FROM table_name', con=engine)
# Save DataFrame to a database table
df.to_sql('new_table', con=engine, if_exists='replace', index=False)
3. pyodbc
A popular library for connecting to various databases like SQL Server.
import pyodbc
# Connect to a SQL Server database
conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER=server_name;DATABASE=db_name;UID=user;PWD=password')
# Extract data
cursor = conn.cursor()
cursor.execute('SELECT * FROM table_name')
rows = cursor.fetchall()
for row in rows:
print(row)
4. pyspark
Used for processing large-scale data in distributed ETL pipelines.
from pyspark.sql import SparkSession
# Initialize Spark session
spark = SparkSession.builder.appName('ETL Pipeline').getOrCreate()
# Load data from a CSV file
df = spark.read.csv('data.csv', header=True, inferSchema=True)
# Transform data
df = df.withColumn('new_column', df['existing_column'] * 2)
# Save transformed data
df.write.csv('transformed_data.csv', header=True)
5. boto3
For interacting with AWS services, especially S3 during the extract or load phase.
import boto3
# Initialize S3 client
s3 = boto3.client('s3')
# Upload a file to S3
s3.upload_file('local_file.csv', 'my_bucket', 's3_file.csv')
# Download a file from S3
s3.download_file('my_bucket', 's3_file.csv', 'local_file.csv')
What are the Alternative Programming Languages for ETL?
The languages are as follows:
1) Java
- Java is a popular programming language, particularly for developing client-server web applications. Java has influenced other programming languages, including Python, and has spawned a number of branches, including Scala.
- Java serves as the foundation for several other big data tools, including Hadoop and Spark. The Java ecosystem also includes a library collection comparable to that of Python.
- This programming language is designed in such a way that developers can write code anywhere and run it anywhere, regardless of the underlying computer architecture. It is also known as write once, run anywhere (WORA).
2) Ruby
- Ruby, like Python, is a scripting language that allows developers to create ETL pipelines, but there are few ETL-specific Ruby frameworks available to make the task easier.
- However, several libraries are currently in development, including Nokogiri, Kiba, and Square’s ETL package.
3) Go
- Go, also known as Golang, is a programming language that is similar to C and is intended for data analysis and big data applications.
- It was created to fill C++ and Java gaps discovered while working with Google’s servers and distributed systems.
- Go includes several machine learning libraries, including support for Google’s TensorFlow, data pipeline libraries such as Apache Beam, and two ETL toolkits, Crunch and Pachyderm.
Conclusion
This article provided information on Python, its key features, Python, different methods to set up ETL using Python Script, limitations of manually setting up ETL using Python, top python libraries to set up ETL pipeline and the top ETL using Python tools.
If you’re looking for a no-code solution to simplify your ETL process, Hevo is an excellent choice. Sign up for a 14-day free trial and simplify your data integration process. Check out the pricing details to understand which plan fulfills all your business needs.
FAQ on Setup ETL Using Python Script
1. Can Python be used for ETL?
Yes, Python can be used for ETL (Extract, Transform, Load) processes.
2. What is an example of real time ETL?
Real-time ETL involves continuously extracting, transforming, and loading data as soon as it is available, rather than processing it in batches. This is critical for applications that require up-to-the-minute data, such as fraud detection, real-time analytics, and monitoring systems. Example of Real-Time ETL: Fraud Detection in Financial Transactions
3. Which is the best ETL tool?
The “best” ETL tool depends on your specific requirements, such as the complexity of your data workflows, your budget, your team’s expertise, and the systems you need to integrate. Here are some of the top ETL tools: Hevo, Apache NiFi, Informatica, Fivetran etc.
4. Is SQL an ETL tool?
No, SQL (Structured Query Language) itself is not an ETL tool, but it is a powerful language used within ETL processes. SQL is primarily used for querying and manipulating data within relational databases.
Manik is a passionate data enthusiast with extensive experience in data engineering and infrastructure. He excels in writing highly technical content, drawing from his background in data science and big data. Manik's problem-solving skills and analytical thinking drive him to create impactful content for data professionals, helping them navigate their day-to-day challenges. He holds a Bachelor's degree in Computers and Communication, with a minor in Big Data, from Manipal Institute of Technology.