In this article, you will learn how to set up ETL using Python.

You will also gain a holistic understanding of Python, its key features, different methods to set up ETL using a Python script, and the limitations of manually setting up ETL using Python.

Read along to find out in-depth information about setting up ETL Automation using Python.

What is Python?

  • Python is an Interactive, Interpreted, Object-Oriented programming language that incorporates Exceptions, Modules, Dynamic Typing, Dynamic Binding, Classes, High-level Dynamic Data Types, etc.
  • It can also be used to make system calls to almost all well-known Operating Systems.

What is ETL?

  • ETL stands for Extract, Transform and Load. It can be defined as the process that allows businesses to create a Single Source of Truth for all Online Analytical Processing.
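  • Businesses use multiple platforms to perform their day-to-day operations, so their data ends up spread across several systems. ETL extracts the data from each source, transforms it into a consistent format, and loads it into a single destination.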
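The three stages can be sketched with nothing but the standard library. This is a minimal illustration, not the setup used later in this article: the CSV text, the `sales` table, and the function names are all hypothetical, and an in-memory SQLite database stands in for the target.

```python
import csv
import io
import sqlite3

# Hypothetical raw export from one source system (illustrative data only).
RAW_CSV = "name,amount\nalice, 10.456\nbob, 3.1\n"

def extract(text):
    """Extract: parse CSV text into a list of dicts."""
    return list(csv.DictReader(io.StringIO(text)))

def transform(rows):
    """Transform: title-case the names and round amounts to two decimals."""
    return [(r["name"].strip().title(), round(float(r["amount"]), 2)) for r in rows]

def load(rows, cnx):
    """Load: write the cleaned rows into a single target table."""
    cnx.execute("CREATE TABLE IF NOT EXISTS sales (name TEXT, amount REAL)")
    cnx.executemany("INSERT INTO sales (name, amount) VALUES (?, ?)", rows)
    cnx.commit()

# In-memory SQLite database standing in for the data warehouse.
target = sqlite3.connect(":memory:")
load(transform(extract(RAW_CSV)), target)
result = target.execute("SELECT name, amount FROM sales ORDER BY name").fetchall()
print(result)
```

Keeping each stage in its own function makes it easy to swap the source or the target later without touching the other stages.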

Steps to Set Up ETL Using a Python Script

Step 1: Installing Required Modules

The pip command can be used to install the required modules: mysql-connector-python for MySQL, pyodbc for SQL Server (through ODBC), and fdb for Firebird. Install them by running the following commands in the Command Prompt:

pip install mysql-connector-python
pip install pyodbc
pip install fdb
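Once the commands have finished, a quick check like the one below can confirm that Python is able to locate each module. It is only a sketch: the `is_installed` helper is a hypothetical name, and it reports whether a module can be found, not whether the underlying database drivers work.

```python
import importlib.util

# Import names for the three packages installed above.
REQUIRED = ["mysql.connector", "pyodbc", "fdb"]

def is_installed(module_name):
    """Return True if Python can locate the module."""
    try:
        return importlib.util.find_spec(module_name) is not None
    except ModuleNotFoundError:
        # Raised for dotted names when the parent package (e.g. "mysql") is missing.
        return False

status = {name: is_installed(name) for name in REQUIRED}
for name, ok in status.items():
    print(name, "OK" if ok else "MISSING")
```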

Step 2: Setting Up ETL Directory

The following files should be created in your project directory in order to set up ETL Using Python:

  • db_credentials.py: Holds all the data required to establish connections with the databases, for example, the database password and port number.
  • sql_queries.py: Holds the commonly used database queries for extracting and loading data, stored as strings.
  • etl.py: Performs all the operations needed to connect to the databases and run the required queries.
  • main.py: Controls the flow of the process and runs the operations in the required order.

Step 2A – db_credentials.py

This file is required for setting up all Source and Target Database Connection Strings in the process to set up ETL using Python. This file should have all the required information to access the appropriate database in a list format so that it can be iterated easily when required.

The format for this file is as follows:

datawarehouse_name = 'your_datawarehouse_name'

# sql-server (target db, datawarehouse)
datawarehouse_db_config = {
  'Trusted_Connection': 'yes',
  'driver': '{SQL Server}',
  'server': 'datawarehouse_sql_server',
  'database': '{}'.format(datawarehouse_name),
  'user': 'your_db_username',
  'password': 'your_db_password',
  'autocommit': True,
}

# sql-server (source db)
sqlserver_db_config = [
  {
    'Trusted_Connection': 'yes',
    'driver': '{SQL Server}',
    'server': 'your_sql_server',
    'database': 'db1',
    'user': 'your_db_username',
    'password': 'your_db_password',
    'autocommit': True,
  }
]

# mysql (source db)
mysql_db_config = [
  {
    'user': 'your_user_1',
    'password': 'your_password_1',
    'host': 'db_connection_string_1',
    'database': 'db_1',
  },
  {
    'user': 'your_user_2',
    'password': 'your_password_2',
    'host': 'db_connection_string_2',
    'database': 'db_2',
  },
]

# firebird (source db)
fdb_db_config = [
  {
    'dsn': "/your/path/to/source.db",
    'user': "your_username",
    'password': "your_password",
  }
]
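If you would rather not keep passwords in a source file, one common option is to read them from environment variables instead. The sketch below is an assumption-laden illustration, not part of the original setup: the helper name `mysql_config_from_env` and the variable names (`<PREFIX>_USER` and so on) are hypothetical, and a plain dict stands in for `os.environ` so that it runs anywhere.

```python
import os

def mysql_config_from_env(prefix, env=os.environ):
    """Build one mysql_db_config entry from environment variables.

    The variable names (<PREFIX>_USER, ...) are hypothetical, not a library convention.
    """
    return {
        'user': env[prefix + '_USER'],
        'password': env[prefix + '_PASSWORD'],
        'host': env[prefix + '_HOST'],
        'database': env[prefix + '_DATABASE'],
    }

# Demo: a plain dict stands in for os.environ.
fake_env = {
    'MYSQL1_USER': 'your_user_1',
    'MYSQL1_PASSWORD': 'your_password_1',
    'MYSQL1_HOST': 'db_connection_string_1',
    'MYSQL1_DATABASE': 'db_1',
}
mysql_db_config = [mysql_config_from_env('MYSQL1', fake_env)]
print(mysql_db_config[0]['database'])
```

The resulting list has the same shape as the hard-coded `mysql_db_config` above, so the rest of the scripts would not need to change.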

Step 2B – sql_queries.py

This file contains queries that can be used to perform the required operations to extract data from the Source Databases and load it into the Target Database in the process to set up ETL using Python.

The format for this file is as follows:

# example queries, will be different across different db platform
firebird_extract = ('''
  SELECT fbd_column_1, fbd_column_2, fbd_column_3
  FROM fbd_table;
''')

firebird_insert = ('''
  INSERT INTO table (column_1, column_2, column_3)
  VALUES (?, ?, ?)  
''')

firebird_extract_2 = ('''
  SELECT fbd_column_1, fbd_column_2, fbd_column_3
  FROM fbd_table_2;
''')

firebird_insert_2 = ('''
  INSERT INTO table_2 (column_1, column_2, column_3)
  VALUES (?, ?, ?)  
''')

sqlserver_extract = ('''
  SELECT sqlserver_column_1, sqlserver_column_2, sqlserver_column_3
  FROM sqlserver_table
''')

sqlserver_insert = ('''
  INSERT INTO table (column_1, column_2, column_3)
  VALUES (?, ?, ?)  
''')

mysql_extract = ('''
  SELECT mysql_column_1, mysql_column_2, mysql_column_3
  FROM mysql_table
''')

mysql_insert = ('''
  INSERT INTO table (column_1, column_2, column_3)
  VALUES (?, ?, ?)  
''')

# exporting queries
class SqlQuery:
  def __init__(self, extract_query, load_query):
    self.extract_query = extract_query
    self.load_query = load_query
    
# create instances for SqlQuery class
fbd_query = SqlQuery(firebird_extract, firebird_insert)
fbd_query_2 = SqlQuery(firebird_extract_2, firebird_insert_2)
sqlserver_query = SqlQuery(sqlserver_extract, sqlserver_insert)
mysql_query = SqlQuery(mysql_extract, mysql_insert)

# store as list for iteration
fbd_queries = [fbd_query, fbd_query_2]
sqlserver_queries = [sqlserver_query]
mysql_queries = [mysql_query]
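The `?` placeholders in the insert queries follow the "qmark" parameter style, which is what pyodbc (the target connection in this article) expects. A query written for a different target driver would need that driver's style; mysql.connector, for example, uses `%s`. The sketch below exercises an extract/load pair against an in-memory SQLite database, chosen only because `sqlite3` ships with Python and also uses qmark placeholders. The table and column names are hypothetical.

```python
import sqlite3

class SqlQuery:
    """Pairs an extract query with the load query that consumes its rows."""
    def __init__(self, extract_query, load_query):
        self.extract_query = extract_query
        self.load_query = load_query

# Hypothetical tables; three selected columns map onto three placeholders.
demo_query = SqlQuery(
    "SELECT col_1, col_2, col_3 FROM source_table",
    "INSERT INTO target_table (column_1, column_2, column_3) VALUES (?, ?, ?)",
)

cnx = sqlite3.connect(":memory:")
cnx.execute("CREATE TABLE source_table (col_1 INTEGER, col_2 TEXT, col_3 REAL)")
cnx.execute("CREATE TABLE target_table (column_1 INTEGER, column_2 TEXT, column_3 REAL)")
cnx.executemany("INSERT INTO source_table VALUES (?, ?, ?)", [(1, "a", 1.5), (2, "b", 2.5)])

# Extract the rows, then load them with one executemany call.
rows = cnx.execute(demo_query.extract_query).fetchall()
cnx.executemany(demo_query.load_query, rows)
loaded = cnx.execute("SELECT * FROM target_table ORDER BY column_1").fetchall()

print(sqlite3.paramstyle)  # the DB-API attribute naming the placeholder style
print(loaded)
```

The number of selected columns must match the number of placeholders, because each extracted row is passed directly as the parameter tuple for the insert.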

Step 2C – etl.py

This file should contain all the code that helps establish connections among the correct databases and run the required queries in order to set up ETL using Python.

The format for this file is as follows:

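# python modules
import mysql.connector
import pyodbc
import fdb

# variables (datawarehouse_name is used in the USE statement below)
from db_credentials import datawarehouse_name

def etl(query, source_cnx, target_cnx):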
  # extract data from source db
  source_cursor = source_cnx.cursor()
  source_cursor.execute(query.extract_query)
  data = source_cursor.fetchall()
  source_cursor.close()

  # load data into warehouse db
  if data:
    target_cursor = target_cnx.cursor()
    target_cursor.execute("USE {}".format(datawarehouse_name))
    target_cursor.executemany(query.load_query, data)
    print('data loaded to warehouse db')
    target_cursor.close()
  else:
    print('data is empty')

def etl_process(queries, target_cnx, source_db_config, db_platform):
  # establish source db connection
  if db_platform == 'mysql':
    source_cnx = mysql.connector.connect(**source_db_config)
  elif db_platform == 'sqlserver':
    source_cnx = pyodbc.connect(**source_db_config)
  elif db_platform == 'firebird':
    source_cnx = fdb.connect(**source_db_config)
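  else:
    # raise instead of returning a string so the caller's try/except reports it
    raise ValueError('Error! unrecognised db platform: {}'.format(db_platform))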
  
  # loop through sql queries
  for query in queries:
    etl(query, source_cnx, target_cnx)
    
  # close the source db connection
  source_cnx.close()
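The `etl` function above reads the whole result set with `fetchall()`, which can use a lot of memory on large tables. A possible variant, sketched below, pulls rows in batches with the DB-API `fetchmany()` call. The function name `etl_in_batches`, the `batch_size` default, and the demo tables are assumptions for illustration; two in-memory SQLite databases stand in for the source and the warehouse.

```python
import sqlite3

def etl_in_batches(extract_query, load_query, source_cnx, target_cnx, batch_size=1000):
    """Copy rows from source to target in batches; return the number of rows loaded."""
    source_cursor = source_cnx.cursor()
    target_cursor = target_cnx.cursor()
    source_cursor.execute(extract_query)
    total = 0
    while True:
        batch = source_cursor.fetchmany(batch_size)
        if not batch:  # an empty list means the result set is exhausted
            break
        target_cursor.executemany(load_query, batch)
        total += len(batch)
    source_cursor.close()
    target_cursor.close()
    target_cnx.commit()
    return total

# Demo: two separate in-memory databases play the source and the target.
source = sqlite3.connect(":memory:")
source.execute("CREATE TABLE src (a INTEGER, b TEXT)")
source.executemany("INSERT INTO src VALUES (?, ?)", [(i, "row%d" % i) for i in range(5)])

target = sqlite3.connect(":memory:")
target.execute("CREATE TABLE dst (a INTEGER, b TEXT)")

loaded = etl_in_batches(
    "SELECT a, b FROM src",
    "INSERT INTO dst (a, b) VALUES (?, ?)",
    source, target, batch_size=2,
)
print(loaded)
```

With five source rows and a batch size of two, the loop runs three loads (2, 2, and 1 rows) before `fetchmany()` returns an empty list.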

Step 2D – main.py

The code in this file is responsible for iterating through the credentials, connecting to each database, and performing the required ETL operations.

The format for this file is as follows:

# python modules (pyodbc is needed for the target connection below)
import pyodbc

# variables
from db_credentials import datawarehouse_db_config, sqlserver_db_config, mysql_db_config, fbd_db_config
from sql_queries import fbd_queries, sqlserver_queries, mysql_queries

# methods
from etl import etl_process

def main():
  print('starting etl')
	
  # establish connection for target database (sql-server)
  target_cnx = pyodbc.connect(**datawarehouse_db_config)
	
  # loop through credentials

  # mysql
  for config in mysql_db_config: 
    try:
      print("loading db: " + config['database'])
      etl_process(mysql_queries, target_cnx, config, 'mysql')
    except Exception as error:
      print("etl for {} has error".format(config['database']))
      print('error message: {}'.format(error))
      continue
	
  # sql-server
  for config in sqlserver_db_config: 
    try:
      print("loading db: " + config['database'])
      etl_process(sqlserver_queries, target_cnx, config, 'sqlserver')
    except Exception as error:
      print("etl for {} has error".format(config['database']))
      print('error message: {}'.format(error))
      continue

  # firebird (the firebird config has no 'database' key, so report the 'dsn' instead)
  for config in fbd_db_config: 
    try:
      print("loading db: " + config['dsn'])
      etl_process(fbd_queries, target_cnx, config, 'firebird')
    except Exception as error:
      print("etl for {} has error".format(config['dsn']))
      print('error message: {}'.format(error))
      continue
	
  target_cnx.close()

if __name__ == "__main__":
  main()
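The three loops in `main()` differ only in the config list, the query list, and the platform name, so they could be folded into one helper. The sketch below shows one possible shape. The helper `run_platform`, its `label_key` parameter, and the stand-in `fake_etl_process` are hypothetical names introduced here so that the example runs without any database.

```python
def run_platform(configs, queries, target_cnx, db_platform, process, label_key='database'):
    """Run `process` for every config; return the labels of the configs that failed."""
    failed = []
    for config in configs:
        # Firebird configs could pass label_key='dsn', since they have no 'database' key.
        label = config.get(label_key, '<unknown>')
        try:
            print("loading db: " + label)
            process(queries, target_cnx, config, db_platform)
        except Exception as error:
            print("etl for {} has error".format(label))
            print('error message: {}'.format(error))
            failed.append(label)
    return failed

def fake_etl_process(queries, target_cnx, config, db_platform):
    """Stand-in for etl_process that fails for one database."""
    if config['database'] == 'db_2':
        raise RuntimeError('connection refused')

configs = [{'database': 'db_1'}, {'database': 'db_2'}]
failed = run_platform(configs, [], None, 'mysql', fake_etl_process)
print(failed)
```

Returning the list of failed databases gives the caller something to act on, for example to retry them or to exit with a non-zero status.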

Python ETL Example

Step 1: Import the modules and functions

In this ETL using Python example, first, you need to import the required modules and functions.

import glob 
import pandas as pd 
import xml.etree.ElementTree as ET 
from datetime import datetime

The dealership_data directory contains CSV, JSON, and XML files with used car data. Each record has the fields car_model, year_of_manufacture, price, and fuel. You need to extract the records from these raw files, transform them, and load the result into a target file.

Download the source file:

!wget https://cf-courses-data.s3.us.cloud-object-storage.appdomain.cloud/IBMDeveloperSkillsNetwork-PY0221EN-SkillsNetwork/labs/module%206/Lab%20-%20Extract%20Transform%20Load/data/datasource.zip

Extracting the zip file:

!unzip datasource.zip -d dealership_data

Setting the path for Target files:

tmpfile    = "dealership_temp.tmp"               # store all extracted data

logfile    = "dealership_logfile.txt"            # all event logs will be stored

targetfile = "dealership_transformed_data.csv"   # transformed data is stored

Step 2: Extract

The extract step in this example reads the raw data from several source formats (CSV, JSON, and XML), with one function per format, and combines the results into a single DataFrame.

CSV Extract Function:

def extract_from_csv(file_to_process): 
    dataframe = pd.read_csv(file_to_process) 
    return dataframe

JSON Extract Function:

def extract_from_json(file_to_process):
    dataframe = pd.read_json(file_to_process,lines=True)
    return dataframe
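The `lines=True` argument tells pandas that the file is in JSON Lines format, meaning one JSON object per line rather than a single JSON array. The sketch below shows the idea on an in-memory string. The two sample records are made up for illustration and are not taken from the dataset.

```python
import io
import pandas as pd

# Two hypothetical records in JSON Lines format: one JSON object per line.
sample = (
    '{"car_model": "ritz", "year_of_manufacture": 2014, "price": 5000.0, "fuel": "Petrol"}\n'
    '{"car_model": "sx4", "year_of_manufacture": 2013, "price": 7089.55, "fuel": "Diesel"}\n'
)

# StringIO wraps the text so read_json treats it like an open file.
df = pd.read_json(io.StringIO(sample), lines=True)
print(df.shape)
print(df["car_model"].tolist())
```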

XML Extract Function:

def extract_from_xml(file_to_process):
    # collect one dict per record, then build the DataFrame once
    # (DataFrame.append was removed in pandas 2.0)
    rows = []
    tree = ET.parse(file_to_process)
    root = tree.getroot()
    for car in root:
        rows.append({
            "car_model": car.find("car_model").text,
            "year_of_manufacture": int(car.find("year_of_manufacture").text),
            "price": float(car.find("price").text),
            "fuel": car.find("fuel").text,
        })
    # return after the loop so that every record is included
    return pd.DataFrame(rows, columns=['car_model', 'year_of_manufacture', 'price', 'fuel'])
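To see what the XML function expects, it helps to parse a small document by hand. In the sketch below, the four field names come from the article, but the `<data>` and `<row>` tag names and the values are assumptions, since the real files may use different wrappers. The loop only relies on each child of the root having the four fields.

```python
import xml.etree.ElementTree as ET

# Hypothetical sample: field names match the article, wrapper tags and values are made up.
SAMPLE_XML = """<data>
  <row><car_model>ritz</car_model><year_of_manufacture>2014</year_of_manufacture><price>5000.0</price><fuel>Petrol</fuel></row>
  <row><car_model>sx4</car_model><year_of_manufacture>2013</year_of_manufacture><price>7089.55</price><fuel>Diesel</fuel></row>
</data>"""

def parse_cars(xml_text):
    """Return one dict per child element of the root, with typed fields."""
    root = ET.fromstring(xml_text)
    records = []
    for car in root:
        records.append({
            "car_model": car.find("car_model").text,
            "year_of_manufacture": int(car.find("year_of_manufacture").text),
            "price": float(car.find("price").text),
            "fuel": car.find("fuel").text,
        })
    return records

cars = parse_cars(SAMPLE_XML)
print(len(cars), cars[0]["car_model"])
```

Element text is always a string, which is why the year and price are converted explicitly.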

Calling Extract Function()

def extract():
    # gather one DataFrame per source file, then concatenate once
    # (DataFrame.append was removed in pandas 2.0)
    frames = []
    # for csv files
    for csvfile in glob.glob("dealership_data/*.csv"):
        frames.append(extract_from_csv(csvfile))
    # for json files
    for jsonfile in glob.glob("dealership_data/*.json"):
        frames.append(extract_from_json(jsonfile))
    # for xml files
    for xmlfile in glob.glob("dealership_data/*.xml"):
        frames.append(extract_from_xml(xmlfile))
    if not frames:
        # no source files found: return an empty frame with the expected columns
        return pd.DataFrame(columns=['car_model', 'year_of_manufacture', 'price', 'fuel'])
    return pd.concat(frames, ignore_index=True)

Step 3: Transform

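Using the transform function, you can convert the data into whatever format you need. In this example, it rounds the price column to two decimal places.

def transform(data):
    # round the price column to two decimal places
    data['price'] = data['price'].round(2)
    return data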

Step 4: Loading and Logging

In this step, the data is loaded to the destination file. A logging entry needs to be established before loading. Usually, the data can be loaded into:

  • Data warehouses
  • Databases
  • Flat files
  • Data lakes

load function()

def load(targetfile,data_to_load):
    data_to_load.to_csv(targetfile)

log function()

def log(message):
    timestamp_format = '%H:%M:%S-%b-%d-%Y'
    # Hour:Minute:Second-MonthName-Day-Year (%b is the portable abbreviated month name)
    now = datetime.now()  # get current timestamp
    timestamp = now.strftime(timestamp_format)
    # append one "timestamp,message" line to the log file defined earlier
    with open(logfile, "a") as f:
        f.write(timestamp + ',' + message + '\n')
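To check what a log line looks like, you can apply the same format string to a fixed date. The sketch below assumes the portable `%b` code for the abbreviated month name, and the date is arbitrary. Parsing the stamp back with `strptime` confirms that the format is unambiguous.

```python
from datetime import datetime

timestamp_format = '%H:%M:%S-%b-%d-%Y'  # Hour:Minute:Second-MonthName-Day-Year

# An arbitrary fixed moment so the result is reproducible.
fixed = datetime(2024, 1, 5, 13, 4, 9)
stamp = fixed.strftime(timestamp_format)

# One complete log line, as it would be appended to the log file.
line = stamp + ',' + 'ETL Job Started' + '\n'
print(line, end='')

# Round trip: the same format string parses the stamp back.
parsed = datetime.strptime(stamp, timestamp_format)
print(parsed == fixed)
```

The month name depends on the active locale, so the exact text can differ on systems configured for another language.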

Step 5: Running ETL Process

The log indicates that you have started the ETL process.

log("ETL Job Started")

The log indicates that you have started and ended the Extract phase.

log("Extract phase Started")
extracted_data = extract() 
log("Extract phase Ended")

The log indicates that you have started and ended the Transform phase.

log("Transform phase Started")
transformed_data = transform(extracted_data)
log("Transform phase Ended")

The log indicates that you have started and ended the Load phase.

log("Load phase Started")
load(targetfile,transformed_data)
log("Load phase Ended")

The log indicates that the ETL process has ended.

log("ETL Job Ended")
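Each phase above is wrapped in a matching pair of "Started" and "Ended" log calls. One way to avoid repeating that pattern, sketched below, is a small context manager. The `phase` helper is a hypothetical name, and here `log` appends to a list instead of writing to the log file so that the example has no side effects.

```python
from contextlib import contextmanager

messages = []  # stand-in for the log file

def log(message):
    messages.append(message)

@contextmanager
def phase(name):
    """Log '<name> Started' on entry and '<name> Ended' on exit, even on error."""
    log(name + " Started")
    try:
        yield
    finally:
        log(name + " Ended")

with phase("ETL Job"):
    with phase("Extract phase"):
        data = [1.239, 4.5]          # placeholder for extract()
    with phase("Transform phase"):
        data = [round(x, 2) for x in data]  # placeholder for transform()

for message in messages:
    print(message)
```

Because the "Ended" entry is written in a `finally` block, it appears even when a phase raises, so you may prefer to log failures separately in a real pipeline.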

Using Python for ETL Processes 

  • With Python’s programming capabilities, you can create ETL pipelines that manage data and transform it in accordance with business requirements.
  • Because of Python’s versatility, data engineers and developers can write a program for almost any ETL process, including data aggregation. Python can efficiently handle the crucial parts of ETL operations, like indexing dictionaries and data structures. 
  • You can filter out null (NaN) values from your data with the isnan function in Python's built-in math module. In addition, many Python ETL tools are built from a mix of pure Python code and externally defined functions and libraries.
  • These tools support other Python libraries for extracting, loading, and transforming tables of data from multiple sources into data warehouses.
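As a small illustration of the null-filtering point above, the sketch below drops both `None` and NaN entries from a list of prices. The `is_null` helper and the sample values are hypothetical.

```python
import math

# Hypothetical price column with two kinds of missing values.
prices = [5000.0, float('nan'), 7089.55, None]

def is_null(value):
    """Treat None and float NaN as missing."""
    return value is None or (isinstance(value, float) and math.isnan(value))

clean = [p for p in prices if not is_null(p)]
print(clean)
```

The `isinstance` check matters because `math.isnan` raises a `TypeError` for values that are not numbers, such as `None` or strings.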

Alternative Programming Languages for ETL

The languages are as follows:

1) Java

  • Java is a popular programming language, particularly for developing client-server web applications. Java has influenced other programming languages, including Python, and its virtual machine (the JVM) hosts a number of other languages, including Scala.
  • Java serves as the foundation for several other big data tools, including Hadoop and Spark. The Java ecosystem also includes a library collection comparable to that of Python.
  • Java is designed so that developers can write code once and run it anywhere, regardless of the underlying computer architecture. This principle is known as write once, run anywhere (WORA).

2) Ruby

  • Ruby, like Python, is a scripting language that allows developers to create ETL pipelines, but there are few ETL-specific Ruby frameworks available to make the task easier.
  • However, several libraries are currently in development, including Nokogiri, Kiba, and Square’s ETL package.

3) Go

  • Go, also known as Golang, is a programming language that is similar to C and is intended for data analysis and big data applications.
  • It was created to fill C++ and Java gaps discovered while working with Google’s servers and distributed systems.
  • Go includes several machine learning libraries, including support for Google’s TensorFlow, data pipeline libraries such as Apache Beam, and two ETL toolkits, Crunch and Pachyderm.

Conclusion

  • This article provided information on Python and its key features, different methods to set up ETL using a Python script, the limitations of manually setting up ETL using Python, and the top 10 Python ETL tools.
Manik Chhabra
Research Analyst, Hevo Data

Manik is a passionate data enthusiast with extensive experience in data engineering and infrastructure. He excels in writing highly technical content, drawing from his background in data science and big data. Manik's problem-solving skills and analytical thinking drive him to create impactful content for data professionals, helping them navigate their day-to-day challenges. He holds a Bachelor's degree in Computers and Communication, with a minor in Big Data, from Manipal Institute of Technology.