Companies try their best to manage their business data and put it to better use. With the rate at which data is generated every day, there is a need for a faster and simpler way to manage the flow of data from one system to another. Many tools available in the market help companies automate their Data Pipeline workflows.
Apache Airflow is a workflow management platform that helps companies orchestrate their Data Pipeline tasks and save time. Airflow allows users to pull data from and push data into other systems, and the Databases and systems that store this data can be managed via Airflow using automated workflows. Microsoft SQL Server is a widely used Database that offers many features and good performance for managing business data. Airflow SQL Server Integration allows companies to automate Data Engineering Pipeline tasks by orchestrating the workflows using scripts.
Airflow SQL Server Integration helps users execute SQL commands against the Database for tasks such as extracting and loading data or calling a stored procedure. In this article, you will learn about Apache Airflow, Microsoft SQL Server, and the steps to set up Airflow SQL Server Integration. You will also read about the benefits of using Airflow SQL Server Integration and how it helps users schedule and manage Data Pipelines.
Prerequisites
- Microsoft SQL Server installed on your local machine.
- Python installed on your local machine.
- Basic knowledge of SQL.
Introduction to Apache Airflow
Apache Airflow is an open-source workflow management platform widely used for scheduling and managing Data Engineering Pipelines. Its workflow engine allows users to run complex Data Pipelines easily and ensures that each task of the pipeline is executed in the correct order and gets the resources it needs. Apache Airflow is written in Python, and all workflows and tasks are created as Python scripts.
Apache Airflow uses Directed Acyclic Graphs (DAGs) to manage workflow orchestration with the interactive user interface to monitor and fix any issues that may arise. It is distributed, scalable, and flexible, making it well-suited to handle the orchestration of complex business logic.
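As a quick illustration of how a DAG is defined in code, here is a minimal sketch (the DAG name and task names are hypothetical, and DummyOperator is used only as a placeholder task):
from datetime import datetime
from airflow import DAG
from airflow.operators.dummy import DummyOperator

# A minimal illustrative DAG: three placeholder tasks that must run in order.
with DAG(
    dag_id="example_minimal_dag",
    schedule_interval="@daily",
    start_date=datetime(2022, 3, 1),
    catchup=False,
) as dag:
    extract = DummyOperator(task_id="extract")
    transform = DummyOperator(task_id="transform")
    load = DummyOperator(task_id="load")

    # The >> operator declares dependencies, so the tasks run in this order.
    extract >> transform >> load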
Key Features of Apache Airflow
Some of the main features of Apache Airflow are listed below.
- Robust Integrations: Airflow offers many plug-and-play integrations that allow users to execute tasks on cloud platforms such as Google Cloud Platform, Amazon Web Services, and Microsoft Azure.
- Easy to Use: Users can easily deploy a workflow using Python. Airflow can be used to build Machine Learning models, transfer data, manage infrastructure, and more.
- Interactive UI: Airflow comes with an interactive web application so that users can effortlessly monitor, schedule, and manage their workflows.
Introduction to Microsoft SQL Server
Microsoft SQL Server is a Relational Database Management System that supports a wide variety of transaction processing, Business Intelligence, and Analytics applications. It is developed by Microsoft to let users retrieve and store data for personal or corporate use. Microsoft SQL Server uses SQL (Structured Query Language) to access, manage, query, and manipulate data in the Database. It can handle transaction processing and runs on a central server that allows concurrent access by multiple users.
Microsoft offers around a dozen different editions of Microsoft SQL Server that serve businesses ranging from small to large scale. The core component of Microsoft SQL Server is the SQL Server Database Engine, which controls data storage, processing, and security.
Key Features of Microsoft SQL Server
Some of the main features of Microsoft SQL Server are listed below:
- Database Engine: Database Engine is the core part of Microsoft SQL Server that is used to store, process, and secure data.
- Analysis Services: Analysis Services (SSAS) is an Online Analytical Processing and Data Mining engine used to build analytical data models for Business Intelligence reporting and analysis.
- Integration Services: Integration Services (SSIS) is widely used for importing and exporting data from a Database, and it is responsible for the ETL (Extract, Transform, Load) process.
- Accelerated Database Recovery: Microsoft SQL Server comes with ADR, a redesigned way of performing Database recovery in the event of a transaction rollback or system failure.
- PolyBase: PolyBase is a SQL Server feature that allows users to run fast, parallel T-SQL queries that reach out to external storage such as Hadoop or Azure Blob Storage.
Setting Up Airflow SQL Server Integration
Now that you have understood Apache Airflow and Microsoft SQL Server, in this section you will learn the steps to set up Airflow SQL Server Integration. The Airflow SQL Server Integration is implemented in Python. The steps for Airflow SQL Server Integration are listed below.
Step 1: Creating a Connection
- First, install the “apache-airflow” library by running the following command in the terminal.
pip install "apache-airflow[celery]==2.2.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.6.txt"
- Then import the following libraries in your Python code editor.
import json
from airflow.models.connection import Connection
- Use the Connection class to make a new Airflow SQL Server connection using the code given below.
c = Connection(
    conn_id="some_conn",
    conn_type="mssql",
    description="connection description",
    host="myhost.com",
    login="myname",
    password="mypassword",
    extra=json.dumps(dict(this_param="some val", that_param="other val*")),
)
- You can get the connection URI using the get_uri() method, as shown below.
print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
- The output will look similar to this.
AIRFLOW_CONN_SOME_CONN='mssql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'
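- Airflow can read connections from environment variables named AIRFLOW_CONN_<CONN_ID>, so one way to register this connection is to export the printed URI before starting Airflow (the value below assumes the example connection created above). Alternatively, you can add the same connection from the Airflow UI (Admin > Connections) or with the airflow connections add CLI command. Note that the later examples in this article reference a connection ID of airflow_mssql, so in practice you would create the connection with that conn_id.
export AIRFLOW_CONN_SOME_CONN='mssql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'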
- Now let’s have a look at Airflow MSSQL Operator examples to better understand the usage of Airflow SQL Server Integration.
Step 2: Creating MSSQL Table Using MsSqlOperator
- The Airflow MsSqlOperator is used to issue SQL requests from Python code. It takes two required parameters: sql and mssql_conn_id.
- These parameters are fed to the MsSqlHook object, which interacts directly with the MSSQL Database.
- First, let’s import all the libraries and modules required, using the code given below.
from datetime import datetime
from airflow import DAG
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
- Now, let’s create a DAG (Directed Acyclic Graph), which is defined in a Python script and contains a set of tasks and their dependencies. The DAG definition is given below.
dag = DAG(
    'example_mssql',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 2),
    tags=['example'],
    catchup=False,
)
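- As a side note, you can also define the DAG with a with DAG(...) as dag: block; operators instantiated inside the block are attached to the DAG automatically, so you do not need to pass dag=dag to each task. A minimal sketch with the same settings is shown below.
with DAG(
    'example_mssql',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 2),
    tags=['example'],
    catchup=False,
) as dag:
    # Operators created here are automatically assigned to this DAG.
    ...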
- The following code creates a table in MSSQL.
create_table_mssql_task = MsSqlOperator(
    task_id='create_country_table',
    mssql_conn_id='airflow_mssql',
    sql=r"""
    CREATE TABLE Country (
        country_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
        name TEXT,
        continent TEXT
    );
    """,
    dag=dag,
)
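- Note that re-running this task will fail once the Country table exists. If you want the task to be safely re-runnable, one option (a sketch, not part of the original example) is to guard the CREATE TABLE with an existence check in T-SQL.
create_table_mssql_task = MsSqlOperator(
    task_id='create_country_table',
    mssql_conn_id='airflow_mssql',
    sql=r"""
    -- Only create the table if it does not already exist.
    IF OBJECT_ID('dbo.Country', 'U') IS NULL
    BEGIN
        CREATE TABLE Country (
            country_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
            name TEXT,
            continent TEXT
        );
    END;
    """,
    dag=dag,
)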
Step 3: Inserting Data Into MSSQL Table
- You can write SQL queries to insert data into the Users table and wrap them in Python code.
- The created MsSqlOperator task will look similar to the code given below.
populate_user_table = MsSqlOperator(
    task_id='populate_user_table',
    mssql_conn_id='airflow_mssql',
    sql=r"""
    INSERT INTO Users (username, description)
    VALUES ( 'Danny', 'Musician');
    INSERT INTO Users (username, description)
    VALUES ( 'Simone', 'Chef');
    INSERT INTO Users (username, description)
    VALUES ( 'Lily', 'Florist');
    INSERT INTO Users (username, description)
    VALUES ( 'Tim', 'Pet shop owner');
    """,
    dag=dag,
)
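- The snippet above assumes a Users table already exists in the Database. If it does not, you can create it first with another MsSqlOperator task; a minimal sketch is given below (the column types are assumptions, not part of the original example).
create_user_table = MsSqlOperator(
    task_id='create_user_table',
    mssql_conn_id='airflow_mssql',
    sql=r"""
    CREATE TABLE Users (
        user_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
        username VARCHAR(100),    -- column types assumed for illustration
        description VARCHAR(255)
    );
    """,
    dag=dag,
)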
Step 4: Inserting Data Using the MSSQL Hook
- You can use the code given below to insert rows using the MSSQL hook.
@dag.task(task_id="insert_mssql_task")
def insert_mssql_hook():
    mssql_hook = MsSqlHook(mssql_conn_id='airflow_mssql', schema='airflow')
    rows = [
        ('India', 'Asia'),
        ('Germany', 'Europe'),
        ('Argentina', 'South America'),
        ('Ghana', 'Africa'),
        ('Japan', 'Asia'),
        ('Namibia', 'Africa'),
    ]
    target_fields = ['name', 'continent']
    mssql_hook.insert_rows(table='Country', rows=rows, target_fields=target_fields)
- In the code above, the @dag.task decorator registers the function as a task in the DAG with the given task ID.
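- Because @dag.task follows the TaskFlow API, the decorated function has to be called once for the task to actually be created in the DAG, for example:
insert_mssql_task = insert_mssql_hook()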
Step 5: Fetching Records from MSSQL Table
- If you want to fetch the data from your MSSQL table, you can use the code given below.
get_all_countries = MsSqlOperator(
    task_id="get_all_countries",
    mssql_conn_id='airflow_mssql',
    sql=r"""SELECT * FROM Country;""",
    dag=dag,
)
- Here, all the data from the Country table is fetched using the SQL query “SELECT * FROM Country;” wrapped in the MsSqlOperator.
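- MsSqlOperator runs the query inside the task, but if you want to work with the fetched rows in your own Python code, one option is to use MsSqlHook directly; a minimal sketch is given below (print_all_countries is a hypothetical helper, and it could be wrapped with @dag.task in the same way as the insert example above).
def print_all_countries():
    mssql_hook = MsSqlHook(mssql_conn_id='airflow_mssql', schema='airflow')
    # get_records() returns the query result as a list of tuples.
    for row in mssql_hook.get_records("SELECT * FROM Country;"):
        print(row)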
Step 6: Passing Parameters Into MsSqlOperator
- MsSqlOperator provides the parameters attribute, which makes it possible to dynamically inject values into your SQL requests at runtime.
- The code given below finds the countries in Asia.
get_countries_from_continent = MsSqlOperator(
    task_id="get_countries_from_continent",
    mssql_conn_id='airflow_mssql',
    sql=r"""SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';""",
    params={"column": "CONVERT(VARCHAR, continent)", "value": "Asia"},
    dag=dag,
)
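- Finally, the tasks defined above can be chained with the >> operator so that Airflow executes them in order; the line below is one possible ordering and assumes the task variables created in the previous steps (populate_user_table can be chained in the same way once the Users table exists).
# One possible ordering: create the Country table, fill it, then query it.
create_table_mssql_task >> insert_mssql_task >> get_all_countries >> get_countries_from_continent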
That’s it! You have completed the Airflow SQL Server Integration, in which you connected Airflow to Microsoft SQL Server using Python and performed a few common operations to read and write data in MSSQL tables.
Benefits of Airflow SQL Server Integration
A few benefits of using Airflow SQL Server Integration are listed below:
- Airflow SQL Server Integration allows users to automatically load query results from one Microsoft SQL Server to another Server.
- Airflow SQL Server Integration makes it easier for companies to automate and orchestrate the Data Pipeline workflows.
- Airflow SQL Server Integration can be used to schedule the automated generation of reports, training of Machine Learning models, running of jobs, etc., using the required data from Microsoft SQL Server.
Conclusion
In this article, you learned about Airflow, Microsoft SQL Server, and the steps to set up Airflow SQL Server Integration. You also read about the key benefits of using the Airflow SQL Server Integration and how it helps companies orchestrate Data Pipeline workflows and automate the process. Creating the DAGs for Airflow SQL Server Integration is an easy process, as it allows users to define the exact path of the workflow using task relationships.
Visit our Website to Explore Hevo
Companies need to analyze the business data stored in multiple data sources. The data needs to be loaded into a Data Warehouse to get a holistic view. Hevo Data is a No-code Data Pipeline solution that helps transfer data from 150+ sources to the desired Data Warehouse. It fully automates the process of transforming and transferring data to a destination without writing a single line of code.
Want to take Hevo for a spin? Sign Up here for a 14-day free trial and experience the feature-rich Hevo suite firsthand.
Share your experience learning about Airflow SQL Server Integration in the comments section below!