Apache Airflow is an open-source workflow management platform for building Data Pipelines. It enables users to schedule and run Data Pipelines using flexible Python Operators and a Python-based framework. Implementing pipelines this way helps users streamline various business processes, and the Python foundation makes it easier to extend Airflow and add integrations with many different systems.

As a result, Airflow is currently used by many data-driven organizations to orchestrate a variety of crucial data activities. This article will guide you through installing Apache Airflow in a Python environment and then walk you through the different Python Operators used in Airflow.

Prerequisites

  • Fundamental knowledge of Data Pipelines.

Introduction to Apache Airflow


Apache Airflow is an open-source, batch-oriented, pipeline-building framework for developing and monitoring data workflows. Airbnb developed Airflow in 2014 to solve its big data and complex Data Pipeline problems, using a built-in web interface to write and schedule processes and to monitor workflow execution. The growing success of the project led to its adoption by the Apache Software Foundation.

Airflow enables users to efficiently build scheduled Data Pipelines using standard Python features, such as datetime objects for scheduling tasks. It also provides numerous building blocks that allow users to stitch together the many technologies present in today’s technological landscapes.

Another key feature of Airflow is the backfilling property; it enables users to reprocess previous data easily. This feature also allows users to recompute any dataset after modifying the code. As a real-world example, Airflow can be compared to a spider in a web: it resides in the center of your data processes, coordinating work across several distributed systems. 

Key Features of Apache Airflow

  • Extensible: Easily define your own operators and extend libraries to fit the level of abstraction that suits your requirements. 
  • Dynamic: Airflow pipelines are configured as code (Python), allowing for dynamic pipeline generation. Moreover, it enables users to restart from the point of failure without restarting the entire workflow again.
  • Elegant: Airflow pipelines are explicit and straightforward. The powerful Jinja templating engine is incorporated into the core of Airflow, allowing you to parameterize your scripts. Besides, the rich scheduling semantics enables users to run pipelines at regular intervals.
  • Scalable: Airflow has a modular architecture and orchestrates an arbitrary number of workers via a message queue. In itself, Airflow is a general-purpose orchestration framework with a manageable set of features to learn.

Introducing Data Pipelines as a Graph

In Airflow, a DAG, or Directed Acyclic Graph, is a collection of all the tasks the user wants to run, organized in a way that reflects their relationships and dependencies. The structure of a DAG (tasks and their dependencies) is represented as code in a Python script. 

Drawing the Data Pipeline as a graph is one method to make task relationships more apparent. Tasks are nodes in the graph, whereas directed edges represent dependencies between tasks. The direction of the edge denotes the dependency.

For example, an edge pointing from task A to task B implies that task A must be finished before task B can begin. Due to these directions in the graph edges, it is referred to as a directed graph.
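As an illustration, here is a minimal sketch of how such a graph can be expressed in code, assuming Airflow 2.x. The DAG id, schedule, and task names are hypothetical; the >> operator draws the directed edge from task A to task B.

from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

def extract():
    print("extracting data")

def load():
    print("loading data")

# A hypothetical two-task pipeline: task_a must finish before task_b starts.
with DAG(
    dag_id="example_graph_dag",
    start_date=datetime(2023, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    task_a = PythonOperator(task_id="task_a", python_callable=extract)
    task_b = PythonOperator(task_id="task_b", python_callable=load)

    # The directed edge: task_a -> task_b
    task_a >> task_b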

Transform Your ETL Process with Hevo Data’s Python Support!

Leverage Hevo Data’s capability to perform Python transformations during ETL to streamline your workflow and enhance data integration. With Python scripting, you can simplify complex data processing tasks and customize your ETL pipelines effortlessly.

Thousands of customers worldwide trust Hevo for their data ingestion needs. Join them and experience seamless data transformation and migration.

Get Started with Hevo for Free

Getting started with Airflow in Python Environment

Now that you have an understanding of Apache Airflow, this section will show you how to get started with it in a Python environment; later in the article, you will learn more about using Airflow Python Operators. The steps to set up Airflow with Python are listed below:

Step 1: Installing Airflow 

  • Install Apache Airflow using pip with the following command.
pip install apache-airflow
  • After installing Airflow, initialize the metadata database (the database where all Airflow state is stored).
airflow db init
  • Run the following command in the terminal to start the webserver.
airflow webserver
  • Now, start the Airflow scheduler.
airflow scheduler

Step 2: Inspecting the Airflow UI

  • Because the scheduler and webserver are both long-running processes that keep your terminal occupied, you can either run them in the background (for example, by daemonizing them with the -D flag) or run each one in a separate terminal window. 
  • To access Airflow, browse to http://localhost:8080 and log in with the username “admin” and password “admin.”
  • A login screen will appear as shown below.
Airflow UI on Browser

Introducing Python Operators in Apache Airflow

Now the setup is ready to use Airflow with Python on your local machine. In this section, you will go through the steps to create a DAG, use Python Operators, and run a task. Once this section is completed, you will understand Python Operators and how to create a DAG and run a task using Airflow and Python. The steps to use Airflow Python Operators are listed below.

Step 1: Importing the Libraries

Import all necessary libraries.

Importing the Libraries
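The original post shows the imports as a screenshot; as a stand-in, here is a minimal sketch of the imports this walkthrough needs, assuming Airflow 2.x (where PythonOperator lives in airflow.operators.python):

from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator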

Step 2: Defining DAG

Now we’ll create a DAG object and pass the dag_id, which is the name of the DAG. Make sure you haven’t already created a DAG with this name.

Also add a description and a schedule interval; the DAG will then execute at the specified time interval.

DAG
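The DAG definition shown in the screenshot above is roughly equivalent to the following sketch; the dag_id, description, and interval are placeholder values:

dag = DAG(
    dag_id="python_operator_demo",          # hypothetical, must be unique
    description="A simple demo DAG",
    default_args=default_args,              # dictionary defined in the next step
    schedule_interval=timedelta(days=1),    # run once a day
)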

Step 3: Defining DAG Arguments

For each DAG, we must pass one dictionary of default arguments. Here are some of the arguments that you can pass (a sketch of such a dictionary follows this list):

  • owner: the name of the workflow owner; it should be alphanumeric and can contain underscores but not spaces.
  • email: the address that will be notified if a task fails for whatever reason.
  • start_date: the start date of your workflow.
  • depends_on_past: mark it as True if a run of your workflow depends on the data from the previous run; otherwise, mark it as False.
  • retry_delay: the time to wait before retrying a task if it fails.
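Here is a minimal sketch of such a default_args dictionary; the specific values (owner name, email, dates, delays) are placeholders:

default_args = {
    "owner": "airflow_user",                 # alphanumeric, underscores allowed
    "email": ["alerts@example.com"],         # notified on task failure
    "start_date": datetime(2023, 1, 1),
    "depends_on_past": False,
    "retries": 1,                            # common companion to retry_delay
    "retry_delay": timedelta(minutes=5),
}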

Step 4: Defining the Python Function

We’ll now define the Python function that takes an argument and returns a string built from it; this function will be used by the Airflow PythonOperator later.

def my_function(x):
    return x + " This is a Python function."

Step 5: Defining the Task

The task_id is passed to the PythonOperator object. Pass the name of the Python function to python_callable, the arguments as a dictionary via the op_kwargs parameter, and lastly, the DAG object. 

Defining the Task
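The task definition shown in the screenshot is roughly equivalent to the following sketch; the task_id and argument values are placeholders:

python_task = PythonOperator(
    task_id="print_string_task",            # hypothetical task name
    python_callable=my_function,            # the function defined in Step 4
    op_kwargs={"x": "Apache Airflow."},     # keyword arguments passed to it
    dag=dag,
)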

Step 6: Run the DAG

Once the Airflow dashboard is refreshed, the new DAG will appear. Unpause and trigger the DAG to start it; the colors indicate the current status of the workflow. Run the workflow and wait for the dark green border to appear, indicating the task has been completed successfully.

Running the DAG

Step 7: Templating

The PythonOperator is an exception to this templating: instead of accepting string arguments that are templated with the runtime context, it accepts a python_callable argument, and the runtime context is made available inside that callable (for example, as keyword arguments or via templated values in op_kwargs and templates_dict).
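As an illustration, here is a minimal sketch of a callable that receives the runtime context; the task and function names are hypothetical, and the example assumes Airflow 2.x, where the context is passed to the callable as keyword arguments:

def print_logical_date(**context):
    # "ds" is the logical date of the run, rendered by Airflow at runtime.
    print(f"This run's logical date is {context['ds']}")

templated_task = PythonOperator(
    task_id="print_logical_date",
    python_callable=print_logical_date,
    dag=dag,
)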


Different Python Operators in Airflow

In this section, you will go through various Python Operators in Airflow that are widely used for creating, managing, and accessing workflows. They are listed below:

1) Python Operator: airflow.operators.python.PythonOperator


When the callable runs, Airflow passes in a set of keyword arguments that can be used in the function. This set of kwargs corresponds exactly to what you can use in your Jinja templates. 

Parameters

  • python_callable (python callable) – a reference to an object that is callable.
  • op_kwargs – a dictionary of keyword arguments that will get unpacked in your function. 
  • op_args – a list of positional arguments that will get unpacked when calling the callable.
  • templates_dict (dict[str]) – a dictionary with values (templates) that will be templated by the Airflow engine between __init__ and execute.
  • templates_exts – a list of file extensions to resolve while processing templated fields, such as [‘.sql’, ‘.hql’] (see the sketch after this list).
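To make these parameters concrete, here is a hedged sketch that exercises op_args and templates_dict together; the task name, function, and template string are illustrative only:

def report(environment, templates_dict=None, **kwargs):
    # templates_dict values have already been rendered by the Jinja engine.
    print(f"Running in {environment} for date {templates_dict['run_date']}")

report_task = PythonOperator(
    task_id="report_task",
    python_callable=report,
    op_args=["production"],                  # positional arguments
    templates_dict={"run_date": "{{ ds }}"}, # templated before execute()
    dag=dag,
)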

2) Python Operator: airflow.models.python.task


It is a deprecated function that calls @task.python and allows users to turn a python function into an Airflow task.

Alternative:

from airflow.decorators import task

@task
def my_task():
    ...

3) Python Operator: airflow.operators.python.BranchPythonOperator


Allows a workflow to “branch”, i.e. to follow a single downstream path after the execution of this task, based on the task id(s) returned by the Python callable.
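A minimal sketch of how this might look; the task ids and the branching condition are hypothetical, and the example assumes Airflow 2.x, where BranchPythonOperator is importable from airflow.operators.python:

from airflow.operators.python import BranchPythonOperator

def choose_branch(**context):
    # Return the task_id of the path to follow; the other path is skipped.
    # "monthly_task" and "daily_task" would need to exist downstream in the DAG.
    if context["ds"].endswith("01"):
        return "monthly_task"
    return "daily_task"

branch_task = BranchPythonOperator(
    task_id="branch_task",
    python_callable=choose_branch,
    dag=dag,
)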

4) Python Operator: airflow.operators.python.ShortCircuitOperator


It allows a workflow to continue only if a condition is true. Otherwise, the workflow “short-circuits” and the downstream tasks are skipped.
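A hedged sketch of this operator; the task name and the condition are illustrative placeholders:

from airflow.operators.python import ShortCircuitOperator

def data_is_available(**context):
    # Return True to let downstream tasks run, False to skip them.
    return context["ds"] >= "2023-01-01"    # hypothetical condition

check_task = ShortCircuitOperator(
    task_id="check_data_available",
    python_callable=data_is_available,
    dag=dag,
)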

5) Python Operator: airflow.operators.python.PythonVirtualenvOperator


Allows users to run a function in a virtualenv that can be created and destroyed automatically. The function here must be defined using def and not as part of a class.

Parameters

  • requirements – a list of requirements, as you would specify them in a pip install command.
  • use_dill (bool) – use dill to serialize the args; this allows complex types but requires users to include dill in the requirements.
  • system_site_packages (bool) – whether to include system site-packages in the virtualenv.
  • string_args (list[str]) – strings present in the global variable virtualenv_string_args, available to python_callable at runtime as a list[str] (see the sketch after this list). 
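A hedged sketch of how this operator might be used; the package version and the callable are illustrative, and note that the callable cannot rely on variables from the enclosing module:

from airflow.operators.python import PythonVirtualenvOperator

def run_in_venv():
    # Imports must happen inside the function, since it runs in a fresh virtualenv.
    import pandas as pd
    print(f"Running with pandas {pd.__version__}")

venv_task = PythonVirtualenvOperator(
    task_id="run_in_venv",
    python_callable=run_in_venv,
    requirements=["pandas==1.5.3"],          # hypothetical pinned dependency
    system_site_packages=False,
    dag=dag,
)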

6) Python Operator: airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator

This operator launches a Python Google Cloud Dataflow job from the supplied Python file. It lives in the deprecated airflow.contrib package; newer Airflow versions provide an equivalent operator in the Google provider package.

Conclusion 

In this article, you learned about different Python Operators, their syntax, and their parameters. Overall, this blog is a complete walk-through guide on Python Operators in Airflow. With the strong foundation of the Python framework, Apache Airflow enables users to effortlessly schedule and run complex Data Pipelines at regular intervals. Data Pipelines represented as DAGs play an essential role in Airflow in creating flexible workflows.

Today, many companies have benefited from Apache Airflow because of its dynamic nature and flexibility. Companies need to analyze their business data stored in multiple data sources, which needs to be loaded into the Data Warehouse to get a holistic view. Hevo Data is a No-code Data Pipeline solution that helps transfer data from 150+ sources to the desired Data Warehouse. Sign up for Hevo’s 14-day free trial and experience seamless data migration.

FAQs

What is the use of Python operator in Airflow?

The Python Operator in Airflow allows you to execute Python callable functions as tasks within your DAG (Directed Acyclic Graph). It facilitates running Python code and integrates seamlessly with other operators and tasks.

What are operators in Airflow?

Operators in Airflow are templates for defining tasks in a DAG. They encapsulate a specific action to perform, such as executing a Bash command, running Python code, or interacting with external systems.

How do you pass data between operators in Airflow?

Data can be passed between operators in Airflow using XCom. Operators can push data to XCom using the xcom_push method and retrieve it in subsequent tasks with xcom_pull, enabling seamless data sharing between tasks.

Shravani Kharat
Technical Content Writer, Hevo Data

Shravani is a passionate data science enthusiast interested in exploring complex topics within the field. She excels in data integration and analysis, skillfully solving intricate problems and crafting comprehensive content tailored for data practitioners and businesses. Shravani’s analytical prowess and dedication to delivering high-quality, informative material make her a valuable asset in data science.