Testing Airflow DAGs 101: A Comprehensive Guide

Published: February 10, 2022

In this article, you’ll learn more about Testing Airflow DAGs. This guide will go over a few different types of tests that we would recommend to anyone running Apache Airflow in production, such as DAG validation testing, unit testing, and data and pipeline integrity testing.

One of Apache Airflow’s guiding principles is that your DAGs are defined as Python code. Because data pipelines can be treated like any other piece of code, they can be integrated into a standard Software Development Lifecycle using source control, CI/CD, and Automated Testing.

Although DAGs are entirely Python code, effectively testing them necessitates taking into account their unique structure and relationship to other code and data in your environment.

Introduction to Apache Airflow

Apache Airflow is an Open-Source, Batch-Oriented Pipeline-building framework for developing and monitoring data workflows. Airflow was created at Airbnb in 2014 to address big data and complex Data Pipeline challenges; engineers used its built-in web interface to write and schedule processes as well as monitor workflow execution. Because of its growing popularity, the Apache Software Foundation adopted the Airflow project.

By leveraging standard Python features, such as datetime formats for task scheduling, Apache Airflow enables users to efficiently build scheduled Data Pipelines. It also includes a slew of building blocks that let users connect the various technologies found in today’s technological landscapes.

Another useful feature of Apache Airflow is its backfilling capability, which allows users to easily reprocess previously processed data. This feature can also be used to recompute any dataset after modifying the code. Apache Airflow, like a spider in a web, sits at the heart of your data processes, coordinating work across multiple distributed systems.

Key Features of Apache Airflow

  • Dynamic: Airflow pipelines are written in Python and can be generated dynamically, allowing you to write code that instantiates pipelines on the fly.
  • Extensible: You can easily define your own operators and executors, and you can extend the library to fit the level of abstraction that works best for your environment.
  • Elegant: Airflow pipelines are simple and to the point. Scripts are parameterized using the powerful Jinja templating engine, which is built into the core of Apache Airflow.
  • Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers, so it is ready to scale indefinitely.
Simplify Data Analysis with Hevo’s No-code Data Pipeline

Hevo Data, a No-code Data Pipeline, helps to load data from any data source such as Databases, SaaS applications, Cloud Storage, SDKs, and Streaming Services and simplifies the ETL process. It supports 100+ data sources (including 30+ free data sources) such as Asana, and setup is a 3-step process: select the data source, provide valid credentials, and choose the destination. Hevo not only loads the data onto the desired Data Warehouse/destination but also enriches the data and transforms it into an analysis-ready form without having to write a single line of code.

Its completely automated pipeline allows data to be delivered in real-time without any loss from source to destination. Its fault-tolerant and scalable architecture ensures that the data is handled in a secure, consistent manner with zero data loss and supports different forms of data. The solutions provided are consistent and work with different BI tools as well.

Check out why Hevo is the Best:

  • Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  • Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
  • Minimal Learning: Hevo, with its simple and interactive UI, is extremely simple for new customers to work on and perform operations.
  • Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  • Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
  • Live Monitoring: Hevo allows you to monitor the data flow and check where your data is at a particular point in time.

What Does Testing Airflow DAGs Mean?

One thing to keep in mind (it may not be obvious at first) is that an Apache Airflow DAG file is just a Python script that acts as a configuration file specifying the DAG’s structure as code. The actual tasks defined in it are executed in a context distinct from the context of the script itself. Because different tasks run on different workers at different times, the script cannot be used to cross-communicate between tasks. It’s worth noting that Airflow has a more advanced feature called XComs for this purpose.

People mistakenly believe that the DAG definition file is a place where they can do actual data processing; however, this is not the case! The script’s goal is to create a DAG object. It must evaluate quickly (seconds, not minutes) because the scheduler executes it regularly to reflect any changes.

Environment for Testing Airflow DAGs

Maintain a staging environment, if possible, to test the complete DAG run before deploying to production. Make sure your DAGs are parameterized so that you can change variables such as the output path of an S3 operation or the database used to read the configuration. Do not hard-code values inside the DAG and then manually change them based on the environment.

The DAG can be parameterized using environment variables:

import os

# Read the destination path from an environment variable, falling back to a
# default value when the variable is not set (e.g. when running locally).
dest = os.environ.get(
   "MY_DAG_DEST_PATH",
   "s3://default-target/path/"
)
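
For illustration, here is a minimal sketch (the DAG and task names are hypothetical) of how the parameterized dest value could then be used inside a task instead of a hard-coded path:

import pendulum
from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="my_parameterized_dag",
    start_date=pendulum.datetime(2022, 1, 1, tz="UTC"),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    copy_to_dest = BashOperator(
        task_id="copy_to_dest",
        # `dest` comes from the environment-variable lookup above, so the
        # same DAG file works unchanged in staging and production.
        bash_command=f"aws s3 cp /tmp/output.csv {dest}",
    )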

Writing a DAG

It is very simple to create a new DAG in Apache Airflow. However, there are several things you must consider to ensure that a DAG run, or its failure, does not produce unexpected results.

Creating a Task

Tasks in Apache Airflow should be treated like database transactions: a task should never finish with a partial result. For example, at the end of a task, do not leave incomplete data in HDFS or S3.

If a task fails, Apache Airflow can retry it. As a result, a task should produce the same results on each re-run. Here are some ways to avoid producing different results (see the sketch after this list):

  • If you use INSERT on a task re-run, you may end up with duplicate rows in your database. Use UPSERT instead.
  • Read and write within a specific partition. Never read the latest available data in a task: the input data may be updated between re-runs, resulting in different outputs. It is preferable to read the input data from a specific partition, for example one keyed on the execution date. The same partitioning approach should be used when writing data to S3/HDFS.
  • Python’s datetime.now() returns the current date and time. Never use it within a task, especially for critical computations, because it produces a different result on every run; use the execution date provided by Airflow instead.
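
The partitioning idea can be sketched as follows; this is a minimal, hypothetical example (the bucket, paths, and task names are made up) rather than a prescribed implementation:

from airflow.operators.python import PythonOperator

# Hypothetical processing callable: it reads and writes a partition keyed on
# the logical (execution) date instead of "the latest data" or datetime.now(),
# so re-running the same interval always produces the same result.
def process_partition(ds, **kwargs):
    input_path = f"s3://my-bucket/raw/date={ds}/"         # read a fixed partition
    output_path = f"s3://my-bucket/processed/date={ds}/"  # write the matching partition
    # ... read from input_path, transform, and UPSERT the result to output_path ...

# Inside a DAG definition; Airflow 2.x passes `ds` (the execution date as
# YYYY-MM-DD) to the callable automatically because it appears in the signature.
process = PythonOperator(
    task_id="process_partition",
    python_callable=process_partition,
)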

Deleting a Task

A task should never be deleted from a DAG. When a task is deleted, its historical information is removed from the Apache Airflow UI. If tasks must be removed, it is better to create a new DAG instead.

Communication and Variables

If you use the Kubernetes executor or the Celery executor, Airflow will execute DAG tasks on multiple servers. As a result, you should not save any files or configurations to the local filesystem because the next task will most likely run on a different server without access to it.

Use XCom to communicate small messages between tasks; a remote storage system such as S3/HDFS is a good way to pass larger data between tasks. For example, if a task stores processed data in S3, that task can push the S3 path for the output data to XCom, and downstream tasks can pull the path from XCom and use it to read the data.
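
A minimal sketch of that pattern using the TaskFlow API (the bucket and task names are hypothetical, and this sits inside a DAG definition) could look like this:

from airflow.decorators import task
from airflow.operators.python import get_current_context

@task
def write_to_s3():
    # The large data set goes to S3; only the small S3 path is returned.
    # Return values of TaskFlow tasks are pushed to XCom automatically.
    ds = get_current_context()["ds"]  # execution date, e.g. "2022-02-10"
    output_path = f"s3://my-bucket/processed/date={ds}/data.parquet"
    # ... upload the processed data to output_path ...
    return output_path

@task
def read_from_s3(path: str):
    # The downstream task receives the path via XCom and reads the actual
    # data directly from S3.
    print(f"Reading data from {path}")

# Calling the tasks inside a DAG wires both the dependency and the XCom hand-off.
read_from_s3(write_to_s3())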

If possible, avoid using Variables outside of an operator’s execute() method or Jinja templates, as Variables create a connection to Airflow’s metadata DB to fetch the value, which can slow down parsing and place additional load on the DB. Inside a Jinja-templated field, a Variable can be referenced like this:

{{ var.value.<variable_name> }}
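
As a short illustration (the variable and task names are hypothetical, and this sits inside a DAG definition), compare fetching a Variable at the top level with referencing it in a templated field:

from airflow.models import Variable
from airflow.operators.bash import BashOperator

# Avoid: fetched at module level, so the metadata DB is queried every time
# the scheduler parses this file.
# bucket = Variable.get("my_s3_bucket")

# Prefer: reference the Variable in a templated field; it is resolved only
# when the task actually runs.
upload = BashOperator(
    task_id="upload_report",
    bash_command="aws s3 cp /tmp/report.csv s3://{{ var.value.my_s3_bucket }}/report.csv",
)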

Airflow periodically parses all of the DAGs in the background. The parsing period is controlled by the processor_poll_interval config, which defaults to 1 second. If Variables are fetched at the top level, Airflow creates a new connection to the metadata DB for each DAG during parsing, which can result in a large number of open connections.

Types of Testing Airflow DAGs

Apache Airflow users should treat DAGs as production-level code. DAGs should be subjected to a variety of tests to ensure that they produce the expected results. A DAG can be tested in several ways; let’s look at a few of them.

Testing Airflow DAGs: DAG Loader Test

DAG validation tests are designed to ensure that your DAG objects are defined correctly, acyclic, and free from import errors.

These are the kinds of things you’d notice if you started with local DAG development. However, if you don’t have access to a local Apache Airflow environment or want to add an extra layer of security, these tests can ensure that simple coding errors don’t get deployed and slow down your development.

DAG validation tests apply to all DAGs in your Apache Airflow environment, so only one test suite is required.

Simply run the Python file to see if your DAG can be loaded, indicating that there are no syntax errors.

python your-dag-file.py
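
To run the same check automatically in CI, one common approach (a sketch, assuming your DAG files live in a dags/ folder) is a pytest suite built around Airflow’s DagBag, which surfaces import errors and rejects cyclic DAGs at load time:

import pytest
from airflow.models import DagBag

@pytest.fixture()
def dagbag():
    # Parse every DAG file in the dags/ folder, skipping the example DAGs.
    return DagBag(dag_folder="dags/", include_examples=False)

def test_no_import_errors(dagbag):
    # Syntax errors, failed imports, and cycles all show up here.
    assert dagbag.import_errors == {}

def test_dags_are_loaded(dagbag):
    # At least one DAG object should have been created.
    assert len(dagbag.dags) > 0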

Testing Airflow DAGs: Unit Test

Unit testing is a method of Software Testing in which small chunks of source code are tested individually to ensure they work as expected. The goal is to isolate testable logic within small, well-named functions, such as:

def test_function_returns_5():
	assert my_function(input) == 5

In the context of Apache Airflow, you can write unit tests for any part of your DAG, but hooks and operators are the most commonly used. Before merging the code into the project, all official Apache Airflow hooks, operators, and provider packages must pass unit tests. Consider the AWS S3Hook, which includes a plethora of unit tests.
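
For instance, a custom operator can be instantiated and exercised directly in a test; the operator below is made up purely for illustration:

from airflow.models.baseoperator import BaseOperator

# Hypothetical custom operator used only for this example.
class MultiplyOperator(BaseOperator):
    def __init__(self, value: int, factor: int, **kwargs):
        super().__init__(**kwargs)
        self.value = value
        self.factor = factor

    def execute(self, context):
        return self.value * self.factor

def test_multiply_operator_execute():
    # Operators can be tested without a scheduler or a running DAG.
    op = MultiplyOperator(task_id="multiply", value=6, factor=7)
    assert op.execute(context={}) == 42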

Testing Airflow DAGs: Self Check

You can also use checks within a DAG to ensure that tasks are producing the expected results. For instance, if you have a task that pushes data to S3, you can include a check in the next task that verifies the partition was created in S3 and runs some simple validations to determine whether or not the data is correct.

Similarly, if you have a task in Kubernetes or Mesos that starts a microservice, you can use the HttpSensor (airflow.providers.http.sensors.http.HttpSensor) to determine whether or not the service has started.

# PushToS3 is a placeholder for whichever task writes the data; S3KeySensor
# ships with the Amazon provider package, e.g.
# from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor
task = PushToS3(...)
check = S3KeySensor(
    task_id='check_parquet_exists',
    bucket_key="s3://bucket/key/foo.parquet",
    poke_interval=0,
    timeout=0,
)
task >> check

Conclusion

In this article, you learned about testing Airflow DAGs and the main types of tests you can apply. The strong Python foundation of Apache Airflow enables users to easily schedule and run complex Data Pipelines at regular intervals. Data Pipelines, represented as DAGs in Airflow, are essential for creating flexible workflows.

Apache Airflow’s rich web interface allows you to easily monitor pipeline run results and debug any failures that occur. Because of its dynamic nature and flexibility, Apache Airflow has benefited many businesses today.

Visit our Website to Explore Hevo

Companies need to analyze their business data stored in multiple data sources. The data needs to be loaded into a Data Warehouse to get a holistic view of the business. Hevo Data is a No-code Data Pipeline solution that helps transfer data from 100+ sources to the desired Data Warehouse. It fully automates the process of transforming and transferring data to a destination without writing a single line of code.

Want to take Hevo for a spin? Sign Up here for a 14-day free trial and experience the feature-rich Hevo suite first hand.

Share your experience on this comprehensive Guide for Testing Airflow DAGs in the comments section below!

Former Research Analyst, Hevo Data

Davor is a data analyst at heart with a passion for data, software architecture, and writing technical content. He has experience writing more than 100 articles on data integration and infrastructure.
