Apache Airflow Tasks: The Ultimate Guide for 2022

Airflow Webserver, Apache Airflow • February 16th, 2022

Apache Airflow is a popular open-source workflow management tool. It lets you define workflows in plain Python, so anyone with a basic understanding of Python can deploy one. This is a step forward from earlier platforms that rely on the command line or XML to define workflows. Airflow ships with a number of simple operators that let you run your processes on cloud platforms such as AWS, GCP, and Azure. Airflow orchestrates workflows using Directed Acyclic Graphs (DAGs), which can be run on a schedule (hourly, daily, etc.) or by external triggers. The tasks are written in Python, and Airflow handles their scheduling and execution.

In Airflow, a Task is the most basic unit of execution. Tasks are organized into DAGs, and upstream and downstream dependencies are established between them to define the order in which they should be executed.

In this article, you will get to know everything about Airflow Tasks and understand the important terms and mechanisms related to the Airflow Tasks.

What is Apache Airflow?

Apache Airflow is an open-source tool for authoring, scheduling, and monitoring workflows programmatically. Organizations use it to orchestrate complex computational operations, build data processing pipelines, and run ETL processes. A workflow is modeled as a DAG (Directed Acyclic Graph) made up of nodes and connectors; connecting nodes with connectors creates a dependency tree.

Key Features of Apache Airflow

  • Dynamic Integration: Airflow pipelines are defined in Python, so they can be generated dynamically. Several operators, hooks, and connectors are available to build DAGs and link their tasks into workflows.
  • Versatile: Since Airflow is an open-source platform, users can create their own Operators, Executors, and Hooks. You can also customize the libraries to fit the level of abstraction that best suits your needs.
  • Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is straightforward because Airflow uses Jinja templates.
  • Scalable: Airflow uses a message queue to orchestrate an arbitrary number of workers, so you are free to create as many dependent workflows as you like.

To get further information on Apache Airflow, check out the official Apache Airflow website.

What is an Apache Airflow Task?

In Airflow, a Task is the most basic unit of execution. Tasks are organized into DAGs, and upstream and downstream dependencies are established between them to define the order in which they should be executed.

There are three types of tasks:

  • Operators are predefined task templates that you can quickly string together to build most parts of your DAGs.
  • Sensors are a special subclass of Operators that do nothing but wait for an external event to occur.
  • A TaskFlow-decorated @task is a custom Python function packaged up as a Task.

Internally, these are all subclasses of Airflow’s BaseOperator, and the concepts of Task and Operator are somewhat interchangeable. It is still useful to think of them as separate concepts: Operators and Sensors are templates, and calling one in a DAG file creates a Task.

What are the Key Types of Airflow Tasks?

DAGs are made up of several tasks. To define tasks in Airflow, we use Operators and Sensors (which are also a kind of Operator). Once an Operator has been instantiated within a DAG, it is referred to as a task of that DAG.

1) Operators

An Operator usually integrates with another service, allowing Airflow to work with it; examples include MySqlOperator, SlackOperator, and PrestoOperator. For instance, users can use QuboleOperator to run Presto, Hive, Hadoop, and Spark commands, Zeppelin Notebooks, Jupyter Notebooks, and Data Import/Export jobs on their Qubole account.

The following are some of the most frequent Airflow Operators:

  • BashOperator: Runs a Bash command on the machine where the task executes.
  • PythonOperator: Calls any Python callable you pass to it; arguments for the callable can be supplied through op_args and op_kwargs.
  • EmailOperator: Sends an email through the configured SMTP server.
  • SimpleHttpOperator: Sends an HTTP request to a remote system, which can be used to trigger actions there.
  • QuboleOperator: Runs and retrieves results from Presto, Hive, Hadoop, and Spark commands, Zeppelin Notebooks, Jupyter Notebooks, and Data Import/Export jobs on the configured Qubole account.
  • SQL operators: MySqlOperator, SqliteOperator, PostgresOperator, MsSqlOperator, OracleOperator, JdbcOperator, and others.

2) Sensors

Sensors are special Operators designed to wait for an external or internal trigger. They are typically used to start some or all of a DAG's tasks in response to an event. The following are examples of common Sensor types:

  • ExternalTaskSensor: Waits for a task in a different DAG to complete.
  • HivePartitionSensor: Waits for a specific partition of a Hive table to appear.
  • S3KeySensor: Waits for a specific key (a file or directory-like prefix) to be present in an S3 bucket.

3) TaskFlow

If you write most of your DAGs in plain Python code rather than with Operators, the TaskFlow API makes it much easier to author clean DAGs with minimal boilerplate, all using the @task decorator.

Calling a TaskFlow function in your DAG file does not execute it. Instead, you get back an object representing the XCom for its result (an XComArg), which you can then pass as input to downstream Tasks or Operators. For example:

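from airflow.decorators import task
from airflow.operators.email import EmailOperator

# Note: my_ip_service is not defined in this snippet; it stands for whatever
# client you use to look up the server's external IP.

@task
def get_ip():
    return my_ip_service.get_main_ip()

@task
def compose_email(external_ip):
    return {
        'subject': f'Server connected from {external_ip}',
        'body': f'Your server executing Airflow is connected from the external IP {external_ip}<br>'
    }

# Calling the decorated functions does not run them; it returns an XComArg
email_info = compose_email(get_ip())

# Values from the XComArg are passed into a classic Operator
EmailOperator(
    task_id='send_email',
    to='example@example.com',
    subject=email_info['subject'],
    html_content=email_info['body']
)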

4) Hooks

Hooks connect to services outside of the Airflow cluster. Operators provide a way to define tasks that may or may not communicate with an external service, whereas Hooks provide a uniform interface for accessing external services such as S3, MySQL, Hive, and Qubole. Hooks are not tasks themselves; they are the building blocks that Operators use to communicate with external services.

Simplify ETL Using Hevo’s No-code Data Pipeline

Hevo Data is a No-code Data Pipeline that offers a fully managed solution to set up Data Integration for 100+ Data Sources (including 40+ Free sources) and will let you directly load data from sources to a Data Warehouse or the Destination of your choice. It will automate your data flow in minutes without writing any line of code. Its fault-tolerant architecture makes sure that your data is secure and consistent. Hevo provides you with a truly efficient and fully automated solution to manage data in real-time and always have analysis-ready data. 

Let’s look at some of the salient features of Hevo:

  • Fully Managed: It requires no management and maintenance as Hevo is a fully automated platform.
  • Data Transformation: It provides a simple interface to perfect, modify, and enrich the data you want to transfer. 
  • Real-Time: Hevo offers real-time data migration. So, your data is always ready for analysis.
  • Schema Management: Hevo can automatically detect the schema of the incoming data and map it to the destination schema.
  • Connectors: Hevo supports 100+ integrations to SaaS platforms, FTP/SFTP, files, databases, BI tools, and native REST API & Webhooks connectors. It supports various destinations, including Google BigQuery, Amazon Redshift, Snowflake, and Firebolt data warehouses; Amazon S3 data lakes; Databricks; and MySQL, SQL Server, TokuDB, MongoDB, and PostgreSQL databases, to name a few.
  • Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  • Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  • Live Monitoring: Advanced monitoring gives you a one-stop view to watch all the activities that occur within Data Pipelines.
  • Live Support: Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.

What are Task Relationships in Apache Airflow?

There are a variety of techniques to connect Airflow Tasks in a DAG. Here’s a rundown of them; to establish relationships while keeping your code clean and readable, it’s recommended to use the bitshift operators and the relationship builders.

There are two ways to set basic dependencies between Airflow Tasks:

  • Using the bitshift operators (<< and >>).
  • Using the set_upstream and set_downstream methods.

If you have a DAG with four sequential tasks, you can set the dependencies in four ways.

1) Using set_downstream():

t0.set_downstream(t1)
t1.set_downstream(t2)
t2.set_downstream(t3)

2) Using set_upstream():

t3.set_upstream(t2)
t2.set_upstream(t1)
t1.set_upstream(t0)

3) Using >>:

t0 >> t1 >> t2 >> t3

4) Using <<:

t3 << t2 << t1 << t0
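
The four spellings above are interchangeable because the bitshift operators delegate to set_downstream and set_upstream. The sketch below is a toy model of that idea, not Airflow's actual implementation: ToyTask, edges, and build are hypothetical names invented for illustration, and the class only records edges between task ids.

```python
class ToyTask:
    """Hypothetical stand-in for an Airflow task; it only records edges."""

    def __init__(self, task_id):
        self.task_id = task_id
        self.downstream_ids = set()

    def set_downstream(self, other):
        self.downstream_ids.add(other.task_id)

    def set_upstream(self, other):
        other.set_downstream(self)

    def __rshift__(self, other):
        # a >> b: a runs before b. Returning b lets a >> b >> c chain.
        self.set_downstream(other)
        return other

    def __lshift__(self, other):
        # a << b: b runs before a. Returning b lets a << b << c chain.
        self.set_upstream(other)
        return other


def edges(tasks):
    """Return the sorted list of (upstream_id, downstream_id) pairs."""
    return sorted((t.task_id, d) for t in tasks for d in t.downstream_ids)


def build(style):
    """Wire four toy tasks in sequence using the requested style."""
    t0, t1, t2, t3 = (ToyTask(f"t{i}") for i in range(4))
    if style == "set_downstream":
        t0.set_downstream(t1)
        t1.set_downstream(t2)
        t2.set_downstream(t3)
    elif style == "set_upstream":
        t3.set_upstream(t2)
        t2.set_upstream(t1)
        t1.set_upstream(t0)
    elif style == ">>":
        t0 >> t1 >> t2 >> t3
    elif style == "<<":
        t3 << t2 << t1 << t0
    else:
        raise ValueError(f"unknown style: {style}")
    return edges([t0, t1, t2, t3])


if __name__ == "__main__":
    for style in ("set_downstream", "set_upstream", ">>", "<<"):
        print(style, build(style))
```

Each style produces the same three edges, which is why the choice between them is mostly a matter of readability.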

Relationship Builder:

The chain and cross_downstream functions make it simpler to set relationships between groups of tasks. Listed below are a few examples:

from airflow.models.baseoperator import chain, cross_downstream

# Makes task1, task2, and task3 each upstream of task4, task5, and task6,
# which bitshift composition between two lists cannot express
cross_downstream([task1, task2, task3], [task4, task5, task6])
# A linear sequence such as task1 >> task2 >> task3 >> task4 >> task5 can be written as:
chain(task1, task2, task3, task4, task5)
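
To make the effect of the two builders concrete, here is a minimal sketch that computes the dependency edges each call would create. It covers only the simple case of flat task lists, works on plain task id strings rather than Airflow objects, and the function names chain_edges and cross_downstream_edges are hypothetical.

```python
from itertools import product


def chain_edges(*task_ids):
    """Edges for a linear sequence: each task is upstream of the next one."""
    return list(zip(task_ids, task_ids[1:]))


def cross_downstream_edges(from_ids, to_ids):
    """Edges where every task in from_ids is upstream of every task in to_ids."""
    return list(product(from_ids, to_ids))


if __name__ == "__main__":
    print(chain_edges("task1", "task2", "task3", "task4", "task5"))
    print(cross_downstream_edges(["task1", "task2", "task3"],
                                 ["task4", "task5", "task6"]))
```

A chain of five tasks yields four edges, while crossing two groups of three yields nine, one for every pair.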

Understanding the Relationship Terminology for Airflow Tasks

A Task Instance has two types of relationships with other Task Instances.

Firstly, it can have upstream and downstream tasks:

task1 >> task2 >> task3

When a DAG runs, it creates instances of each of these tasks that are upstream or downstream of one another, but which all share the same data interval.

There may also be instances of the same task from other DAG runs, with different data intervals. These are referred to as previous and next, as opposed to upstream and downstream.
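
The difference between the two kinds of relationship can be sketched by treating a task instance as a (task_id, run date) pair. This is only an illustration under stated assumptions: a daily schedule is assumed, the run date stands in for the data interval, and TASK_ORDER, upstream_instance, and previous_instance are hypothetical names.

```python
from datetime import date, timedelta

# Models the DAG definition: task1 >> task2 >> task3
TASK_ORDER = ["task1", "task2", "task3"]


def upstream_instance(task_id, run_date):
    """Same DAG run (same date), the task that comes immediately before."""
    index = TASK_ORDER.index(task_id)
    if index == 0:
        return None  # the first task has no upstream
    return (TASK_ORDER[index - 1], run_date)


def previous_instance(task_id, run_date):
    """Same task, the DAG run before (assumes one run per day)."""
    return (task_id, run_date - timedelta(days=1))


if __name__ == "__main__":
    run = date(2022, 2, 16)
    print(upstream_instance("task2", run))
    print(previous_instance("task2", run))
```

Upstream changes the task and keeps the date; previous keeps the task and changes the date.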

What are Apache Airflow Tasks Instances? 

Just as a DAG is instantiated into a DAG Run each time it runs, the tasks in a DAG are instantiated into Task Instances.

A Task Instance is a specific run of a task for a given DAG (and thus for a given data interval). It also represents a task that has state, indicating which stage of the lifecycle it is in.

A Task Instance can be in any of the following states:

  • none: The task has not yet been queued for execution (its dependencies are not yet met).
  • scheduled: The scheduler has determined that the task’s dependencies are met and that it should run.
  • queued: The task has been assigned to an Executor and is awaiting a worker.
  • running: The task is running on a worker (or on a local/synchronous executor).
  • success: The task finished running without errors.
  • shutdown: The task was externally requested to shut down while it was running.
  • restarting: The task was externally requested to restart while it was running.
  • failed: The task hit an error during execution and failed to run.
  • skipped: The task was skipped due to branching, LatestOnly, or similar.
  • upstream_failed: An upstream task failed, and the Trigger Rule says it was needed.
  • up_for_retry: The task failed, but it has retry attempts left and will be rescheduled.
  • up_for_reschedule: The task is a Sensor in reschedule mode.
  • sensing: The task is a Smart Sensor.
  • deferred: The task has been deferred to a trigger.
  • removed: The task has vanished from the DAG since the run started.

Ideally, an Airflow Task flows from none to scheduled, to queued, to running, and finally to success.

Any custom Task (Operator) receives a copy of its Task Instance when it runs. Besides letting you inspect task metadata, the Task Instance has methods for things like XComs.

How do Timeouts work with Airflow Tasks? 

If you want a task to have a maximum runtime, set its execution_timeout attribute to a datetime.timedelta value representing the maximum permissible runtime. This applies to all Airflow tasks, including sensors. execution_timeout controls the maximum time allowed for each execution. If it is exceeded, the task times out and AirflowTaskTimeout is raised.

Sensors additionally have a timeout parameter, which only matters for sensors in reschedule mode. timeout controls the maximum time allowed for the sensor to succeed. If it is exceeded, AirflowSensorTimeout is raised and the sensor fails immediately without retrying.

This is demonstrated in the SFTPSensor example below. The sensor is in reschedule mode, meaning it is periodically executed and rescheduled until it succeeds.

  • Each time the sensor pokes the SFTP server, the poke may take at most 60 seconds, as defined by execution_timeout.
  • If a poke takes longer than 60 seconds, AirflowTaskTimeout is raised. The sensor is allowed to retry when this happens, up to two times as defined by retries.
  • From the start of the first execution until it eventually succeeds (i.e. once the file ‘/root/test’ appears), the sensor is allowed a maximum of 3600 seconds, as defined by timeout. In other words, if the file does not appear on the SFTP server within 3600 seconds, the sensor raises AirflowSensorTimeout. It does not retry when this error is raised.
  • If the sensor fails for any other reason within the 3600 seconds, such as a network outage, it can retry up to two times as defined by retries. Retrying does not reset the timeout; the sensor still has at most 3600 seconds in total to succeed.

from datetime import timedelta

from airflow.providers.sftp.sensors.sftp import SFTPSensor

sensor = SFTPSensor(
    task_id="sensor",
    path="/root/test",
    execution_timeout=timedelta(seconds=60),
    timeout=3600,
    retries=2,
    mode="reschedule",
)
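
The interaction between execution_timeout, timeout, and retries is easier to follow with a small simulation. The sketch below is a toy model and not Airflow code: simulate_sensor is a hypothetical function, a normal poke is assumed to take no time, a hanging poke is assumed to be killed after execution_timeout seconds, and the 300-second gap between attempts is an assumed value that does not come from the example above.

```python
def simulate_sensor(file_appears_at, hanging_pokes=(), poke_interval=300,
                    execution_timeout=60, timeout=3600, retries=2):
    """Simulate a toy reschedule-mode sensor; return (outcome, seconds_elapsed).

    file_appears_at: second at which the awaited file becomes visible.
    hanging_pokes: indices of pokes that hang past execution_timeout.
    Assumption: the next attempt always starts poke_interval seconds later
    (a real retry would wait for the task's retry_delay instead).
    """
    clock = 0                  # seconds since the first execution started
    retries_left = retries
    poke_number = 0
    while True:
        if poke_number in hanging_pokes:
            # The poke is killed once it exceeds execution_timeout.
            clock += execution_timeout
            if retries_left == 0:
                return ("task_timeout", clock)
            retries_left -= 1
        elif clock >= file_appears_at:
            return ("success", clock)
        # The overall budget is measured from the first execution and is
        # never reset by a retry; exceeding it ends the sensor for good.
        if clock >= timeout:
            return ("sensor_timeout", clock)
        poke_number += 1
        clock += poke_interval


if __name__ == "__main__":
    print(simulate_sensor(file_appears_at=600))
    print(simulate_sensor(file_appears_at=10_000))
    print(simulate_sensor(file_appears_at=0, hanging_pokes={0, 1, 2}))
```

In this model a hanging poke costs one retry but leaves the 3600-second budget untouched, whereas running out of that budget ends the sensor regardless of how many retries remain.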

SLAs are what you want if you just want to be notified if a task goes over time but still want it to finish.

How to Set Up the Executor Configuration for Airflow Tasks? 

Some Executors, such as the KubernetesExecutor, enable optional per-task configuration, such as setting an image to run the task on.

The executor_config argument to a Task or Operator is used to accomplish this. Here’s an example of how to configure a Docker image for a KubernetesExecutor task:

MyOperator(...,
    executor_config={
        "KubernetesExecutor":
            {"image": "myCustomDockerImage"}
    }
)

The options you can send into executor_config differ for each executor, so check the documentation for each one to see what you can do.

What is the XCom Mechanism for Airflow Tasks? 

XComs (short for “cross-communications”) are a mechanism that lets Tasks talk to one another, since by default Tasks are isolated and may run on entirely different machines.

An XCom is identified by a key (essentially its name), together with the task_id and dag_id it came from. XComs can hold any (serializable) value, but they are designed only for small amounts of data; do not use them to pass around large values such as dataframes.

XComs are explicitly “pushed” and “pulled” to and from their storage using the xcom_push and xcom_pull methods on Task Instances. Many operators and @task functions also auto-push their results into an XCom key called return_value if the do_xcom_push argument is set to True (as it is by default).

If no key is supplied to xcom_pull, it will use this key by default, allowing you to write code like this:

# Pulls the return_value XCOM from "pushing_task"
value = task_instance.xcom_pull(task_ids='pushing_task')

XComs can also be used in templates:

SELECT * FROM {{ task_instance.xcom_pull(task_ids='foo', key='table_name') }}

The key distinction between XComs and Variables is that XComs are per-task-instance and meant for communication inside a DAG run, whereas Variables are global and designed for overall configuration and value exchange.
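
The identification scheme and the default key can be pictured with a tiny in-memory model. This is a sketch for illustration only, not how Airflow stores XComs: ToyXComStore is a hypothetical class, and the JSON check is just a stand-in for the requirement that values be serializable.

```python
import json


class ToyXComStore:
    """Hypothetical in-memory model of XCom storage, for illustration only."""

    DEFAULT_KEY = "return_value"

    def __init__(self):
        self._values = {}

    def push(self, dag_id, task_id, value, key=DEFAULT_KEY):
        # Stand-in for the "serializable" requirement: values that cannot
        # be encoded as JSON are rejected with a TypeError.
        json.dumps(value)
        self._values[(dag_id, task_id, key)] = value

    def pull(self, dag_id, task_id, key=DEFAULT_KEY):
        # Returns None when nothing was pushed under this identity.
        return self._values.get((dag_id, task_id, key))


if __name__ == "__main__":
    store = ToyXComStore()
    store.push("my_dag", "pushing_task", {"rows": 3})
    store.push("my_dag", "foo", "events", key="table_name")
    print(store.pull("my_dag", "pushing_task"))
    print(store.pull("my_dag", "foo", key="table_name"))
```

Pulling without a key falls back to return_value, which mirrors why the xcom_pull call shown earlier needs only task_ids.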

Key Exceptions for Apache Airflow Tasks

Airflow supports two special exceptions that you can raise to control the state of your Airflow Tasks from within custom Task/Operator code:

  • AirflowSkipException marks the current task as skipped.
  • AirflowFailException marks the current task as failed and ignores any remaining retry attempts.

These are handy when your code has extra knowledge about its environment and wants to fail or skip faster. For example, you might skip when no data is available, or fail fast when an API key is invalid (since a retry will not fix that).
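
How these two exceptions differ from an ordinary error can be sketched with a toy retry loop. The code below does not use Airflow: ToySkipException and ToyFailException are hypothetical stand-ins for AirflowSkipException and AirflowFailException, and run_with_retries is an invented helper that only mimics the state outcomes described above.

```python
class ToySkipException(Exception):
    """Stand-in for AirflowSkipException."""


class ToyFailException(Exception):
    """Stand-in for AirflowFailException."""


def run_with_retries(fn, retries=2):
    """Call fn until it settles; return (final_state, attempts_used)."""
    attempts = 0
    while True:
        attempts += 1
        try:
            fn()
            return ("success", attempts)
        except ToySkipException:
            return ("skipped", attempts)
        except ToyFailException:
            # Fail immediately, even if retries remain.
            return ("failed", attempts)
        except Exception:
            # Ordinary error: retry while attempts remain, then fail.
            if attempts > retries:
                return ("failed", attempts)


def invalid_api_key():
    raise ToyFailException("API key rejected; retrying will not help")


def no_data_available():
    raise ToySkipException("nothing to process")


def network_always_down():
    raise ConnectionError("temporary outage")


if __name__ == "__main__":
    print(run_with_retries(invalid_api_key))
    print(run_with_retries(no_data_available))
    print(run_with_retries(network_always_down))
```

With retries=2, the ordinary error is attempted three times before failing, while the fail-fast and skip cases settle on the first attempt.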

What are Undead or Zombie Tasks in Airflow?

No system runs perfectly, and task instances are expected to die once in a while. Airflow detects two kinds of task/process mismatch:

  • Zombie Tasks are tasks that are supposed to be running but died suddenly (e.g. their process was killed, or the machine died). Airflow finds them periodically, cleans them up, and either fails or retries the task depending on its settings.
  • Undead Tasks are tasks that are not supposed to be running but are, often as a result of manually editing Task Instances via the UI. Airflow finds them periodically and terminates them.

Conclusion

This article has given you an understanding of Apache Airflow and its key features, along with a deeper look at Airflow Tasks. You are now ready to start building your DAGs. In case you want to integrate data into your desired database or destination, then Hevo Data is the right choice for you!

Hevo Data, a No-code Data Pipeline provides you with a consistent and reliable solution to manage Data transfer between a variety of sources and destinations with a few clicks. Hevo with its strong integration with 100+ sources & BI tools allows you to not only export Data from your desired Data sources & load it to the destination of your choice, but also transform & enrich your Data to make it analysis-ready so that you can focus on your key business needs and perform insightful analysis using BI tools. 

While Airflow is a good solution for Data Integration, it requires a lot of engineering bandwidth and expertise. This can be challenging, resource-intensive, and costly in the long run. Hevo offers a much simpler, scalable, and economical solution that allows people to create Data Pipelines in minutes without any code and without depending on Engineering teams.

Want to take Hevo for a spin? Sign Up for a 14-day free trial. You may also have a look at the amazing price, which will assist you in selecting the best plan for your requirements.

Share your experience of understanding the concept of Airflow Tasks in the comment section below!
