In this article, you’ll learn about Testing Airflow DAGs. This guide covers several types of tests we recommend to anyone running Apache Airflow in production, such as DAG validation testing, unit testing, and data and pipeline integrity testing.
One of Apache Airflow’s guiding principles is that your DAGs are defined as Python code. Because data pipelines can be treated like any other piece of code, they can be integrated into a standard Software Development Lifecycle using source control, CI/CD, and Automated Testing.
Although DAGs are entirely Python code, effectively testing them necessitates taking into account their unique structure and relationship to other code and data in your environment.
Introduction to Apache Airflow
Apache Airflow is an Open-Source, Batch-Oriented framework for building and monitoring data workflows. Airbnb created Airflow in 2014 to address its increasingly complex Data Pipeline needs: engineers could write and schedule workflows and monitor their execution through a built-in web interface. Because of its growing popularity, the project was later adopted by the Apache Software Foundation.
By leveraging standard Python features, such as datetime objects for task scheduling, Apache Airflow enables users to efficiently build scheduled Data Pipelines. It also includes a slew of building blocks that let users connect the various technologies found in today’s technology landscapes.
Another useful feature of Apache Airflow is its backfilling capability, which allows users to easily reprocess previously processed data. This feature can also be used to recompute any dataset after modifying the code. Apache Airflow, like a spider in a web, sits at the heart of your data processes, coordinating work across multiple distributed systems.
To understand how to build robust pipelines using Airflow, explore Hevo’s resources.
Key Features of Apache Airflow
- Dynamic: Airflow pipelines are written in Python and can be generated dynamically. This allows for the development of code that dynamically instantiates pipelines.
- Extensible: You can easily define your operators and executors, and you can extend the library to fit the level of abstraction that works best for your environment.
- Elegant: Airflow pipelines are lean and explicit. You can parameterize your scripts with the powerful Jinja templating engine, which is built into the core of Apache Airflow.
- Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. It is designed to scale out as your workloads grow.
Hevo Data, a No-code Data Pipeline, helps load data from any data source such as Databases, SaaS applications, Cloud Storage, SDKs, and Streaming Services, and simplifies the ETL process. It supports 150+ data sources (including 60+ free data sources) like Asana, and setup is a 3-step process: select the data source, provide valid credentials, and choose the destination. Hevo not only loads the data into the desired Data Warehouse/destination but also enriches it and transforms it into an analysis-ready form without requiring a single line of code.
Check out why Hevo is the Best:
- Secure & Reliable: Hevo’s fault-tolerant architecture ensures secure, consistent data handling with zero loss and automatic schema management.
- User-Friendly & Scalable: Hevo’s simple UI makes it easy for new users, while its horizontal scaling manages growing data volumes with minimal latency.
- Efficient Data Transfer: Hevo supports real-time, incremental data loads, optimizing bandwidth usage for both ends.
- Live Monitoring & Support: Hevo provides live data flow monitoring and 24/5 customer support via chat, email, and calls.
What Does Testing Airflow DAGs Mean?
- Configuration File: The Apache Airflow Python script is a configuration file that defines the DAG’s structure as code, not where actual tasks are executed.
- Task Execution Context: Tasks run in different contexts (workers and times), meaning this script cannot be used for task-to-task communication.
- XComs for Communication: Use XComs (a more advanced feature) to enable communication between tasks.
- Testing Airflow DAGs: The definition file exists to create the DAG object, not to process data. It should be evaluated quickly so the scheduler can parse it regularly (see the sketch below).
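To make the distinction concrete, here is a minimal, hedged sketch of a DAG definition file; the DAG id and task ids are hypothetical, and the schedule argument assumes Airflow 2.4+ syntax. The file only declares structure and stays cheap to parse.

# A minimal sketch of a DAG definition file; the DAG id and task ids below are
# hypothetical examples ("schedule" assumes Airflow 2.4+).
import pendulum
from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
    dag_id="example_definition_only",
    schedule="@daily",
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=False,
) as dag:
    # Tasks run later, on workers, when the scheduler dispatches them;
    # parsing this file itself does no data processing.
    start = EmptyOperator(task_id="start")
    finish = EmptyOperator(task_id="finish")
    start >> finish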
Environment for Testing Airflow DAGs
Maintain a staging environment, if possible, to test the entire DAG run before deploying to production. Make sure your DAGs are parameterized so that you can change variables such as the output path of an S3 operation or the database used to read the configuration. Do not hard-code values inside the DAG and then manually change them based on the environment.
The DAG can be parameterized using environment variables.
import os

# Read the destination from an environment variable, falling back to a default,
# so the same DAG file can run unchanged in staging and production.
dest = os.environ.get(
    "MY_DAG_DEST_PATH",
    "s3://default-target/path/"
)
Writing a DAG
Creating a new DAG in Apache Airflow is simple. However, there are several things you must consider to ensure that a DAG run, or its failure, does not produce unexpected results.
1. Creating a Task
Tasks in Apache Airflow should be treated like database transactions: a task should never finish with a half-produced result. For example, do not leave partially written data in HDFS or S3 at the end of a task.
If a task fails, Apache Airflow can retry it. As a result, tasks should produce the same result on each re-run. Here are some ways to keep re-runs deterministic (see the sketch after this list):
- If you use INSERT during a task re-run, you may end up with duplicate rows in your database. UPSERT should be used instead.
- Read from and write to a specific partition. Never read the most recent data available in a task: the input data may be updated between re-runs, resulting in different outputs. It is preferable to read input data from a specific partition, and you can use the execution date as the partition key. The same partitioning scheme should be used when writing data to S3/HDFS.
- Python’s datetime.now() function returns the current timestamp. It should never be used inside a task, especially for critical computation, because it produces a different result on every run. Use the run’s logical (execution) date from the task context instead.
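Putting these points together, here is a hedged sketch of an idempotent, partition-aware task written with the TaskFlow API (Airflow 2.x); the DAG id, bucket, and paths are hypothetical examples.

# A hedged sketch of an idempotent task that keys its reads and writes to the
# run's logical date. The DAG id, bucket, and paths are hypothetical.
import pendulum
from airflow.decorators import dag, task

@dag(schedule="@daily", start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def partitioned_pipeline():

    @task
    def process_partition(ds=None):
        # "ds" is the logical date ("YYYY-MM-DD") injected by Airflow, so every
        # re-run of this task reads and writes the exact same partition instead
        # of depending on datetime.now().
        input_path = f"s3://my-bucket/raw/date={ds}/"          # hypothetical
        output_path = f"s3://my-bucket/processed/date={ds}/"   # hypothetical
        # ... read from input_path, transform, and upsert into output_path ...
        return output_path

    process_partition()

partitioned_pipeline()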
2. Deleting a Task
A task should never be deleted from a DAG. When a task is deleted, its historical information is removed from the Apache Airflow UI. If tasks must be removed, it is better to create a new DAG instead.
3. Communication and Variables
If you use the Kubernetes executor or the Celery executor, Airflow will execute DAG tasks on multiple servers. As a result, you should not save any files or configurations to the local filesystem because the next task will most likely run on a different server without access to it.
Use XCom to communicate small messages between tasks, and a remote storage system such as S3/HDFS to pass larger data. For example, if a task stores processed data in S3, it can push the S3 path of the output to XCom, and downstream tasks can pull the path from XCom and use it to read the data.
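As a hedged sketch of that pattern, here is an S3 path handed between two tasks via XCom using the TaskFlow API; the DAG id and path are hypothetical, and the actual read/write calls are elided.

# A hedged sketch of passing an S3 path between tasks via XCom.
# The DAG id and path below are hypothetical examples.
import pendulum
from airflow.decorators import dag, task

@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def xcom_s3_handoff():

    @task
    def write_to_s3() -> str:
        output_path = "s3://my-bucket/processed/foo.parquet"  # hypothetical
        # ... write the processed data to output_path here ...
        return output_path  # the return value is pushed to XCom automatically

    @task
    def read_from_s3(path: str):
        # The upstream task's XCom value arrives here as a normal argument.
        print(f"Reading data from {path}")

    read_from_s3(write_to_s3())

xcom_s3_handoff()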
If possible, avoid using Variables outside of an operator’s execute() method or Jinja templates, as Variables create a connection to Airflow’s metadata DB to fetch the value, which can slow down parsing and place additional load on the DB.
{{ var.value.<variable_name> }}
Airflow continuously parses all of the DAGs in the background. The parsing period is controlled by the processor_poll_interval configuration option, which defaults to 1 second. During parsing, each DAG that fetches a Variable at the top level opens a new connection to the metadata DB, which can result in a large number of open connections.
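As a hedged sketch of the recommended pattern, here is the template syntax used inside an operator so the Variable is only resolved at task runtime, not at parse time; the DAG id, task id, and Variable name (my_output_bucket) are hypothetical.

# A hedged sketch of reading an Airflow Variable through a Jinja template.
# "my_output_bucket" is a hypothetical Variable name.
import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="variable_template_example",
    schedule=None,
    start_date=pendulum.datetime(2024, 1, 1),
    catchup=False,
) as dag:
    export_report = BashOperator(
        task_id="export_report",
        # The template is rendered when the task runs, so parsing this file
        # does not open a metadata DB connection to fetch the Variable.
        bash_command="echo 'Writing report to {{ var.value.my_output_bucket }}'",
    )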
Types of Testing Airflow DAGs
Apache Airflow users should treat DAGs as production-level code. DAGs should be subjected to a variety of tests to ensure that they produce the expected results. A DAG can be tested in several ways; let’s look at a few of them.
1) Testing Airflow DAGs: DAG Loader Test
DAG validation tests are designed to ensure that your DAG objects are defined correctly, acyclic, and free from import errors.
These are the kinds of things you’d notice if you started with local DAG development. However, if you don’t have access to a local Apache Airflow environment or want to add an extra layer of security, these tests can ensure that simple coding errors don’t get deployed and slow down your development.
DAG validation tests apply to all DAGs in your Apache Airflow environment, so only one test suite is required.
Simply run the Python file to see if your DAG can be loaded, indicating that there are no syntax errors.
python your-dag-file.py
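If you want to automate this check in CI, here is a hedged sketch of a DAG loader test using pytest and Airflow’s DagBag; the dags/ folder path is an assumption about your project layout.

# A hedged sketch of a DAG loader test with pytest.
# The "dags/" folder path is an assumed project layout.
import pytest
from airflow.models import DagBag

@pytest.fixture(scope="session")
def dag_bag():
    return DagBag(dag_folder="dags/", include_examples=False)

def test_no_import_errors(dag_bag):
    # Any syntax or import error raised while loading a DAG file shows up here.
    assert dag_bag.import_errors == {}

def test_at_least_one_dag_loaded(dag_bag):
    assert len(dag_bag.dags) > 0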
2) Testing Airflow DAGs: Unit Test
Unit testing is a method of Software Testing in which small chunks of source code are tested individually to ensure they work as expected. The goal is to isolate testable logic within small, well-named functions, such as:
def test_function_returns_5():
    assert my_function(input) == 5
In the context of Apache Airflow, you can write unit tests for any part of your DAG, but hooks and operators are the most commonly used. Before merging the code into the project, all official Apache Airflow hooks, operators, and provider packages must pass unit tests. Consider the AWS S3Hook, which includes a plethora of unit tests.
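As a hedged illustration, here is what a unit test for a small piece of pipeline logic might look like; extract_order_total is a hypothetical helper used by a task, not part of Airflow.

# A hedged sketch of a unit test for a small, isolated piece of pipeline logic.
# "extract_order_total" is a hypothetical helper function, not an Airflow API.
def extract_order_total(order: dict) -> float:
    """Sum price * quantity across the line items of an order payload."""
    return sum(item["price"] * item["quantity"] for item in order["items"])

def test_extract_order_total():
    order = {"items": [{"price": 2.5, "quantity": 4}, {"price": 10.0, "quantity": 1}]}
    assert extract_order_total(order) == 20.0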
3) Testing Airflow DAGs: Self Check
You can also build checks into a DAG to ensure that tasks produce the expected results. For instance, if you have a task that pushes data to S3, you can add a check in the next task: verify that the partition was created in S3 and run a few simple sanity checks to determine whether the data is correct.
Similarly, if you have a task in Kubernetes or Mesos that starts a microservice, you can use the HttpSensor (airflow.providers.http.sensors.http.HttpSensor) to determine whether or not the service has started.
# S3KeySensor ships with the Amazon provider package (apache-airflow-providers-amazon).
from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

# PushToS3 stands in for whichever operator writes your data to S3.
task = PushToS3(...)
check = S3KeySensor(
    task_id='check_parquet_exists',
    bucket_key="s3://bucket/key/foo.parquet",
    poke_interval=0,
    timeout=0
)
task >> check
Conclusion
In this article, you learned what Testing Airflow DAGs means and the types of tests you can apply. Apache Airflow’s strong Python foundation enables users to easily schedule and run complex Data Pipelines at regular intervals. Data Pipelines, represented as DAGs in Airflow, are essential for creating flexible workflows.
Apache Airflow’s rich web interface allows you to easily monitor pipeline run results and debug any failures that occur. Because of its dynamic nature and flexibility, Apache Airflow has benefited many businesses today.
Companies need to analyze their business data stored in multiple data sources. The data needs to be loaded into a Data Warehouse to get a holistic view. Hevo Data is a No-code Data Pipeline solution that helps transfer data from 150+ sources to the desired Data Warehouse. It fully automates the process of transforming and transferring data to a destination without writing a single line of code.
Want to take Hevo for a spin? Sign Up here for a 14-day free trial and experience the feature-rich Hevo suite first hand.
Share your experience on this comprehensive Guide for Testing Airflow DAGs in the comments section below!
FAQs
1. How to test an Airflow DAG?
To test an Airflow DAG, you can use the airflow dags test command, which performs a single DAG run for a specific date without requiring a running scheduler; to run a single task in isolation, use airflow tasks test. This allows you to check whether tasks execute correctly and debug issues in your DAG.
2. How to test for Airflow?
To test Airflow, you can use the airflow dags test command to run a whole DAG for a specific date and airflow tasks test to run individual tasks, ensuring everything executes as expected. Additionally, you can write unit tests with a framework like pytest and Airflow’s testing utilities to validate your DAG’s logic and task behavior in isolation.
3. How do I check my Airflow DAG logs?
To check your Airflow DAG logs, you can use the Airflow UI by navigating to the specific DAG run and selecting the task instance to view its detailed logs. Alternatively, you can read the log files directly from the logs/ directory under your Airflow home, where they are organized by DAG, task, and run.
4. How to test Airflow on local?
To test Airflow locally, you can use Docker to set up an Airflow instance or install Airflow directly on your machine. After setting it up, you can start the web server and scheduler using airflow webserver and airflow scheduler, and then test your DAGs using the Airflow UI or the airflow dags test command.
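If you are on Airflow 2.5 or later, a hedged alternative for local debugging is the dag.test() method, which executes a DAG run inside a single Python process; the DAG and task below are hypothetical examples.

# A hedged sketch of local debugging with dag.test() (available in Airflow 2.5+).
# The DAG and task below are hypothetical examples.
import pendulum
from airflow.decorators import dag, task

@dag(schedule=None, start_date=pendulum.datetime(2024, 1, 1), catchup=False)
def local_debug_example():

    @task
    def say_hello():
        print("hello from a locally executed task")

    say_hello()

dag_object = local_debug_example()

if __name__ == "__main__":
    # Runs one DAG run in-process, without a scheduler, so you can attach a
    # debugger or simply execute this file with the python interpreter.
    dag_object.test()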
Davor DSouza is a data analyst with a passion for using data to solve real-world problems. His experience with data integration and infrastructure, combined with his Master's in Machine Learning, equips him to bridge the gap between theory and practical application. He enjoys diving deep into data and emerging with clear and actionable insights.