ETL (Extract, Transform, Load) is a crucial data integration and analysis process, enabling businesses to gather insights from diverse data sources. With its simplicity and powerful libraries, Python has become a go-to choice for building ETL pipelines. From data extraction using APIs to transforming and loading data into databases, Python offers unmatched flexibility and efficiency.
This blog explores how to set up ETL using Python, walks through a complete worked example, highlights key libraries like pandas, SQLAlchemy, and PySpark, and looks at alternative languages for heavier workloads. Discover how ETL with Python can streamline your data workflows and optimize your ETL processes.
What is ETL?
ETL (Extract, Transform, Load) is a data integration process used to collect data from multiple sources, transform it into a suitable format, and load it into a target system, such as a data warehouse.
Components of ETL Pipeline
- Source System: The origin of the data, such as databases, APIs, or files.
- Extract: Pulls raw data from the source system.
- Transform: Cleans, processes, and converts data into the required format.
- Load: Loads the transformed data into the target system (e.g., data warehouse).
- Target System: The destination where the data is stored for analysis or reporting.
- Orchestration: Manages the pipeline workflow and schedules tasks.
- Monitoring and Logging: Tracks pipeline performance and logs errors.
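Put together, these components can be sketched in a few lines of Python. The function and variable names below are illustrative, not from any particular framework:

```python
# Minimal sketch of an ETL pipeline's components (illustrative names).

def extract():
    # Source system: a hard-coded list standing in for a database or API.
    return [{"id": 1, "amount": "10.5"}, {"id": 2, "amount": "3.25"}]

def transform(rows):
    # Clean and convert: cast the amount field from string to float.
    return [{**row, "amount": float(row["amount"])} for row in rows]

def load(rows, target):
    # Target system: an in-memory list standing in for a warehouse.
    target.extend(rows)

def run_pipeline():
    # Orchestration: run the steps in order; prints act as monitoring/logging.
    warehouse = []
    data = extract()
    print(f"extracted {len(data)} rows")
    load(transform(data), warehouse)
    print(f"loaded {len(warehouse)} rows")
    return warehouse

print(run_pipeline())
```

A real pipeline swaps each stand-in for a database connector, an API client, or a warehouse driver, but the shape stays the same.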
Why Should You Use Python for ETL Processes?
- With Python’s programming capabilities, you can create ETL pipelines that manage data and transform it according to business requirements.
- Because of Python’s versatility, data engineers and developers can write a program for almost any ETL task, including data aggregation. Python also handles the supporting parts of ETL operations efficiently, such as working with dictionaries and other data structures.
- You can filter null values from your data using Python’s built-in math module. In addition, many Python ETL tools are built from a mix of externally defined functions, third-party libraries, and pure Python code.
- These tools also work alongside other Python libraries for extracting, transforming, and loading data tables from multiple sources into data warehouses.
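For instance, here is a minimal sketch of null filtering with the built-in math module (the readings list is made-up sample data):

```python
import math

# Rows pulled from a hypothetical source; None and NaN both count as nulls here.
readings = [21.5, None, float("nan"), 19.0, 22.3]

def is_null(value):
    # Treat None, or a float NaN, as a null value.
    return value is None or (isinstance(value, float) and math.isnan(value))

clean = [v for v in readings if not is_null(v)]
print(clean)  # only the three valid readings survive
```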
What are the Steps to Use Python Script for ETL?
Step 1: Installing Required Modules
The required modules can be installed using the pip command. Run the following commands in the Command Prompt:
pip install mysql-connector-python
pip install pyodbc
pip install fdb
Step 2: Setting Up ETL Directory
The following files should be created in your project directory in order to set up Python ETL:
- db_credentials.py: This should have all the data required to establish database connections. For example, Database Password, Port Number, etc.
- sql_queries.py: This should contain all the commonly used database queries, as strings, to extract data from the sources and load it into the target.
- etl.py: Perform all necessary operations to connect to the database and run the required queries.
- main.py: Responsible for maintaining the flow of the operations and performing the required operations in a specific order.
1. db_credentials.py
This file is required to set up all source and target database connection strings. It should hold all the information necessary to access each database, organized as dictionaries (and lists of dictionaries) so the configurations can be iterated over when required. The format for this file is as follows:
datawarehouse_name = 'your_datawarehouse_name'

# sql-server (target db, datawarehouse)
datawarehouse_db_config = {
    'Trusted_Connection': 'yes',
    'driver': '{SQL Server}',
    'server': 'datawarehouse_sql_server',
    'database': '{}'.format(datawarehouse_name),
    'user': 'your_db_username',
    'password': 'your_db_password',
    'autocommit': True,
}

# sql-server (source db)
sqlserver_db_config = [
    {
        'Trusted_Connection': 'yes',
        'driver': '{SQL Server}',
        'server': 'your_sql_server',
        'database': 'db1',
        'user': 'your_db_username',
        'password': 'your_db_password',
        'autocommit': True,
    }
]

# mysql (source db)
mysql_db_config = [
    {
        'user': 'your_user_1',
        'password': 'your_password_1',
        'host': 'db_connection_string_1',
        'database': 'db_1',
    },
    {
        'user': 'your_user_2',
        'password': 'your_password_2',
        'host': 'db_connection_string_2',
        'database': 'db_2',
    },
]

# firebird (source db)
fdb_db_config = [
    {
        'dsn': "/your/path/to/source.db",
        'user': "your_username",
        'password': "your_password",
    }
]
2. sql_queries.py
This file contains queries that can perform the required operations to extract data from the source databases and load it into the target database while setting up an ETL data pipeline using Python. The format for this file is as follows:
# example queries, will be different across different db platform
firebird_extract = ('''
SELECT fbd_column_1, fbd_column_2, fbd_column_3
FROM fbd_table;
''')
firebird_insert = ('''
INSERT INTO table (column_1, column_2, column_3)
VALUES (?, ?, ?)
''')
firebird_extract_2 = ('''
SELECT fbd_column_1, fbd_column_2, fbd_column_3
FROM fbd_table_2;
''')
firebird_insert_2 = ('''
INSERT INTO table_2 (column_1, column_2, column_3)
VALUES (?, ?, ?)
''')
sqlserver_extract = ('''
SELECT sqlserver_column_1, sqlserver_column_2, sqlserver_column_3
FROM sqlserver_table
''')
sqlserver_insert = ('''
INSERT INTO table (column_1, column_2, column_3)
VALUES (?, ?, ?)
''')
mysql_extract = ('''
SELECT mysql_column_1, mysql_column_2, mysql_column_3
FROM mysql_table
''')
mysql_insert = ('''
INSERT INTO table (column_1, column_2, column_3)
VALUES (?, ?, ?)
''')
# exporting queries
class SqlQuery:
    def __init__(self, extract_query, load_query):
        self.extract_query = extract_query
        self.load_query = load_query

# create instances of the SqlQuery class
fbd_query = SqlQuery(firebird_extract, firebird_insert)
fbd_query_2 = SqlQuery(firebird_extract_2, firebird_insert_2)
sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery(mysql_extract, mysql_insert)

# store as lists for iteration
fbd_queries = [fbd_query, fbd_query_2]
sqlserver_queries = [sqlserver_query]
mysql_queries = [mysql_query]
3. etl.py
This file should contain all the code that helps establish connections among the correct databases and run the required queries to set up ETL with Python. The format for this file is as follows:
# python modules
import mysql.connector
import pyodbc
import fdb

# the warehouse name is defined alongside the connection configs
from db_credentials import datawarehouse_name

def etl(query, source_cnx, target_cnx):
    # extract data from source db
    source_cursor = source_cnx.cursor()
    source_cursor.execute(query.extract_query)
    data = source_cursor.fetchall()
    source_cursor.close()

    # load data into warehouse db
    if data:
        target_cursor = target_cnx.cursor()
        target_cursor.execute("USE {}".format(datawarehouse_name))
        target_cursor.executemany(query.load_query, data)
        print('data loaded to warehouse db')
        target_cursor.close()
    else:
        print('data is empty')

def etl_process(queries, target_cnx, source_db_config, db_platform):
    # establish source db connection
    if db_platform == 'mysql':
        source_cnx = mysql.connector.connect(**source_db_config)
    elif db_platform == 'sqlserver':
        source_cnx = pyodbc.connect(**source_db_config)
    elif db_platform == 'firebird':
        source_cnx = fdb.connect(**source_db_config)
    else:
        raise ValueError('unrecognised db platform: {}'.format(db_platform))

    # loop through sql queries
    for query in queries:
        etl(query, source_cnx, target_cnx)

    # close the source db connection
    source_cnx.close()
4. main.py
The code in this file iterates through the credentials, connects to each database, and performs the required operations in a specific order. The format for this file is as follows:
# python modules
import pyodbc

# variables
from db_credentials import datawarehouse_db_config, sqlserver_db_config, mysql_db_config, fdb_db_config
from sql_queries import fbd_queries, sqlserver_queries, mysql_queries

# methods
from etl import etl_process

def main():
    print('starting etl')

    # establish connection for target database (sql-server)
    target_cnx = pyodbc.connect(**datawarehouse_db_config)

    # loop through credentials
    # mysql
    for config in mysql_db_config:
        try:
            print("loading db: " + config['database'])
            etl_process(mysql_queries, target_cnx, config, 'mysql')
        except Exception as error:
            print("etl for {} has error".format(config['database']))
            print('error message: {}'.format(error))
            continue

    # sql-server
    for config in sqlserver_db_config:
        try:
            print("loading db: " + config['database'])
            etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
        except Exception as error:
            print("etl for {} has error".format(config['database']))
            print('error message: {}'.format(error))
            continue

    # firebird (these configs use 'dsn' rather than 'database')
    for config in fdb_db_config:
        try:
            print("loading db: " + config['dsn'])
            etl_process(fbd_queries, target_cnx, config, 'firebird')
        except Exception as error:
            print("etl for {} has error".format(config['dsn']))
            print('error message: {}'.format(error))
            continue

    target_cnx.close()

if __name__ == "__main__":
    main()
What Is an Example of Python ETL?
Step 1: Import the modules and functions
In this Python ETL example, you must first import the required modules and functions.
import glob
import pandas as pd
import xml.etree.ElementTree as ET
from datetime import datetime
- The dealership_data directory contains CSV, JSON, and XML files of used car data. The features recorded are car_model, year_of_manufacture, price, and fuel. You need to extract the data from these raw files, transform it, and load it into a target file.
- Download the source file:
!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip
unzip datasource.zip -d dealership_data
- Setting the path for Target files:
tmpfile = "dealership_temp.tmp" # store all extracted data
logfile = "dealership_logfile.txt" # all event logs will be stored
targetfile = "dealership_transformed_data.csv" # transformed data is stored
Step 2: Extract
The extract step in this Python ETL example reads data from each of the source formats (CSV, JSON, and XML) into pandas DataFrames.
def extract_from_csv(file_to_process):
dataframe = pd.read_csv(file_to_process)
return dataframe
def extract_from_json(file_to_process):
dataframe = pd.read_json(file_to_process,lines=True)
return dataframe
def extract_from_xml(file_to_process):
    # parse the XML tree and collect one row per record
    # (DataFrame.append was removed in pandas 2.0, so build rows first)
    rows = []
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    for person in root:
        rows.append({
            "car_model": person.find("car_model").text,
            "year_of_manufacture": int(person.find("year_of_manufacture").text),
            "price": float(person.find("price").text),
            "fuel": person.find("fuel").text,
        })
    return pd.DataFrame(rows, columns=['car_model', 'year_of_manufacture', 'price', 'fuel'])
- Calling the extract() function:
def extract():
    # gather the per-file DataFrames and concatenate them once at the end
    # (DataFrame.append was removed in pandas 2.0)
    parts = []

    # for csv files
    for csvfile in glob.glob("dealership_data/*.csv"):
        parts.append(extract_from_csv(csvfile))

    # for json files
    for jsonfile in glob.glob("dealership_data/*.json"):
        parts.append(extract_from_json(jsonfile))

    # for xml files
    for xmlfile in glob.glob("dealership_data/*.xml"):
        parts.append(extract_from_xml(xmlfile))

    if not parts:
        return pd.DataFrame(columns=['car_model', 'year_of_manufacture', 'price', 'fuel'])
    return pd.concat(parts, ignore_index=True)
Step 3: Transform
Using the transform function, you can convert the data into whatever format suits your needs. Here, the price column is rounded to two decimal places.
def transform(data):
data['price'] = round(data.price, 2)
return data
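In practice the transform step often does more than rounding. The sketch below extends the function above; the lowercased fuel column and the derived car_age column are illustrative additions, with a fixed reference year assumed:

```python
import pandas as pd

def transform(data):
    # Round prices to two decimal places, as in the example above.
    data['price'] = data['price'].round(2)
    # Normalize the fuel column to lowercase for consistent grouping.
    data['fuel'] = data['fuel'].str.lower()
    # Derive a car_age column (illustrative; assumes a fixed reference year).
    data['car_age'] = 2024 - data['year_of_manufacture']
    return data

# Made-up single-row sample following the dealership schema.
sample = pd.DataFrame({
    'car_model': ['alto 800'],
    'year_of_manufacture': [2017],
    'price': [4253.731343],
    'fuel': ['Petrol'],
})
print(transform(sample))
```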
Step 4: Loading and Logging
In this step, the data is loaded to the destination file. A logging entry needs to be established before loading. Usually, the data can be loaded into:
- Data warehouses
- Databases
- Flat files
- Data lakes
The load() function:
def load(targetfile,data_to_load):
data_to_load.to_csv(targetfile)
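Flat files are the simplest target; loading into a database looks much the same with pandas. Below is a minimal sketch using the standard library's sqlite3 as a stand-in for a real warehouse (the table and column names are made up):

```python
import sqlite3
import pandas as pd

def load_to_db(table_name, data_to_load, connection):
    # Write the DataFrame into a SQL table, replacing any previous run's data.
    data_to_load.to_sql(table_name, connection, if_exists='replace', index=False)

# Usage with an in-memory SQLite database (stand-in for a real warehouse).
conn = sqlite3.connect(':memory:')
df = pd.DataFrame({'car_model': ['ritz'], 'price': [5000.0]})
load_to_db('used_cars', df, conn)
print(pd.read_sql('SELECT * FROM used_cars', conn))
```

For a production warehouse you would swap the sqlite3 connection for a SQLAlchemy engine pointed at your target database.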
The log() function:
def log(message):
    # Hour:Minute:Second-Month-Day-Year (%b is the portable abbreviated-month code)
    timestamp_format = '%H:%M:%S-%b-%d-%Y'
    now = datetime.now()  # get current timestamp
    timestamp = now.strftime(timestamp_format)
    with open(logfile, "a") as f:
        f.write(timestamp + ',' + message + '\n')
Step 5: Running ETL Process
The log indicates that you have started the ETL Python process.
log("ETL Job Started")
The log indicates that you have started and ended the Extract phase.
log("Extract phase Started")
extracted_data = extract()
log("Extract phase Ended")
The log indicates that you have started and ended the Transform phase.
log("Transform phase Started")
transformed_data = transform(extracted_data)
log("Transform phase Ended")
The log indicates that you have started and ended the Load phase.
log("Load phase Started")
load(targetfile,transformed_data)
log("Load phase Ended")
The log indicates that the ETL Python process has ended.
log("ETL Job Ended")
Real-World Use Cases of Python ETL
- Customer 360-Degree View: By integrating data from CRMs (e.g., Salesforce), marketing tools, and transactional databases, Python ETL pipelines create unified datasets to provide a complete view of customer behavior.
- Real-Time Analytics: Python, combined with Kafka or Spark Streaming, powers real-time ETL pipelines for processing transactional or event-driven data to enable instant reporting and analytics.
- E-Commerce Inventory Management: Python ETL scripts integrate data from inventory systems, supplier APIs, and sales platforms to maintain up-to-date inventory records and optimize stock levels.
- Financial Data Reporting: Python ETL aggregates data from multiple sources like payment gateways, bank statements, and accounting tools for generating financial reports and ensuring compliance.
- Healthcare Data Integration: Python ETL pipelines help combine patient records from different systems, transforming them into standardized formats for research, reporting, or interoperability.
What Tools and Libraries Can You Use for Python ETL?
- pandas: A powerful library for data manipulation and analysis in Python, commonly used during the transform stage of ETL processes to clean, restructure, and analyze data efficiently.
- sqlalchemy: A SQL toolkit and Object Relational Mapper (ORM) that enables seamless interaction with SQL databases, widely used for extracting and loading data in Python-based ETL pipelines.
- pyodbc: A Python library that allows easy connection to various databases, including SQL Server, making it a popular choice for extracting and loading data in ETL workflows.
- pyspark: A library for distributed data processing, ideal for handling large-scale datasets in parallel ETL pipelines, commonly used in big data processing environments.
- boto3: The AWS SDK for Python, providing simple methods for interacting with AWS services, especially Amazon S3, during the extract or load phases of ETL processes.
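As a quick illustration of pandas in the transform stage, here is a small sketch that renames, deduplicates, and type-casts a made-up extract:

```python
import pandas as pd

# Raw extract with messy values (made-up sample data).
raw = pd.DataFrame({
    'Order ID': [1, 2, 2, 3],
    'Amount': ['10.5', '3.25', '3.25', None],
})

clean = (
    raw
    .rename(columns={'Order ID': 'order_id', 'Amount': 'amount'})  # tidy names
    .drop_duplicates()             # remove the repeated order
    .dropna(subset=['amount'])     # drop rows missing an amount
    .astype({'amount': float})     # cast strings to numbers
)
print(clean)
```

Chaining the steps like this keeps each cleaning rule visible and easy to reorder.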
What are the Alternative Programming Languages for ETL?
The languages are as follows:
1) Java
Java is a popular programming language, particularly for developing client-server web applications. Java has influenced other programming languages, including Python, and spawned several branches, including Scala. Java is the foundation for several other big data tools, including Hadoop and Spark. The Java ecosystem also includes a library collection comparable to that of Python.
This programming language is designed so developers can write code anywhere and run it anywhere, regardless of the underlying computer architecture. It is also known as write once, run anywhere (WORA).
2) Ruby
Ruby, like Python, is a scripting language that lets developers build ETL pipelines, but few ETL-specific Ruby frameworks exist to make the task easier. Several libraries are in active development, however, including Nokogiri, Kiba, and Square’s ETL package.
3) Go
Go, also known as Golang, is a C-like programming language increasingly used for data analysis and big data applications. It was created at Google to address gaps in C++ and Java encountered while working with Google’s servers and distributed systems. Go has several machine learning libraries, including support for Google’s TensorFlow, data pipeline libraries such as Apache Beam, and two ETL toolkits, Crunch and Pachyderm.
Conclusion
This article covered what ETL is, why Python is a good fit for building ETL pipelines, how to set up ETL with Python scripts, a complete worked example, real-world use cases, the top Python libraries for ETL, and alternative programming languages for ETL.
If you’re looking for a no-code solution to simplify your ETL process, Hevo is an excellent choice. Sign up for a 14-day free trial and simplify your data integration process. Check out the pricing details to understand which plan fulfills all your business needs.
Frequently Asked Questions
1. Can Python be used for ETL?
Yes. Python is widely used for ETL (Extract, Transform, Load) processes, either with hand-written scripts like the ones in this article or with libraries and frameworks such as pandas, Airflow, and PySpark.
2. What is an example of real time ETL?
Real-time ETL continuously extracts, transforms, and loads data as soon as it becomes available, rather than processing it in batches. This is critical for applications that need up-to-the-minute data. A classic example is fraud detection in financial transactions, where each transaction is scored as it streams in; others include real-time analytics and monitoring systems.
3. Which is the best ETL tool?
The “best” ETL tool depends on your specific requirements, such as the complexity of your data workflows, your budget, your team’s expertise, and the systems you need to integrate. Here are some of the top ETL tools: Hevo, Apache NiFi, Informatica, Fivetran, etc.
4. Is SQL an ETL tool?
No, SQL (Structured Query Language) itself is not an ETL tool, but it is a powerful language used within ETL processes. SQL is primarily used for querying and manipulating data within relational databases.
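To see how SQL plays its part inside a Python ETL step, here is a minimal sketch using the standard library's sqlite3 (the table and column names are illustrative):

```python
import sqlite3

# An in-memory database standing in for a source system.
conn = sqlite3.connect(':memory:')
conn.execute("CREATE TABLE sales (region TEXT, amount REAL)")
conn.executemany("INSERT INTO sales VALUES (?, ?)",
                 [("north", 100.0), ("south", 50.0), ("north", 25.0)])

# SQL handles the transform (aggregation); Python orchestrates around it.
totals = conn.execute(
    "SELECT region, SUM(amount) FROM sales GROUP BY region ORDER BY region"
).fetchall()
print(totals)
```

Here SQL does the heavy lifting of the transform, while Python supplies the connection handling, scheduling, and error handling around it.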
Manik is a passionate data enthusiast with extensive experience in data engineering and infrastructure. He excels in writing highly technical content, drawing from his background in data science and big data. Manik's problem-solving skills and analytical thinking drive him to create impactful content for data professionals, helping them navigate their day-to-day challenges. He holds a Bachelor's degree in Computers and Communication, with a minor in Big Data, from Manipal Institute of Technology.