Airflow lets users design workflows as DAGs (Directed Acyclic Graphs) of tasks. Its powerful User Interface makes visualizing pipelines in production, tracking progress, and resolving issues a breeze.

Writing an Airflow DAG as a static Python file is the simplest way to get started. However, manually writing DAGs isn’t always feasible, for example when you have hundreds or thousands of DAGs that all do the same thing but differ in just one parameter. Maybe you need a collection of DAGs to load tables but don’t want to update them manually every time the tables change. In these and other situations, Airflow Dynamic DAGs may make more sense. Since everything in Airflow is code, you can construct DAGs dynamically using just Python.

In this article, you will learn what Airflow Dynamic DAGs are and walk through the different ways to generate them with simple Python scripts.

What is Apache Airflow?


Apache Airflow is an Open-Source workflow authoring, scheduling, and monitoring application. It’s one of the most reliable systems Data Engineers use for orchestrating processes and Pipelines. You can quickly see the dependencies, progress, logs, and code of your Data Pipelines, trigger tasks, and check their success status.

Airflow allows users to create workflows as DAGs (Directed Acyclic Graphs) of tasks. The sophisticated User Interface of Airflow makes it simple to visualize pipelines in production, track progress, and resolve issues as needed. It links to a variety of Data Sources and can send an email or Slack notification when a task completes or fails. Since Airflow is distributed, scalable, and adaptable, it’s ideal for orchestrating complicated Business Logic.

Key Features of Apache Airflow

  • Robust Integrations: It will provide you with ready-to-use operators for working with Google Cloud Platform, Amazon AWS, Microsoft Azure, and other Cloud platforms.
  • Standard Python for Coding: Python allows you to construct a wide range of workflows, from simple to sophisticated, with complete flexibility.
  • Exceptional User Interface: You can keep track of and manage your processes. You’ll be able to see the status of completed and ongoing tasks.
Streamline Your Pipelines with Hevo

Setting up Airflow DAGs can be complex and time-consuming. Alternatively, use Hevo to avoid technical difficulties and simplify your data workflows. Experience the ease of automation with Hevo’s intuitive, no-code platform today!

Here’s why you should choose Hevo:

  1. Plug-and-play transformations
  2. Real-time data transfer
  3. 24/5 Live Support
Get Started with Hevo for Free

What is an Airflow DAG?

DAGs are defined as Python code in Airflow. Airflow executes all Python code in the dags_folder and loads any DAG objects that appear in globals(). The simplest approach is to write your DAG as a static Python file.
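
To make that loading behavior concrete, here is a minimal sketch (the file name, DAG ids, and task are illustrative, not from the original article): a DAG assigned at module level lands in globals() and is picked up by the Scheduler, while one that only exists inside a function body is ignored unless it is returned and placed into globals().

# dags/globals_example.py - illustrative file name and DAG ids
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def say_hello():
    print('Hello World')


# Assigned at module level, so it appears in globals() and Airflow loads it.
loaded_dag = DAG('statically_loaded_dag',
                 schedule_interval='@daily',
                 default_args={'owner': 'airflow',
                               'start_date': datetime(2021, 1, 1)})

with loaded_dag:
    PythonOperator(task_id='hello_world', python_callable=say_hello)


def build_dag():
    # This DAG only lives in the function's local scope; the Scheduler
    # never sees it unless the return value is added to globals().
    return DAG('never_loaded_dag',
               schedule_interval='@daily',
               default_args={'owner': 'airflow',
                             'start_date': datetime(2021, 1, 1)})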

What is the difference between a Static DAG & Dynamic DAG?

  • Manual DAG Creation Challenges: Writing DAGs manually can be impractical, especially when dealing with hundreds or thousands of similar DAGs that only differ by one parameter.
  • Dynamic DAG Use Cases: Dynamic DAGs are useful for situations like loading tables where you want to avoid manual updates whenever the tables change.
  • Dynamic Construction: Since Airflow is code-based, you can create DAGs dynamically using Python.
  • Automatic Loading: Airflow will automatically load any DAG object that is created in the globals() by the Python code located in the dags_folder.

How is a Dynamic DAG useful?

When you have DAGs that follow a similar pattern, dynamically constructing DAGs can be useful:

  • If you want to make the transition from a legacy system to Airflow as painless as possible.
  • If there is only one parameter that changes between DAGs.
  • If you have DAGs that are reliant on a source system’s changing structure.
  • If you want to establish DAG standards throughout your team or organization.

How to Set up Dynamic DAGs in Apache Airflow?

1) Creating Airflow Dynamic DAGs using the Single File Method 

One way to generate Airflow Dynamic DAGs is a single Python file that creates DAGs based on some input parameter(s), e.g. a list of APIs or tables. A popular use case for this is an ETL or ELT Pipeline with several Data Sources or Destinations, which requires a large number of DAGs that all follow the same pattern.

The single-file technique is implemented differently in the following examples depending on which input parameters are utilized to generate Airflow Dynamic DAGs.

A) Using the create_dag Method

To build Airflow Dynamic DAGs from a file, you must first define a Python function that generates DAGs based on an input parameter. In this scenario, you’ll use the create_dag function to define a DAG template. The code is pretty similar to what you’d use to create a single DAG, but it’s wrapped in a function that allows you to pass in custom arguments.

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag

The input parameters in this example might originate from any source that the Python script can access. You can then use a simple loop, range(1, 4), to produce these unique parameters and pass them to the global scope, allowing the Airflow Scheduler to recognize them as valid DAGs:

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag


# build a DAG for each number in range(1, 4)
for n in range(1, 4):
    dag_id = 'loop_hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2021, 1, 1)
                    }

    schedule = '@daily'
    dag_number = n

    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)

You can have a look at your Airflow Dashboard now:

[Screenshot: the dynamically generated DAGs in the Airflow UI]


B) Using Variables to Create a DAG

As previously stated, the input parameters do not need to be present in the DAG file itself. Another common way to generate DAGs is to set the input values in an Airflow Variable.

[Screenshot: the dag_number Variable set in the Airflow UI]

You can retrieve this value by importing the Variable class and using it as the upper bound of your range. The default_var is set to 3 so that the interpreter registers the file as a valid DAG file even if the Variable does not exist.

from airflow import DAG
from airflow.models import Variable
from airflow.operators.python_operator import PythonOperator
from datetime import datetime


def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag


number_of_dags = Variable.get('dag_number', default_var=3)
number_of_dags = int(number_of_dags)

for n in range(1, number_of_dags):
    dag_id = 'hello_world_{}'.format(str(n))

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2021, 1, 1)
                    }

    schedule = '@daily'
    dag_number = n
    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)
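
If the dag_number Variable does not exist yet, you can create it in the Airflow UI under Admin > Variables or, assuming an Airflow 2.x installation, with the CLI command airflow variables set dag_number 4; until then, the default_var above keeps the file parsing as a valid DAG file.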

After that, you can go to the Airflow UI and see all of the newly generated Airflow Dynamic DAGs.

[Screenshot: the generated hello_world DAGs in the Airflow UI]

C) Using Connections to Create a DAG

Airflow Connections are another way to supply input parameters for dynamically constructing DAGs. This is a suitable approach if each of your Dynamic DAGs connects to a Database or an API: you will be setting up those connections anyway, so generating the DAGs from them eliminates redundant work.

To implement this, you can pull the connections stored in your Airflow metadata Database by instantiating a Session and querying the Connection table. The query can also be filtered to return only connections that meet specific criteria.

from airflow import DAG, settings
from airflow.models import Connection
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

def create_dag(dag_id,
               schedule,
               dag_number,
               default_args):

    def hello_world_py(*args):
        print('Hello World')
        print('This is DAG: {}'.format(str(dag_number)))

    dag = DAG(dag_id,
              schedule_interval=schedule,
              default_args=default_args)

    with dag:
        t1 = PythonOperator(
            task_id='hello_world',
            python_callable=hello_world_py)

    return dag


session = settings.Session()
conns = (session.query(Connection.conn_id)
                .filter(Connection.conn_id.ilike('%MY_DATABASE_CONN%'))
                .all())

for conn in conns:
    dag_id = 'connection_hello_world_{}'.format(conn[0])

    default_args = {'owner': 'airflow',
                    'start_date': datetime(2018, 1, 1)
                    }

    schedule = '@daily'
    dag_number = conn[0]

    globals()[dag_id] = create_dag(dag_id,
                                  schedule,
                                  dag_number,
                                  default_args)

As before, you’re using the models library to bring in the Connection class (just as you did previously with the Variable class). You also use settings to access the Session() class, which allows you to query the metadata Database.

[Screenshot: Airflow UI showing one generated DAG per matching connection]

You can see that a unique Airflow Dynamic DAG has been created for each of the connections that match your filter.

2) Creating Airflow Dynamic DAGs using the Multiple File Method

Another way to construct Airflow Dynamic DAGs is to use code to generate complete Python files for each DAG. This method produces one Python file in your DAGs folder for each produced DAG.

A Python script that generates DAG files when run as part of a CI/CD Workflow is one way to implement this strategy in production. The DAGs are created and deployed to Airflow during the CI/CD build. Another DAG might be used to run the generation script on a regular basis.

You’ll see a simple example of how to use this method in the sections below.

A) Using the DAG Factory Tool to Create a DAG

dag-factory is a popular community tool for building Airflow Dynamic DAGs. It is a Python library that generates DAGs from YAML configuration files.

To use dag-factory, install the package in your Airflow environment and create your YAML configuration files. The DAGs can then be generated by calling the generate_dags() method of a DagFactory object in a Python script, as shown in the dag-factory README:

from airflow import DAG
import dagfactory

dag_factory = dagfactory.DagFactory("/path/to/dags/config_file.yml")

dag_factory.clean_dags(globals())
dag_factory.generate_dags(globals())
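
For reference, a dag-factory config file describes each DAG and its tasks declaratively in YAML. The snippet below is a minimal, hypothetical sketch (the DAG name, dates, task ids, and bash commands are placeholders) modeled on the format shown in the dag-factory README:

example_dag:
  default_args:
    owner: 'airflow'
    start_date: 2021-01-01
  schedule_interval: '@daily'
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo "hello from dag-factory"'
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: 'echo "all done"'
      dependencies: [task_1]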

B) Using JSON Configuration Files to Create a DAG

One way to implement the multiple-file method is to use a Python script that produces DAG files from a set of JSON configuration files. For the sake of simplicity, let’s assume that all DAGs have the same structure: each has a single task that executes a query using the PostgresOperator. This use case could be useful for a group of analysts that need to schedule SQL queries, where the DAG is usually the same but the query and schedule change.

Start by making a DAG ‘template’ file that defines the structure of the DAG. It looks like a standard DAG file, but placeholders such as dag_id, scheduletoreplace, and querytoreplace mark the spots where information will be filled in dynamically.

from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from datetime import datetime

default_args = {'owner': 'airflow',
                'start_date': datetime(2021, 1, 1)
                }

dag = DAG(dag_id,
            schedule_interval=scheduletoreplace,
            default_args=default_args,
            catchup=False)

with dag:
    t1 = PostgresOperator(
        task_id='postgres_query',
        postgres_conn_id='postgres_default',  # swap in your own connection id
        sql=querytoreplace)

After that, create a dag-config folder with one JSON config file per DAG. Each config file defines the parameters above: the DAG Id, the Schedule Interval, and the Query to be executed.

{
    "DagId": "dag_file_1",
    "Schedule": "'@daily'",
    "Query":"'SELECT * FROM table1;'"
}

Finally, a Python script needs to be developed that uses the template and config files to generate the DAG files. The script runs through all of the config files in the dag-config/ folder, creates a copy of the template in the dags/ folder, and overwrites the placeholders in that copy with the values from the config file.

import json
import os
import shutil
import fileinput

config_filepath = 'include/dag-config/'
dag_template_filename = 'include/dag-template.py'

for filename in os.listdir(config_filepath):
    with open(config_filepath + filename) as f:
        config = json.load(f)

    new_filename = 'dags/' + config['DagId'] + '.py'
    shutil.copyfile(dag_template_filename, new_filename)

    # str.replace returns a new string, so reassign the result to line
    for line in fileinput.input(new_filename, inplace=True):
        line = line.replace("dag_id", "'" + config['DagId'] + "'")
        line = line.replace("scheduletoreplace", config['Schedule'])
        line = line.replace("querytoreplace", config['Query'])
        print(line, end="")

This is obviously a simplistic starting example that only works if all of the Airflow Dynamic DAGs share the same structure. It could, however, be expanded to include dynamic inputs for tasks, dependencies, different operators, and so on. The resulting file structure looks like this:

dags/
├── dag_file_1.py
├── dag_file_2.py
include/
├── dag-template.py
├── generate-dag-files.py
└── dag-config
    ├── dag1-config.json
    └── dag2-config.json
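
With this layout in place, running the generation script (for example, python include/generate-dag-files.py from the project root, assuming that is where your include folder lives) writes one DAG file per config file into dags/, and Airflow then parses those files like any other static DAGs.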

Single File vs Multiple Files Methods: What are the Pros & Cons? 

The Single-File technique has the following advantages:

  • It’s simple and straightforward to implement.
  • It can accept input parameters from a variety of sources.
  • Adding DAGs is nearly instantaneous since only the input parameters need to change.

However, there are certain disadvantages:

  • Since a DAG file isn’t being created, your access to the code behind any given DAG is limited.
  • Since this approach requires a Python file in the dags folder, the generation code is executed every time the file is parsed. How often that happens is controlled by the min_file_process_interval parameter (see the Airflow docs). If the total number of DAGs is large, or if the code connects to an external system like a database, this can cause performance concerns. See the Scalability section below for further information.

The following are some of the advantages of the Multiple File Method:

  • It’s more scalable than the single-file approach: the DAG-generating code isn’t executed on every parse because the DAG files are created ahead of time rather than generated by code living in the dags folder.
  • You have full visibility into the DAG code, including via the Code button in the Airflow UI, because the DAG files are explicitly generated before being deployed to Airflow.

However, there are some disadvantages to this method:

  • It is more involved to set up.
  • Changes to existing DAGs or new DAGs will not appear until the generation script is run, which may require a deployment in some cases.

How to Manage Scalability with Apache Airflow DAGs?

When used at scale, Airflow Dynamic DAGs can raise performance concerns, influenced by the number of DAGs, your Airflow configuration, and your infrastructure. Key considerations include:

  • Any code in the dags_folder is executed at the min_file_process_interval or as quickly as the DAG file processor allows, whichever comes first. Techniques like the single-file method for dynamic DAG generation may therefore lead to performance issues at scale.
  • If you use a database to build DAGs (e.g., retrieving Variables from the metadata database), frequent querying may strain your database and incur costs from your data provider.
  • To mitigate performance issues, consider increasing the min_file_process_interval, as shown below. This option works well if your DAGs change infrequently and you can tolerate a delay between a change in the external source and the corresponding update of the generated DAGs.
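
In Airflow 2.x, this setting lives in the [scheduler] section of airflow.cfg as min_file_process_interval (in seconds) and can also be set through the AIRFLOW__SCHEDULER__MIN_FILE_PROCESS_INTERVAL environment variable; raising it from the default of 30 seconds to, say, 600 means the DAG-generating code is re-evaluated at most every ten minutes.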

Creating a Static DAG in Airflow 

The majority of Airflow users are accustomed to statically defining DAGs. You make a Python file, set up your DAG, and provide your tasks.
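
For comparison, here is a minimal sketch of a static DAG (the DAG id, schedule, and task callables are illustrative):

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime

default_args = {'owner': 'airflow',
                'start_date': datetime(2021, 1, 1)
                }

# The DAG and its tasks are written out explicitly in a single static file.
dag = DAG('my_static_dag',
          schedule_interval='@daily',
          default_args=default_args)

with dag:
    extract = PythonOperator(
        task_id='extract',
        python_callable=lambda: print('extracting'))

    load = PythonOperator(
        task_id='load',
        python_callable=lambda: print('loading'))

    # Define the task order: extract runs before load.
    extract >> load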


Conclusion

This article has given you an understanding of Apache Airflow, its key features, and the steps to set up Airflow Dynamic DAGs. You are now ready to start building your own DAGs. In case you want to integrate data into your desired Database/destination, Hevo Data is the right choice for you!

Hevo Data, a No-code Data Pipeline, provides you with a consistent and reliable solution to manage data transfer between a variety of sources, such as Apache Airflow, and destinations with a few clicks. With its strong integration with 150+ sources & BI tools, Hevo allows you to not only export data from your desired sources & load it to the destination of your choice, but also transform & enrich your data to make it analysis-ready, so that you can focus on your key business needs and perform insightful analysis using BI tools.

Want to take Hevo for a spin? Explore its 14-day free trial. You can also check out Hevo’s pricing to select the plan that best fits your requirements.

Share your experience of understanding the concept of Airflow Dynamic DAGs in the comment section below!

FAQs

1. How do you dynamically generate DAGs in Airflow?

To dynamically generate DAGs in Airflow, use Python code in your dags_folder to create DAG objects based on parameters or configurations. Ensure these DAG objects are placed in the globals() dictionary so that Airflow can automatically load them.

2. Can Airflow execute a DAG on different servers?

Yes, Airflow can execute a DAG on different servers by leveraging its distributed architecture, where tasks can run on multiple worker nodes. This is typically achieved with the CeleryExecutor (which distributes tasks through a message broker such as Redis or RabbitMQ) or the KubernetesExecutor, which schedules tasks across a cluster of servers.

3. How can a DAG create a dynamic workflow?

A DAG can create a dynamic workflow by generating tasks and dependencies at runtime based on external parameters or data inputs. This allows the workflow to adapt to varying conditions or requirements, such as processing different datasets or executing different operations based on dynamic configurations.

Harsh Varshney
Research Analyst, Hevo Data

Harsh is a data enthusiast with over 2.5 years of experience in research analysis and software development. He is passionate about translating complex technical concepts into clear and engaging content. His expertise in data integration and infrastructure shines through his 100+ published articles, helping data practitioners solve challenges related to data engineering.