Apache Airflow is a workflow management platform for orchestrating data pipelines. SQL Server is a widely used relational database for managing business data. Integrating Airflow with SQL Server lets you automate ETL pipelines that extract, transform, and load SQL Server data, so you can take advantage of both platforms by connecting Airflow to SQL Server.

This article guides you through the 6 steps to set up the Airflow MSSQL connection and outlines the key benefits of Airflow SQL Server integration for scheduling and managing data workflow tasks.

By leveraging Airflow for SQL Server data pipeline orchestration, you can save substantial time in managing workflows for moving and processing data from your SQL Server databases.

Setting Up Airflow SQL Server Integration

In this section, you will learn how to set up Airflow SQL Server Integration. The integration is driven from Python: both the connection and the tasks are defined in Python code.

Follow this step-by-step guide to connect Airflow to SQL Server and read and write data:

Step 1: Creating a Connection

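  • First, install the “apache-airflow” library by running the command given below in the terminal. The constraints file in the URL (constraints-3.6.txt) should match your Python version. The MsSqlOperator and MsSqlHook used in the later steps come from the separate apache-airflow-providers-microsoft-mssql package, which also needs to be installed.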
pip install "apache-airflow[celery]==2.2.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.6.txt"
  • Then, import the following libraries in your Python code.
import json
from airflow.models.connection import Connection
  • Use the Connection class to create a new Airflow SQL Server connection using the code given below. Note that conn_type is set to "mssql" for a SQL Server connection.
c = Connection(
     conn_id="some_conn",
     conn_type="mssql",
     description="connection description",
     host="myhost.com",
     login="myname",
     password="mypassword",
     extra=json.dumps(dict(this_param="some val", that_param="other val*")),
 )
  • Here, for host, enter the hostname to connect to; for login, enter the username; and for password, enter the password to connect with.
  • For extra, enter the extra parameters (as a JSON dictionary) that can be used in an MSSQL connection.
  • You can get the connection URI using the get_uri() method. The code given below prints it in the form of an AIRFLOW_CONN_ environment variable.
print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
  • The output will look similar to this.
AIRFLOW_CONN_SOME_CONN='mssql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'
  • Now, let’s look at some Airflow MSSQL Operator examples to better understand how Airflow SQL Server Integration is used. The examples use a connection with the ID airflow_mssql, so register your connection under that ID or adjust mssql_conn_id accordingly.
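To see how the connection URI from Step 1 is put together, the following minimal sketch rebuilds it using only the Python standard library. It is an illustration rather than Airflow's own implementation: build_conn_uri is a hypothetical helper, the connection type is assumed to be "mssql", and the host, login, and password are the same placeholder values as above.

```python
from urllib.parse import quote, urlencode


def build_conn_uri(conn_type, login, password, host, extra=None):
    """Hypothetical helper: assemble a connection URI from its parts."""
    # Percent-encode each part so special characters cannot break the URI
    uri = (
        f"{conn_type}://{quote(login, safe='')}:"
        f"{quote(password, safe='')}@{quote(host, safe='')}"
    )
    # Extra parameters are appended as a URL query string
    if extra:
        uri += "?" + urlencode(extra)
    return uri


uri = build_conn_uri(
    conn_type="mssql",  # assumption: SQL Server connection type
    login="myname",
    password="mypassword",
    host="myhost.com",
    extra={"this_param": "some val", "that_param": "other val*"},
)
print(f"AIRFLOW_CONN_SOME_CONN='{uri}'")
```

Airflow can read connections from environment variables named AIRFLOW_CONN_ followed by the upper-cased connection ID, so exporting a line like the printed one is one way to register the connection.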

Step 2: Creating MSSQL Table Using MsSqlOperator

  • The Airflow MsSqlOperator is used to run SQL statements against SQL Server from Python. Its two main parameters are sql, the statement to run, and mssql_conn_id, the ID of the Airflow connection to use.
  • These parameters are fed to the MSSQL hook object that interacts directly with the MSSQL Database.
  • First, let’s import all the libraries and modules required, using the code given below.
from datetime import datetime
from airflow import DAG
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
  • Now, let’s create a DAG (Directed Acyclic Graph), which is defined in a Python script and contains a set of tasks and their dependencies. The DAG is given below.
dag = DAG(
    'example_mssql',
    schedule_interval='@daily',
    start_date=datetime(2022, 3, 2),
    tags=['example'],
    catchup=False,
)
  • To create a table in MSSQL, use the code given below.
create_table_mssql_task = MsSqlOperator(
    task_id='create_country_table',
    mssql_conn_id='airflow_mssql',
    sql=r"""
    CREATE TABLE Country (
        country_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
        name TEXT,
        continent TEXT
    );
    """,
    dag=dag,
)
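The DAG defined in this step uses schedule_interval='@daily' together with catchup=False. As a rough illustration of what that combination means, the sketch below lists the daily intervals that have completed since start_date and compares a backfill of all of them with running only the latest one. It is a simplified model written with the standard library, not Airflow's scheduler code; daily_intervals is a hypothetical helper and the "now" timestamp is an assumed value.

```python
from datetime import datetime, timedelta


def daily_intervals(start_date, now):
    """Hypothetical helper: list every completed daily interval since start_date."""
    intervals = []
    start = start_date
    # An interval counts as completed once its end is not in the future
    while start + timedelta(days=1) <= now:
        intervals.append((start, start + timedelta(days=1)))
        start += timedelta(days=1)
    return intervals


start_date = datetime(2022, 3, 2)
now = datetime(2022, 3, 5, 9, 30)  # assumed time at which the DAG is switched on

completed = daily_intervals(start_date, now)
with_catchup = completed          # catchup=True: one run per completed interval
without_catchup = completed[-1:]  # catchup=False: only the most recent interval

print(len(with_catchup), "runs with catchup")
print(len(without_catchup), "run without catchup")
```

With catchup=False, as in the DAG above, enabling the DAG several days after start_date does not trigger a run for each missed day.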

Step 3: Inserting Data Into MSSQL Table

  • You can write SQL queries that insert data into the Users table and wrap them in Python code. This assumes that a Users table with username and description columns already exists in the database.
  • The resulting MsSqlOperator task will look similar to the code given below.
populate_user_table = MsSqlOperator(
    task_id='populate_user_table',
    mssql_conn_id='airflow_mssql',
    sql=r"""
            INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
            INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
            INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
            INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
            """,
    dag=dag,
)

Step 4: Inserting Rows Using the MSSQL Hook

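  • You can use the code given below to insert rows into the Country table with the MSSQL hook.
@dag.task(task_id="insert_mssql_task")
def insert_mssql_hook():
    # Hook that talks to SQL Server through the 'airflow_mssql' connection
    mssql_hook = MsSqlHook(mssql_conn_id='airflow_mssql', schema='airflow')

    rows = [
        ('India', 'Asia'),
        ('Germany', 'Europe'),
        ('Argentina', 'South America'),
        ('Ghana', 'Africa'),
        ('Japan', 'Asia'),
        ('Namibia', 'Africa'),
    ]
    target_fields = ['name', 'continent']
    # Insert the rows into the name and continent columns of Country
    mssql_hook.insert_rows(table='Country', rows=rows, target_fields=target_fields)

# Calling the decorated function creates the task in the DAG
insert_mssql_task = insert_mssql_hook()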
  • In the code above, the @dag.task decorator turns the function into a task of the DAG, and task_id sets the task ID for this job.
  • Under schema, enter the name of the database that the hook should connect to.
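Conceptually, rows and target_fields in Step 4 map onto a parameterized INSERT statement that is executed once per row. The sketch below shows that mapping using Python's built-in sqlite3 module as a stand-in for SQL Server, an assumption made so the example runs without a database server. It does not reproduce MsSqlHook internals; build_insert_sql is a hypothetical helper, and the ? placeholder is specific to sqlite3.

```python
import sqlite3


def build_insert_sql(table, target_fields, placeholder="?"):
    """Hypothetical helper: build a parameterized INSERT for the given columns."""
    columns = ", ".join(target_fields)
    placeholders = ", ".join([placeholder] * len(target_fields))
    return f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"


rows = [
    ("India", "Asia"),
    ("Germany", "Europe"),
    ("Argentina", "South America"),
    ("Ghana", "Africa"),
    ("Japan", "Asia"),
    ("Namibia", "Africa"),
]
target_fields = ["name", "continent"]

# In-memory SQLite database standing in for the SQL Server Country table
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE Country ("
    "country_id INTEGER PRIMARY KEY AUTOINCREMENT, name TEXT, continent TEXT)"
)

insert_sql = build_insert_sql("Country", target_fields)
conn.executemany(insert_sql, rows)  # one execution per row tuple
conn.commit()

row_count = conn.execute("SELECT COUNT(*) FROM Country").fetchone()[0]
print(insert_sql)
print(row_count)
```

Because the values travel separately from the SQL text, names containing quotes or other special characters do not need manual escaping.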

Step 5: Fetching Records from MSSQL Table

  • If you want to fetch the data from your MSSQL table, use the code below.
get_all_countries = MsSqlOperator(
    task_id="get_all_countries",
    mssql_conn_id='airflow_mssql',
    sql=r"""SELECT * FROM Country;""",
    dag=dag,
)
  • Here, all the data from the Country table is fetched using the SQL query “SELECT * FROM Country;” wrapped in an MsSqlOperator.

Step 6: Passing Parameters Into MsSqlOperator

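  • MsSqlOperator provides a parameters attribute that allows dynamically injecting values into your SQL requests during runtime. In addition, values passed through params are rendered into the sql string by Jinja templating before the query runs, which is what the example below uses.
  • The code below will find the countries on the Asian continent. The continent column is wrapped in CONVERT(VARCHAR, ...) because SQL Server cannot compare a TEXT column with the = operator.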
get_countries_from_continent = MsSqlOperator(
    task_id="get_countries_from_continent",
    mssql_conn_id='airflow_mssql',
    sql=r"""SELECT * FROM Country WHERE {{ params.column }}='{{ params.value }}';""",
    params={"column": "CONVERT(VARCHAR, continent)", "value": "Asia"},
    dag=dag,
)
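Because params values are pasted into the SQL text by templating, they are best reserved for trusted values. For values that come from outside, the usual alternative is to bind them through the database driver, which is the role of the parameters argument of MsSqlOperator. The sketch below shows the idea using Python's built-in sqlite3 module as a stand-in for SQL Server, an assumption made so the example runs anywhere. countries_in is a hypothetical helper, and sqlite3 uses ? placeholders, whereas the placeholder style for SQL Server depends on the driver in use.

```python
import sqlite3

# In-memory SQLite database standing in for the SQL Server Country table
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE Country (name TEXT, continent TEXT)")
conn.executemany(
    "INSERT INTO Country (name, continent) VALUES (?, ?)",
    [("India", "Asia"), ("Germany", "Europe"), ("Japan", "Asia")],
)


def countries_in(connection, continent):
    """Hypothetical helper: return country names for a continent via a bound value."""
    cursor = connection.execute(
        "SELECT name FROM Country WHERE continent = ? ORDER BY name",
        (continent,),  # the value is sent separately from the SQL text
    )
    return [row[0] for row in cursor.fetchall()]


asian = countries_in(conn, "Asia")
# A value crafted to alter the query is treated as a plain string and matches nothing
suspicious = countries_in(conn, "Asia' OR '1'='1")

print(asian)
print(suspicious)
```

Note that only values can be bound this way. Column names or expressions, such as the CONVERT call in the example above, still have to be part of the SQL text itself.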

That’s it! You have completed the Airflow SQL Server Integration: you connected Airflow to MSSQL Server using Python and performed a few common operations to read and write data in MSSQL tables.

Benefits of Airflow SQL Server Integration

A few benefits of using Airflow SQL Server Integration are listed below:

  • Airflow SQL Server Integration allows users to automatically load query results from one Microsoft SQL Server instance to another.
  • Airflow SQL Server Integration makes it easier for companies to automate and orchestrate their Data Pipeline workflows.
  • Airflow SQL Server Integration can be used to schedule automated report generation, Machine Learning model training, and other jobs that take their data from Microsoft SQL Server.

Conclusion 

In this article, you learned the steps to set up Airflow SQL Server Integration. You also read about the key benefits of the integration and how it helps companies orchestrate and automate their Data Pipeline workflows. Creating DAGs for Airflow SQL Server Integration is straightforward because a DAG lets you define the exact path of the workflow through the relationships between its tasks.

Aditya Jadon
Research Analyst, Hevo Data

Aditya has a keen interest in data science and is passionate about data, software architecture, and writing technical content. He has experience writing around 100 articles on data science.
