Guide to Implement a Python DAG in Airflow Simplified 101

• February 14th, 2022


Airflow is a Task Automation tool. It helps organizations schedule their tasks so that they are executed at the right time, relieving employees of repetitive manual work. When using Airflow, you will often want to access it and trigger tasks from other tools. More broadly, Apache Airflow is used to schedule and orchestrate data pipelines or workflows.

In this article, you will gain information about Python DAG in Airflow. You will also gain a holistic understanding of Python, Apache Airflow, their key features, DAGs, Operators, Dependencies, and the steps for implementing a Python DAG in Airflow. Read along to find out in-depth information about Python DAG in Airflow.

What is Python?


Python is a versatile general-purpose Programming Language. Its gentle learning curve, coupled with its robustness, has made it one of the most popular Programming Languages today. It is the go-to choice of developers for Website and Software Development, Automation, Data Analysis, Data Visualization, and much more. Moreover, its straightforward syntax allows professionals such as Accountants and Scientists to utilize it for daily tasks. The Python Programming Language also serves as an integral tool in the field of Data Science for performing complex Statistical Calculations, creating Machine Learning Algorithms, and more.

The Python Programming Language is also renowned for its ability to generate a variety of Data Visualizations like Bar Charts, Column Charts, Pie Charts, and 3D Charts. Furthermore, it offers a rich set of libraries that facilitate advanced Machine Learning programs in a faster and simpler manner.

Key Features of Python


The following features are responsible for Python Programming Language’s popularity today:

  • Beginner Friendly: The Python Programming Language offers a hassle-free environment for developers. Its straightforward workflow is suitable for everyone and entry-level coders are drawn to it. Moreover, you can use and distribute its open-source codes for commercial purposes free of cost.
  • Robust Applications: Its simple, human-readable syntax makes Python projects faster to build compared to other Programming Languages. Furthermore, its versatile nature makes it the ideal choice for Web Development and Machine Learning projects.
  • Large Communities: Due to Python’s immense popularity, a huge active community of programmers is available online that contributes to this language’s modules and libraries. Moreover, this vast support community is ready to help in case you or any other coder gets stuck in a programming issue. You can easily get suggestions and solutions by posting your issue on these community pages.

You can understand more about the Python Programming Language by visiting here.

What is Airflow?


Airflow is a platform that enables its users to automate scripts for performing tasks. It comes with a scheduler that executes tasks on an array of workers while following a set of defined dependencies. Airflow also comes with rich command-line utilities that make it easy for its users to work with directed acyclic graphs (DAGs). The DAGs simplify the process of ordering and managing tasks for companies. 

Airflow also has a rich user interface that makes it easy to monitor progress, visualize pipelines running in production, and troubleshoot issues when necessary. 

Key Features of Airflow

  • Dynamic Integration: Airflow uses Python as its backend programming language to generate dynamic pipelines. Several Operators, Hooks, and Connectors are available to create DAGs and tie them together into workflows.
  • Extensible: Airflow is an open-source platform, and so it allows users to define their own custom operators, executors, and hooks. You can also extend the libraries so that they fit the level of abstraction that suits your environment.
  • Elegant: Airflow pipelines are lean and explicit, and parameterizing your scripts is a straightforward process thanks to the Jinja templating engine built into its core.
  • Scalable: Airflow is designed to scale up to infinity. You can define as many dependent workflows as you want. Airflow creates a message queue to orchestrate an arbitrary number of workers. 

Airflow can easily integrate with all the modern systems for orchestration. Some of these modern systems are as follows:

  • Google Cloud Platform
  • Amazon Web Services
  • Microsoft Azure
  • Apache Druid
  • Snowflake
  • Hadoop ecosystem
  • Apache Spark
  • PostgreSQL, SQL Server
  • Google Drive
  • JIRA
  • Slack
  • Databricks

You can find the complete list here.


What are DAGs?

DAG stands for Directed Acyclic Graph. The core concept of Airflow is a DAG, which collects Tasks and organizes them with dependencies and relationships to specify how they should run.

In simple terms, it is a graph with nodes, directed edges, and no cycles.

Python DAG Airflow: Valid DAG

The above image is a valid DAG.

Python DAG Airflow: Invalid DAG

The above image, however, is an invalid DAG, because it contains a cycle: Node A depends on Node C, Node C depends on Node B, and Node B in turn depends on Node A. This invalid DAG will not run at all. The first DAG, by contrast, has no cycles.
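This "no cycles" rule is exactly what makes a graph schedulable: tasks can only be put in an execution order if a topological order exists. The sketch below (plain Python using the standard library's graphlib module, not Airflow code; `is_valid_dag` is a hypothetical helper) checks whether a dependency graph is a valid DAG:

```python
from graphlib import TopologicalSorter, CycleError

def is_valid_dag(edges):
    """Return True if the dependency graph has no cycles.

    `edges` maps each node to the set of nodes it depends on.
    """
    try:
        # static_order() raises CycleError if the graph is cyclic
        list(TopologicalSorter(edges).static_order())
        return True
    except CycleError:
        return False

# A depends on C, C depends on B, B depends on A -> a cycle
print(is_valid_dag({"A": {"C"}, "C": {"B"}, "B": {"A"}}))  # False
# The same nodes without the back-edge form a valid DAG
print(is_valid_dag({"A": {"C"}, "C": {"B"}, "B": set()}))  # True
```

This is essentially the check Airflow performs when it parses your DAG file: a cyclic graph is rejected before anything is scheduled.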

In Apache Airflow, a DAG is similar to a Data Pipeline. As a result, whenever you see the term “DAG,” it refers to a “Data Pipeline.” Finally, when a DAG is triggered, a DAGRun is created. A DAGRun is an instance of your DAG with an execution date in Airflow.

Here’s a basic example of DAG:

Python DAG Airflow: Basic example of DAG

It defines four Tasks – A, B, C, and D. It also specifies the order in which they must be executed, as well as which tasks are dependent on which others. It will also specify how frequently the DAG should be run, such as “every 2 hours starting tomorrow” or “every day since May 15th, 2022.”

The DAG is not concerned about what is going on inside the tasks. Rather, it is truly concerned with how they are executed – the order in which they are run, how many times they are retried, whether they have timeouts, and so on.

A DAG in Airflow is simply a Python script that contains a set of tasks and their dependencies. The operator of each task determines what the task does. Using PythonOperator to define a task, for example, means that the task will consist of running Python code.

What is an Airflow Operator?

In an Airflow DAG, Nodes are Operators. In other words, a Task in your DAG is an Operator. An Operator is a class encapsulating the logic of what you want to achieve. For example, if you want to execute a Python function, you will use the PythonOperator.

When an operator is triggered, it becomes a task, or more specifically, a task instance. Here is an example with two Operators:

from airflow.operators.dummy import DummyOperator
from airflow.operators.bash import BashOperator

# The DummyOperator is a task that does nothing
accurate = DummyOperator(
    task_id='accurate'
)

# The BashOperator is a task that executes a bash command
commands = BashOperator(
    task_id='commands',
    bash_command='sleep 5'
)

Both Operators in the preceding code snippet have some arguments. The task_id is the first one. The task_id is the operator’s unique identifier in the DAG. Each Operator must have a unique task_id. The other arguments to fill in are determined by the operator.

What are Dependencies in Airflow?

A DAG in Airflow has directed edges. Those directed edges are the Dependencies between all of your operators/tasks in an Airflow DAG. Essentially, if you want to say “Task A is executed before Task B,” then the corresponding dependency can be illustrated as shown in the example below.

task_a >> task_b
# Or
task_b << task_a

The >> and << are the "right bitshift" and "left bitshift" operators, read here as "set downstream task" and "set upstream task," respectively. The first line of the example says that task_b is a downstream task of task_a; the second line says that task_a is an upstream task of task_b.
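Under the hood, Airflow enables this syntax by overloading Python's bitshift operators on its task classes, so that a >> b is equivalent to a.set_downstream(b). Here is a simplified, self-contained model of the idea (a toy Task class for illustration, not the real Airflow implementation):

```python
class Task:
    """Toy task that mimics Airflow's dependency operators."""
    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream = []

    def set_downstream(self, other):
        self.downstream.append(other)

    def __rshift__(self, other):   # task_a >> task_b
        self.set_downstream(other)
        return other               # returning `other` allows chaining a >> b >> c

    def __lshift__(self, other):   # task_a << task_b
        other.set_downstream(self)
        return other

task_a, task_b = Task("task_a"), Task("task_b")
task_a >> task_b                   # task_b runs after task_a
print([t.task_id for t in task_a.downstream])  # ['task_b']
```

Returning the right-hand task from each operator is what lets you chain several dependencies in a single readable line, which Airflow's real operators also support.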

Implementing your Python DAG in Airflow

There are 4 steps to follow to create a data pipeline.

Let’s take the following picture of the DAG into reference and code the Python DAG.

Python DAG Airflow: DAG Example Chart
Data pipeline

Step 1: Make the Imports

The first step is to make the necessary imports. To create a Python DAG in Airflow, you must always import the DAG class. After the DAG class come the Operator imports: you must import the corresponding Operator for each one you want to use.

To execute a Python function, for example, you must import the PythonOperator. If you want to run a bash command, you must first import the BashOperator. Finally, because your DAG requires a start date, the datetime class is usually the last to be imported.

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime

Step 2: Create the Airflow Python DAG object

The second step is to create the Airflow Python DAG object after the imports have been completed.

A DAG object must have two parameters:

  • dag_id
  • start_date

The dag_id is the DAG’s unique identifier across all DAGs. Each DAG must have its own dag_id. The start_date specifies the date at which your DAG will start being scheduled.

Note: If the start_date is set in the past, the scheduler will try to backfill all the non-triggered DAG Runs between the start_date and the current date. For example, if your start_date is defined with a date 3 years ago, you might end up with many DAG Runs running at the same time.
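The size of such a backfill is easy to estimate: with a daily schedule, roughly one DAG Run is created per day between the start_date and today. A quick back-of-the-envelope check (the dates here are arbitrary examples):

```python
from datetime import date

start_date = date(2019, 5, 15)   # hypothetical start_date, 3 years back
today = date(2022, 5, 15)

# With schedule_interval='@daily', one DAG Run is created per day,
# so a 3-year-old start_date queues about a thousand catch-up runs
missed_runs = (today - start_date).days
print(missed_runs)  # 1096
```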

In addition to those two arguments, two more are typically specified: catchup and schedule_interval.

The schedule_interval argument specifies the time interval at which your DAG is triggered. Every 20 minutes, every hour, every day, every month, and so on.

There are two ways to define the schedule_interval:

  • Either with a CRON expression (most used option), or
  • With a timedelta object
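As an illustration, a DAG triggered every 20 minutes can be expressed either way (the preset strings shown are Airflow's documented expansions; the rest is standard Python):

```python
from datetime import timedelta

# Option 1: a CRON expression (runs at minutes 0, 20 and 40 of every hour)
schedule_interval = "*/20 * * * *"

# Option 2: a timedelta object (runs 20 minutes after each previous run)
schedule_interval = timedelta(minutes=20)

# Airflow also ships presets that expand to CRON expressions, e.g.:
presets = {"@hourly": "0 * * * *", "@daily": "0 0 * * *", "@weekly": "0 0 * * 0"}
print(presets["@daily"])  # 0 0 * * *
```

Note the subtle difference between the two forms: a CRON expression pins runs to wall-clock times, whereas a timedelta schedules each run relative to the previous one.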

Secondly, the catchup argument prevents your DAG from automatically backfilling non-triggered DAG Runs between the start date of your DAG and the current date. If you don’t want multiple DAG runs running at the same time, it’s usually a good idea to set it to False.

with DAG("my_dag",                # Dag id
    start_date=datetime(2021, 1, 1),  # Start date: the 1st of January 2021
    schedule_interval='@daily',  # Cron expression; '@daily' is an Airflow preset meaning once every day
    catchup=False                # Catchup
) as dag:

It’s worth noting that we use the “with” statement to create a DAG instance. Because the DAG object acts as a context manager, it allows you to attach tasks to the DAG more conveniently.
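To see why the "with" statement is convenient, here is a stripped-down, hypothetical model of the mechanism (MiniDag and MiniTask are illustrative toys, not Airflow's actual classes): every task created inside the block attaches itself to the currently open DAG, which is why real operators defined inside the block need no explicit dag=dag argument.

```python
class MiniDag:
    """Toy DAG that registers tasks created inside its `with` block."""
    _current = None

    def __init__(self, dag_id):
        self.dag_id = dag_id
        self.tasks = []

    def __enter__(self):
        MiniDag._current = self
        return self

    def __exit__(self, *exc):
        MiniDag._current = None

class MiniTask:
    def __init__(self, task_id):
        self.task_id = task_id
        # Attach to whichever DAG is currently open as a context manager
        if MiniDag._current is not None:
            MiniDag._current.tasks.append(self)

with MiniDag("my_dag") as dag:
    MiniTask("extract")
    MiniTask("load")

print([t.task_id for t in dag.tasks])  # ['extract', 'load']
```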

Step 3: Add the Tasks

Once you have made the imports and created your Python DAG object, you can begin to add your tasks (remember, a task is an Operator). Based on the DAG above, you have to add 6 operators.

The different tasks are as follows:

A) Training model tasks

The PythonOperator is used to implement the training of models A, B, and C. Because real machine learning models are too complicated to train here, each task will instead return a random accuracy. This accuracy will be generated by the Python function _training_model.

from random import randint # Import to generate random numbers

def _training_model():
    return randint(1, 10) # Return an integer between 1 and 10

with DAG(...) as dag:
    # Tasks are implemented under the dag object
    training_model_A = PythonOperator(
        task_id="training_model_A",
        python_callable=_training_model
    )
    training_model_B = PythonOperator(
        task_id="training_model_B",
        python_callable=_training_model
    )
    training_model_C = PythonOperator(
        task_id="training_model_C",
        python_callable=_training_model
    )

The three tasks in the preceding code are very similar. The only distinction is in the task ids. As a result, because DAGs are written in Python, you can take advantage of this and generate tasks dynamically, as shown in the following example.

# Note: with op_kwargs, the callable must accept a `model` argument,
# i.e. def _training_model(model), as in the full DAG at the end of this article
training_model_tasks = [
    PythonOperator(
        task_id=f"training_model_{model_id}",
        python_callable=_training_model,
        op_kwargs={
            "model": model_id
        }
    ) for model_id in ['A', 'B', 'C']
]

By using a list comprehension, you can generate the 3 tasks dynamically, in a way that is cleaner and easier to maintain.

For better understanding of the PythonOperator, you can visit here.

B) Choosing Best Model

“Choosing Best ML” is the next task. This task decides whether the pipeline proceeds to the “accurate” or the “inaccurate” task based on the best accuracy, so the BranchPythonOperator is the ideal candidate for it. It enables you to carry out one task or another based on a condition, a value, or a criterion.

The BranchPythonOperator is one of the most commonly used Operators. To implement it, you can refer to the following code.

def _choosing_best_model(ti):
    accuracies = ti.xcom_pull(task_ids=[
        'training_model_A',
        'training_model_B',
        'training_model_C'
    ])
    if max(accuracies) > 8:
        return 'accurate'
    return 'inaccurate'

with DAG(...) as dag:
    choosing_best_model = BranchPythonOperator(
        task_id="choosing_best_model",
        python_callable=_choosing_best_model
    )

The BranchPythonOperator first runs a Python function, in this case _choosing_best_model. This function must return the task id of the next task to execute. As the return keywords indicate, your Python DAG will continue with either “accurate” or “inaccurate.”

What is xcom_pull?

When using Airflow, you need to use XCOMs to share data between tasks. XCOM is an acronym that stands for Cross-Communication Messages. It is a mechanism that allows small data to be exchanged between DAG tasks. An XCOM is an object that contains a key that serves as an identifier and a value that corresponds to the value you want to share.

By returning the accuracy from the Python function _training_model_X, you create an XCom holding that accuracy, and xcom_pull in _choosing_best_model then retrieves those XComs. You specify the task ids of the three training tasks because you want the accuracy of each training_model task.
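Conceptually, an XCom behaves like a small key-value store indexed by task id: returning a value from a PythonOperator's callable pushes it, and xcom_pull with a list of task ids returns the matching values in order. Here is a simplified in-memory model of that behavior (a toy MiniXCom class, not Airflow's real backend, which persists XComs in the metadata database):

```python
class MiniXCom:
    """Toy XCom store: one return value per task id."""
    def __init__(self):
        self._store = {}

    def push(self, task_id, value):
        # Mimics a PythonOperator callable returning a value
        self._store[task_id] = value

    def pull(self, task_ids):
        # Mimics ti.xcom_pull(task_ids=[...]): values in the order requested
        return [self._store[t] for t in task_ids]

xcom = MiniXCom()
xcom.push("training_model_A", 6)
xcom.push("training_model_B", 9)
xcom.push("training_model_C", 3)

accuracies = xcom.pull(["training_model_A", "training_model_B", "training_model_C"])
print(max(accuracies))  # 9, so _choosing_best_model would return 'accurate'
```

Keep in mind that real XComs are meant for small pieces of data such as these accuracies, not for passing large datasets between tasks.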

C) Assigning “Accurate” or “Inaccurate”

The final two tasks to complete are “accurate” and “inaccurate.” To implement them, use the BashOperator and run a simple bash command that prints “accurate” or “inaccurate” to the standard output.

accurate = BashOperator(
    task_id="accurate",
    bash_command="echo 'accurate'"
)

inaccurate = BashOperator(
    task_id="inaccurate",
    bash_command="echo 'inaccurate'"
)

Bash commands are executed using the BashOperator.

Step 4: Defining Dependencies

After you’ve defined all of the tasks, the final step is to put the glue between them, in other words, to define their Dependencies. This is accomplished using the bitshift operators.

It’s really simple in this case because you want to execute one task after the other.

with DAG(...) as dag:
    training_model_tasks >> choosing_best_model >> [accurate, inaccurate]

training_model_tasks are executed first; after all of them are completed, choosing_best_model is executed, and finally, either “accurate” or “inaccurate” runs.

Use a list with [ ] whenever you have multiple tasks that should be on the same level, in the same group, and can be executed at the same time.

The Final Python DAG in Airflow

The full code of the Python DAG in Airflow is as follows:

from airflow import DAG
from airflow.operators.python import PythonOperator, BranchPythonOperator
from airflow.operators.bash import BashOperator
from datetime import datetime
from random import randint

def _choosing_best_model(ti):
    accuracies = ti.xcom_pull(task_ids=[
        'training_model_A',
        'training_model_B',
        'training_model_C'
    ])
    if max(accuracies) > 8:
        return 'accurate'
    return 'inaccurate'

def _training_model(model):
    return randint(1, 10)

with DAG("my_dag",
         start_date=datetime(2021, 1, 1),
         schedule_interval='@daily',
         catchup=False) as dag:

    training_model_tasks = [
        PythonOperator(
            task_id=f"training_model_{model_id}",
            python_callable=_training_model,
            op_kwargs={
                "model": model_id
            }
        ) for model_id in ['A', 'B', 'C']
    ]

    choosing_best_model = BranchPythonOperator(
        task_id="choosing_best_model",
        python_callable=_choosing_best_model
    )

    accurate = BashOperator(
        task_id="accurate",
        bash_command="echo 'accurate'"
    )

    inaccurate = BashOperator(
        task_id="inaccurate",
        bash_command="echo 'inaccurate'"
    )

    training_model_tasks >> choosing_best_model >> [accurate, inaccurate]

If you want to test it, copy the code into a file called my_first_dag.py and save it in your Airflow dags/ folder. After that, trigger it from the UI and you should get the following output:

Python DAG Airflow: Graph View

For further information about the example of Python DAG in Airflow, you can visit here.

Conclusion

In this article, you have learned about Python DAGs in Airflow. It covered Python, Apache Airflow, their key features, DAGs, Operators, Dependencies, and the steps for implementing a Python DAG in Airflow in detail. For further information on Airflow ETL, Airflow Databricks Integration, and the Airflow REST API, you can visit the following links.


Share your experience of implementing a Python DAG in Airflow in the comment section below! We would love to hear your thoughts.
