How to Generate Airflow Dynamic DAGs: Ultimate How-to Guide 101

Apache Airflow • February 8th, 2022


Users can design workflows as DAGs (Directed Acyclic Graphs) of jobs with Airflow. Airflow’s powerful User Interface makes visualizing pipelines in production, tracking progress, and resolving issues a breeze.

Writing an Airflow DAG as a static Python file is the simplest way to do it. However, manually writing DAGs isn’t always practical when you have hundreds or thousands of DAGs that all do the same thing but differ in just one parameter. Maybe you need a collection of DAGs to load tables but don’t want to update them manually every time the tables change. In these and other situations, Airflow Dynamic DAGs may make more sense. Since everything in Airflow is code, you can construct DAGs dynamically using just Python.

In this article, you will learn about Airflow Dynamic DAGs, why they are useful, and the different ways you can generate them with simple Python scripts.

Table of Contents

What is Apache Airflow?


Apache Airflow is an Open-Source workflow authoring, scheduling, and monitoring application. It’s one of the most reliable systems for orchestrating processes or Pipelines that Data Engineers employ. You can quickly see the dependencies, progress, logs, code, trigger tasks, and success status of your Data Pipelines.

Airflow allows users to create workflows as DAGs (Directed Acyclic Graphs) of jobs. The sophisticated User Interface of Airflow makes it simple to visualize pipelines in production, track progress, and resolve issues as needed. It links to a variety of Data Sources and can send an email or Slack notice when a task is completed or failed. Since Airflow is distributed, scalable, and adaptable, it’s ideal for orchestrating complicated Business Logic.

Key Features of Apache Airflow

  • Robust Integrations: It will provide you with ready-to-use operators for working with Google Cloud Platform, Amazon AWS, Microsoft Azure, and other Cloud platforms.
  • Standard Python for Coding: Python allows you to construct a wide range of workflows, from simple to sophisticated, with complete flexibility.
  • Exceptional User Interface: You can keep track of and manage your processes. You’ll be able to see the status of completed and ongoing tasks.

To get further information on Apache Airflow, check out the official website.

What is an Airflow DAG?

DAGs are defined as Python code in Airflow. All Python code in the dags_folder is executed, and any DAG objects that appear in globals() are loaded. The simplest approach to making a DAG is to write it in Python as a static file.

What is the difference between a Static DAG & Dynamic DAG?

Manually writing DAGs isn’t always feasible, though. Perhaps you have hundreds or thousands of DAGs that all do the same thing but differ in just one parameter. Maybe you need a collection of DAGs to load tables but don’t want to update them manually every time the tables change. In these and other situations, Airflow Dynamic DAGs may make more sense.

Since everything in Airflow is code, you can construct DAGs dynamically using just Python. Airflow will load any DAG object created in globals() by Python code that lives in the dags_folder.

How is a Dynamic DAG useful?

When you have DAGs that follow a similar pattern, dynamically constructing DAGs can be useful:

  • If you want to make the transition from a legacy system to Airflow as painless as possible.
  • If there is only one parameter that changes between DAGs.
  • If you have DAGs that are reliant on a source system’s changing structure.
  • If you want to establish DAG standards throughout your team or organization.

Simplify ETL Using Hevo’s No-code Data Pipeline

Hevo Data is a No-code Data Pipeline that offers a fully managed solution to set up Data Integration for 100+ Data Sources (including 40+ Free sources) and will let you directly load data from sources to a Data Warehouse or the Destination of your choice. It will automate your data flow in minutes without writing any line of code. Its fault-tolerant architecture makes sure that your data is secure and consistent. Hevo provides you with a truly efficient and fully automated solution to manage data in real-time and always have analysis-ready data. 

Get Started with Hevo for Free

Let’s look at some of the salient features of Hevo:

  • Fully Managed: It requires no management and maintenance as Hevo is a fully automated platform.
  • Data Transformation: It provides a simple interface to perfect, modify, and enrich the data you want to transfer. 
  • Real-Time: Hevo offers real-time data migration. So, your data is always ready for analysis.
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Connectors: Hevo supports 100+ Integrations to SaaS platforms, FTP/SFTP, Files, Databases, BI tools, and Native REST API & Webhooks Connectors. It supports various destinations including Google BigQuery, Amazon Redshift, Snowflake, and Firebolt Data Warehouses; Amazon S3 Data Lakes; Databricks; and MySQL, SQL Server, TokuDB, MongoDB, and PostgreSQL Databases, to name a few.
  • Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  • Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  • Live Monitoring: Advanced monitoring gives you a one-stop view to watch all the activities that occur within Data Pipelines.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
Sign up here for a 14-Day Free Trial!

How to Set up Dynamic DAGs in Apache Airflow?

1) Creating Airflow Dynamic DAGs using the Single File Method 

A single Python file that generates DAGs based on some input parameter(s) (e.g. a list of APIs or tables) is one way of generating Airflow Dynamic DAGs. An ETL or ELT Pipeline with several Data Sources or Destinations is a popular use case for this, since it calls for a large number of DAGs that all follow the same pattern.

The single-file technique is implemented differently in the following examples depending on which input parameters are utilized to generate Airflow Dynamic DAGs.

A) Using the create_dag Method

To build Airflow Dynamic DAGs from a single file, you must first define a Python function that generates DAGs based on an input parameter. In this scenario, you’ll use the create_dag function to define a DAG template. The code is pretty similar to what you’d use to create a single DAG, but it’s wrapped in a function that allows you to pass in custom arguments.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            # dag_number is available to hello_world_py via the closure;
            # PythonOperator does not accept arbitrary keyword arguments
            python_callable=hello_world_py)

    return dag

The input parameters in this example might originate from any source that the Python script can access. You can then use a simple loop (range(1, 4)) to produce these unique parameters and pass them to the global scope, allowing the Airflow Scheduler to recognize them as valid DAGs:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag


# build a DAG for each number in range(1, 4)
for n in range(1, 4):
    dag_id = 'loop_hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2021, 1, 1)
                    }

    schedule = '@daily'
    dag_number = n

    globals()[dag_id] = create_dag(dag_id,
                                   schedule,
                                   dag_number,
                                   default_args)

You can have a look at your Airflow Dashboard now:


B) Using Variables to Create a DAG

As previously stated, the input parameters don’t need to live in the DAG file itself. Another typical way to generate DAGs is to read the value from an Airflow Variable.


You can fetch this value by importing the Variable class and passing it into the range. The default_var is set to 3 so that the interpreter still registers this file as valid even if the Variable doesn’t exist yet.

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag


number_of_dags = Variable.get('dag_number', default_var=3)
number_of_dags = int(number_of_dags)

for n in range(1, number_of_dags):
    dag_id = 'hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2021, 1, 1)
                    }

    schedule = '@daily'
    dag_number = n
    globals()[dag_id] = create_dag(dag_id,
                                   schedule,
                                   dag_number,
                                   default_args)

After that, you can go to the Airflow UI and see all of the newly generated Airflow Dynamic DAGs.


C) Using Connections to Create a DAG

Airflow Connections are another way to establish input parameters for dynamically constructing DAGs. This is a suitable approach if each of your Airflow Dynamic DAGs connects to a Database or an API: you’ll be setting up those Connections anyway, so creating the DAGs from them eliminates needless duplication.

You can pull the connections you have in your Airflow metadata Database by instantiating the “Session” and querying the “Connection” table to implement this function. This query can also be filtered to only return connections that meet specified criteria.

from airflow import DAG, settings
from airflow.models import Connection
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag


session = settings.Session()
conns = (session.query(Connection.conn_id)
                .filter(Connection.conn_id.ilike('%MY_DATABASE_CONN%'))
                .all())

for conn in conns:
    dag_id = 'connection_hello_world_{}'.format(conn[0])

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }

    schedule = '@daily'
    dag_number = conn[0]

    globals()[dag_id] = create_dag(dag_id,
                                   schedule,
                                   dag_number,
                                   default_args)

As you did previously with the Variable class, you’re using the models library to bring in the Connection class. You can also use settings to access the Session() class, which allows you to query the current Database Session.


You can see that a unique Airflow Dynamic DAG has been created for each of the Connections that match your filter.

2) Creating Airflow Dynamic DAG using the Multiple File Method

Another way to construct Airflow Dynamic DAGs is to use code to generate complete Python files for each DAG. This method produces one Python file in your DAGs folder for each produced DAG.

A Python script that generates DAG files when run as part of a CI/CD Workflow is one way to implement this strategy in production. The DAGs are created and deployed to Airflow during the CI/CD build. Another DAG might be used to run the generation script on a regular basis.

You’ll see a simple example of how to use this method in the section below.

A) Using the DAG Factory Tool to Create a DAG

dag-factory is a significant community tool for building Airflow Dynamic DAGs: a Python library that generates DAGs from YAML configuration files.

You can use dag-factory to generate DAGs by installing the package in your Airflow environment and creating YAML configuration files. The DAGs can then be created using the generate_dags() method in a Python script, as shown in the dag-factory README:

from airflow import DAG
import dagfactory

dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml")

dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())
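For reference, a dag-factory YAML config for a single DAG looks roughly like the sketch below. The key names follow the dag-factory README, but the DAG name, task names, and bash commands here are made-up examples, and the operator path shown is the Airflow 1.x-style one used elsewhere in this article; check the dag-factory documentation for the options supported by your version.

```yaml
example_dag:
  default_args:
    owner: 'airflow'
    start_date: 2021-01-01
  schedule_interval: '@daily'
  description: 'an example DAG generated by dag-factory'
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo hello'
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo world'
      dependencies: [task_1]
```

Each top-level key becomes a DAG, and each entry under tasks becomes a task, with dependencies expressed declaratively rather than in Python.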

B) Using JSON Configuration Files to Create a DAG

Using a Python script to produce DAG files based on a series of JSON configuration files is one way to implement the multiple-file method. For the sake of simplicity, let’s assume that all DAGs have the same structure: each has a single task that executes a query using the PostgresOperator. This use case could be useful for a group of analysts who need to schedule SQL queries, where the DAG is usually the same but the query and schedule change.

Make a DAG ‘template’ file that defines the structure of the DAG. Placeholder names such as dag_id, scheduletoreplace, and querytoreplace stand in wherever information will be dynamically generated, so the file otherwise looks like a standard DAG file.

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime

default_args = {'owner': 'airflow',
                'start_date': datetime(2021, 1, 1)
                }

dag = DAG(dag_id,
          schedule_interval=scheduletoreplace,
          default_args=default_args,
          catchup=False)

with dag:
    t1 = PostgresOperator(
        task_id='postgres_query',
        # set this to your Postgres Connection id, or template it
        # from the config file like the other placeholders
        postgres_conn_id=connection_id,
        sql=querytoreplace)

After that, you can make a dag-config folder with a JSON config file for each DAG. The above-mentioned parameters, as well as the DAG Id, Schedule Interval, and Query to be conducted, should all be defined in the config file.

{
    "DagId": "dag_file_1",
    "Schedule": "'@daily'",
    "Query":"'SELECT * FROM table1;'"
}

Finally, a Python script needs to be developed that uses the template and config files to generate DAG files. The script runs through all of the config files in the dag-config/ folder, creates a copy of the template in the dags/ folder, and overwrites the placeholders in that file with the values from the config file.

import json
import os
import shutil
import fileinput

config_filepath = 'include/dag-config/'
dag_template_filename = 'include/dag-template.py'

for filename in os.listdir(config_filepath):
    with open(config_filepath + filename) as f:
        config = json.load(f)

    new_filename = 'dags/' + config['DagId'] + '.py'
    shutil.copyfile(dag_template_filename, new_filename)

    for line in fileinput.input(new_filename, inplace=True):
        # str.replace returns a new string, so reassign the result
        line = line.replace("dag_id", "'" + config['DagId'] + "'")
        line = line.replace("scheduletoreplace", config['Schedule'])
        line = line.replace("querytoreplace", config['Query'])
        print(line, end="")

This is obviously a simplistic starting example that only works if all of the Airflow Dynamic DAGs have the same structure. It could, however, be expanded to include dynamic inputs for tasks, dependencies, different operators, and so on. After running the generation script, the project structure looks like this:

dags/
├── dag_file_1.py
├── dag_file_2.py
include/
├── dag-template.py
├── generate-dag-files.py
└── dag-config
    ├── dag1-config.json
    └── dag2-config.json

Single File vs Multiple Files Methods: What are the Pros & Cons? 

The Single-File technique has the following advantages:

  • It’s simple and straightforward to implement.
  • It can accept input parameters from a variety of sources.
  • Adding DAGs is nearly instantaneous, because only the input parameters need to be changed.

However, there are certain disadvantages:

  • Since a DAG file isn’t being created, your access to the code behind any given DAG is limited.
  • Since this approach requires a Python file in the dags folder, the generating code is executed every time the DAG file is parsed. How often that happens is controlled by the min_file_process_interval parameter (see the Airflow docs). If the total number of DAGs is enormous, or if the code connects to an external system like a Database, this can cause performance concerns. See the Scalability section below for further information.

The following are some of the advantages of the Multiple File Method:

  • It’s scalable compared to single-file approaches. The DAG generating code isn’t executed on every scheduler heartbeat because the DAG files aren’t generated by parsing code in the dags folder.
  • You have full visibility into the DAG code, including via the Code button in the Airflow UI, because DAG files are expressly produced before being sent to Airflow.

However, there are some disadvantages to this method:

  • It is difficult to set up.
  • Changes to DAGs or new DAGs will not be formed until the script is run, which may necessitate a deployment in some cases.

How to Manage Scalability with Apache Airflow DAGs?

When used at scale, Airflow Dynamic DAGs might pose performance concerns. The overall amount of DAGs, Airflow configuration, and Infrastructure all influence whether or not a given technique may cause issues. Here are a few things to keep an eye out for:

  • Any code in the dags_folder will be executed every min_file_process_interval, or as quickly as the DAG file processor can manage, whichever comes first. Methods like the single-file method, which dynamically generate DAGs on every parse, are more prone to performance concerns at scale.
  • If you’re using a Database to build your DAGs (for example, taking Variables from the metadata database), you’ll be querying frequently. Be aware of your database’s capabilities to manage such frequent connections, as well as any expenses you might incur from your data supplier for each request.
  • You can increase min_file_process_interval to a much higher value to help with any performance difficulties. Consider this option if you know your DAGs don’t change often and you’re willing to wait longer for the Airflow Dynamic DAGs to pick up changes in the external source that generates them.
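As a sketch, raising the interval could look like the following in airflow.cfg (the same setting can also be supplied via the AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL environment variable); the value of 300 seconds is just an illustrative choice:

```ini
[scheduler]
# re-parse DAG files at most every 300 seconds instead of the
# default 30, trading freshness for lower scheduler load
min_file_process_interval = 300
```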

Creating a Static DAG in Airflow 

The majority of Airflow users are accustomed to statically defining DAGs. You make a Python file, set up your DAG, and provide your tasks.


Conclusion

This article has given you an understanding of Apache Airflow and its key features, along with the different methods for setting up Airflow Dynamic DAGs. You are now ready to start building your own DAGs. In case you want to integrate data into your desired Database/destination, then Hevo Data is the right choice for you!

Visit our Website to Explore Hevo

Hevo Data, a No-code Data Pipeline provides you with a consistent and reliable solution to manage Data transfer between a variety of sources such as Apache Airflow and destinations with a few clicks. Hevo with its strong integration with 100+ sources & BI tools allows you to not only export Data from your desired Data sources & load it to the destination of your choice, but also transform & enrich your Data to make it analysis-ready so that you can focus on your key business needs and perform insightful analysis using BI tools. 

Want to take Hevo for a spin? Sign Up for a 14-day free trial. You may also have a look at our pricing, which will assist you in selecting the best plan for your requirements.

Share your experience of understanding the concept of Airflow Dynamic DAGs in the comment section below!
