Airflow is an automation workflow management platform that helps orchestrate data pipelines. SQL Server is a widely used database for managing business data. Integrating Airflow and SQL Server enables automating ETL pipelines for extracting, loading and transforming SQL Server data. Users can take advantage of both platforms by performing an Airflow connect to SQL Server.

This article outlines the key benefits of Airflow SQL Server integration for scheduling and managing data workflow tasks. It will guide you through the 6 easy steps to set up the airflow MSSQL connection.

By leveraging Airflow for SQL Server data pipeline orchestration, you can save substantial time in managing workflows for moving and processing data from your SQL Server databases.

Setting Up Airflow SQL Server Integration

In this section, you will learn about the steps to set up Airflow SQL Server Integration. The Airflow SQL Server Integration is supported by Python language. The following steps for Airflow SQL Server Integration are listed below.

Follow this step-by-step guide to import data from Airflow to SQL server:

Step 1: Creating a Connection

  • First, install the library “apache-airflow” using the following command in the terminal, given below.
pip install "apache-airflow[celery]==2.2.3" --constraint "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.6.txt"
  • pip install is the command used to install Python packages.
  • "apache-airflow[celery]==2.2.3" specifies that version 2.2.3 of Apache Airflow with the Celery extra dependencies should be installed.
  • --constraint ensures that specific versions of dependencies are used as defined in the given URL.
  • The URL "https://raw.githubusercontent.com/apache/airflow/constraints-2.2.3/constraints-3.6.txt" points to a constraints file that locks dependency versions for Airflow to maintain compatibility.
  • This command helps to install Airflow with all necessary dependencies, while ensuring version consistency.
  • Then, import the libraries in your Python code editor. Import the following libraries given below.
import json
from airflow.models.connection import Connection
  • Use the Connection class to make a new now Airflow SQL Server connection using the code given below.
c = Connection(
     conn_id="some_conn",
     conn_type="mysql",
     description="connection description",
     host="myhost.com",
     login="myname",
     password="mypassword",
     extra=json.dumps(dict(this_param="some val", that_param="other val*")),
 )
  • Connection is a class that creates a connection object in Apache Airflow.
  • conn_id is a unique identifier for the connection, here it’s set to "some_conn".
  • conn_type defines the type of connection, in this case, "mysql".
  • extra=json.dumps(dict(this_param="some val", that_param="other val*")) adds extra parameters in JSON format for additional configuration.
  • The connection is established with the provided credentials such as host, login, and password.
  • You can get the connection URI using the get_uri() method, the code is given below.
  • Here, for host, enter the hostname to connect to; for login, enter the username to connect to. And for the password, specify the password to connect.
  • For extra, enter the extra parameters (as JSON dictionary) that can be used in an MSSQL connection.
print(f"AIRFLOW_CONN_{c.conn_id.upper()}='{c.get_uri()}'")
  • The print() function displays a formatted string with connection details.
  • f"AIRFLOW_CONN_{c.conn_id.upper()}" creates an environment variable name using the connection ID, converting it to uppercase.
  • c.get_uri() retrieves the connection URI (Uniform Resource Identifier) for the specified connection object c.
  • The resulting output will be an Airflow connection URI in the format AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com'.
  • This is useful for setting the connection string in environment variables for Airflow tasks.
  • The output will look similar to this.
AIRFLOW_CONN_SOME_CONN='mysql://myname:mypassword@myhost.com?this_param=some+val&that_param=other+val%2A'
  • Now, let’s have a look at Airflow MSSQL Operator examples better to understand the usage of Airflow SQL Server Integration.

Step 2: Creating MSSQL Table Using MsSqlOperator

  • The Airflow mssql operator is used to make out SQL requests using Python language. It takes two required parameters: sql and mssql_conn_id
  • These parameters are fed to the MSSQL hook object that interacts directly with the MSSQL Database.
  • First, let’s import all the libraries and modules required, using the code given below.
from datetime import datetime
from airflow import DAG
from airflow.providers.microsoft.mssql.hooks.mssql import MsSqlHook
from airflow.providers.microsoft.mssql.operators.mssql import MsSqlOperator
  • Now, let’s create a DAG (Directed Acyclic Graph) that is a Python script that contains a set of tasks and their dependencies. The DAG is given below.
dag = DAG(
    'example_mssql',
    schedule_interval='@daily',
    start_date=datetime(2022, 03, 2),
    tags=['example'],
    catchup=False,
)
  • dag = DAG() creates a new Directed Acyclic Graph (DAG) object in Airflow.
  • 'example_mssql' is the name of the DAG, which is a unique identifier.
  • schedule_interval='@daily' sets the DAG to run once a day.
  • start_date=datetime(2022, 03, 2) specifies the date when the DAG should start running.
  • catchup=False ensures that the DAG will not backfill missed runs between the start date and the current date.
  • To create a table in MSSQL the following code is given below.
create_table_mssql_task = MsSqlOperator(
    task_id='create_country_table',
    mssql_conn_id='airflow_mssql',
    sql=r"""
    CREATE TABLE Country (
        country_id INT NOT NULL IDENTITY(1,1) PRIMARY KEY,
        name TEXT,
        continent TEXT
    );
    """,
    dag=dag,
)
  • create_table_mssql_task = MsSqlOperator() creates a new task to execute SQL code on an MS SQL Server database.
  • task_id='create_country_table' assigns a unique identifier to the task.
  • mssql_conn_id='airflow_mssql' specifies the connection ID for the MS SQL Server connection in Airflow.
  • The sql parameter contains the SQL statement to create a Country table with columns country_id, name, and continent.
  • dag=dag assigns the task to the previously defined DAG.

Automate Your SQL Server Data Pipelines:

Integrate Kafka to MS SQL Server
Integrate REST API to MS SQL Server
Integrate PostgreSQL to MS SQL Server
Integrate MySQL to MS SQL Server

Step 3: Inserting Data Into MSSQL Table

  • You can create a SQL query to insert data into the Users table and wrap the queries in the Python code.
  • The created MsSqlOperator task will look similar to the code given below.
populate_user_table = MsSqlOperator(
    task_id='populate_user_table',
    mssql_conn_id='airflow_mssql',
    sql=r"""
            INSERT INTO Users (username, description)
            VALUES ( 'Danny', 'Musician');
            INSERT INTO Users (username, description)
            VALUES ( 'Simone', 'Chef');
            INSERT INTO Users (username, description)
            VALUES ( 'Lily', 'Florist');
            INSERT INTO Users (username, description)
            VALUES ( 'Tim', 'Pet shop owner');
            """,
)
  • populate_user_table = MsSqlOperator() creates a task to execute SQL queries on a MS SQL Server.
  • task_id='populate_user_table' assigns a unique identifier to this task.
  • mssql_conn_id='airflow_mssql' specifies the connection ID to the MS SQL Server in Airflow.
  • The sql parameter contains SQL INSERT statements to add rows into the Users table for users like ‘Danny’, ‘Simone’, ‘Lily’, and ‘Tim’.
  • The task will execute these SQL queries to populate the Users table with data when run.

Step 4: Inserting MSSQL Hook

  • You can use the following code given below to insert the MSSQL hook.
@dag.task(task_id="insert_mssql_task")
	def insert_mssql_hook():
	 mssql_hook = MsSqlHook(mssql_conn_id='airflow_mssql', schema='airflow')
	 
	 rows = [
	 ('India', 'Asia'),
	 ('Germany', 'Europe'),
	 ('Argentina', 'South America'),
	 ('Ghana', 'Africa'),
	 ('Japan', 'Asia'),
	 ('Namibia', 'Africa'),
	 ]
	 target_fields = ['name', 'continent']
	 mssql_hook.insert_rows(table='Country', rows=rows, target_fields=target_fields)
  • @dag.task(task_id="insert_mssql_task") decorates the function insert_mssql_hook to define it as an Airflow task.
  • mssql_hook = MsSqlHook(mssql_conn_id='airflow_mssql', schema='airflow') creates a connection to the MS SQL Server using the MsSqlHook.
  • rows = [...] defines a list of tuples with country names and their corresponding continents to be inserted into the Country table.
  • target_fields = ['name', 'continent'] specifies the column names in the Country table to match the data from rows.
  • mssql_hook.insert_rows(table='Country', rows=rows, target_fields=target_fields) inserts the rows into the Country table in the specified MS SQL Server database.
  • Here in the above code, @dag_task is used to insert the task ID for this job.
  • Here, under schema, enter the schema name that is used in the database.

Step 5: Fetching Records from MSSQL Table

  • If you want to fetch the data from your MSSQL table, use the code below.
get_all_countries = MsSqlOperator(
    task_id="get_all_countries",
    mssql_conn_id='airflow_mssql',
    sql=r"""SELECT * FROM Country;""",
)
  • get_all_countries = MsSqlOperator(...) creates a task to run a SQL query in Airflow using the MsSqlOperator.
  • task_id="get_all_countries" assigns a unique identifier to the task.
  • mssql_conn_id='airflow_mssql' specifies the connection ID to the MS SQL Server database.
  • sql=r"""SELECT * FROM Country;""" defines the SQL query to fetch all records from the Country table.
  • The task will execute this SQL query when the DAG is run to retrieve the data from the Country table in the MS SQL Server.
  • Here, all the data from the Country is fetched using the SQL query “SELECT * FROM Country;” wrapped into MsSqlOperator.

Step 6: Passing Parameters Into MsSqlOperator

  • MsSqlOperator provides a parameters attribute that allows dynamically injecting values into your SQL requests during runtime.
  • The code below will find the countries on the Asian continent.
get_countries_from_continent = MsSqlOperator(
    task_id="get_countries_from_continent",
    mssql_conn_id='airflow_mssql',
    sql=r"""SELECT * FROM Country where {{ params.column }}='{{ params.value }}';""",
    params={"column": "CONVERT(VARCHAR, continent)", "value": "Asia"},
)
  • get_countries_from_continent = MsSqlOperator(...) creates an Airflow task that executes an SQL query in MS SQL Server.
  • task_id="get_countries_from_continent" gives the task a unique identifier.
  • mssql_conn_id='airflow_mssql' specifies the connection to the MS SQL Server database.
  • sql=r"""SELECT * FROM Country where {{ params.column }}='{{ params.value }}';""" is a SQL query with placeholders for dynamic values.
  • params={"column": "CONVERT(VARCHAR, continent)", "value": "Asia"} defines the parameters, replacing the placeholders to filter countries where the continent is “Asia”.

That’s it! You have completed the Airflow SQL Server Integration, where you read about connecting Airflow to MSSQL Server using Python and performed a few common operations to read and write the data in the MSSQL table.

Benefits of Airflow SQL Server Integration

A few benefits of using Airflow SQL Server Integration are listed below:

  • Airflow SQL Server Integration allows users to automatically load query results from one Microsoft SQL Server to another Server.
  • Airflow SQL Server Integration makes it easier for companies to automate and orchestrate the Data Pipeline workflows.
  • Airflow SQL Server Integration can be used to schedule the automated generation of reports, training Machine Learning model, running jobs, etc, where it takes the required data from Microsoft SQL Server.

Conclusion 

In this article, You learnt about the steps to set up Airflow SQL Server Integration. You also read about the key benefits of using the Airflow SQL Server Integration and how it helps companies orchestrate the Data Pipeline workflows and automate the process. Users can use Airflow SQL Server Integration by creating the DAGs is an easy process as it allows users to define the exact path of the workflow using relationships.

Visit our Website to Explore Hevo

Companies need to analyze their business data stored in multiple data sources. The data needs to be loaded to the Data Warehouse to get a holistic view of the data. This is where Hevo steps in.

Want to take Hevo for a spin? Sign Up here for a 14-day free trial and experience the feature-rich Hevo suite firsthand. Check out our unbeatable pricing to help you choose the best plan.

Share your experience learning about Airflow SQL Server Integration in the comments section below!

References:

Aditya Jadon
Research Analyst, Hevo Data

Aditya Jadon is a data science enthusiast with a passion for decoding the complexities of data. He leverages his B. Tech degree, expertise in software architecture, and strong technical writing skills to craft informative and engaging content. Aditya has authored over 100 articles on data science, demonstrating his deep understanding of the field and his commitment to sharing knowledge with others.