Understanding Python Operator in Airflow Simplified 101


Apache Airflow is an open-source workflow management platform for building Data Pipelines. It lets users schedule and run Data Pipelines using flexible Python Operators and a Python-based framework. Because pipelines are implemented as code, users can streamline a wide range of business processes. The Python foundation also makes it easier to extend Airflow and add integrations with many different systems.

As a result, many data-driven organizations use Airflow to orchestrate a variety of crucial data activities. This article shows how to install Apache Airflow in a Python environment and then introduces the different Python Operators used in Airflow.

Prerequisites

  • Fundamental knowledge of Data Pipelines.

Introduction to Apache Airflow

Apache Airflow is an open-source, batch-oriented framework for building and monitoring data workflows. Airbnb developed Airflow in 2014 to handle its big data and increasingly complex Data Pipelines, writing and scheduling processes as code and using the built-in web interface to monitor workflow execution. The growing success of the project led to its adoption by the Apache Software Foundation.

Airflow enables users to efficiently build scheduled Data Pipelines using standard Python features, such as date and time formats for scheduling tasks. It also provides numerous building blocks that let users stitch together the many technologies in today’s technology landscape.

Another key feature of Airflow is backfilling, which lets users reprocess historical data easily. It also allows users to recompute any dataset after modifying the code. As an analogy, Airflow can be compared to a spider in a web: it sits at the center of your data processes, coordinating work across several distributed systems.

Key Features of Apache Airflow

  • Extensible: Easily define your own operators and extend libraries to fit the level of abstraction that suits your requirements.
  • Dynamic: Airflow pipelines are configured as code (Python), allowing for dynamic pipeline generation. Moreover, users can restart from the point of failure without rerunning the entire workflow.
  • Elegant: Airflow pipelines are explicit and straightforward. The powerful Jinja templating engine is incorporated into the core of Airflow, allowing you to parameterize your scripts. Besides, the rich scheduling semantics enable users to run pipelines at regular intervals.
  • Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. In itself, Airflow is a general-purpose orchestration framework with a manageable set of features to learn.

To know more about Apache Airflow, click here.

Simplify Data Analysis with Hevo’s No-code Data Pipeline

Hevo Data, a No-code Data Pipeline, helps load data from any data source, such as Databases, SaaS applications, Cloud Storage, SDKs, and Streaming Services, and simplifies the ETL process. It supports 100+ data sources (including 30+ free data sources) and takes just 3 steps: select the data source, provide valid credentials, and choose the destination. Hevo not only loads the data onto the desired Data Warehouse/destination but also enriches the data and transforms it into an analysis-ready form without you having to write a single line of code.

Its completely automated pipeline delivers data in real time, without any loss, from source to destination. Its fault-tolerant and scalable architecture ensures that the data is handled in a secure, consistent manner with zero data loss and supports different forms of data. The solutions provided are consistent and work with different BI tools as well.

Check out why Hevo is the Best:

  1. Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  2. Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
  3. Minimal Learning: Hevo, with its simple and interactive UI, is extremely simple for new customers to work on and perform operations.
  4. Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  5. Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
  6. Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, E-Mail, and support calls.
  7. Live Monitoring: Hevo allows you to monitor the data flow and check where your data is at a particular point in time.

Introducing Data Pipelines as a Graph

In Airflow, a DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized so that their relationships and dependencies are reflected. The structure of a DAG (tasks and their dependencies) is represented as code in a Python script.

Drawing the Data Pipeline as a graph is one method to make task relationships more apparent. Tasks are nodes in the graph, whereas directed edges represent dependencies between tasks. The direction of the edge denotes the dependency.

For example, an edge pointing from task A to task B implies that task A must be finished before task B can begin. Because its edges have a direction, the graph is called a directed graph. It must also be acyclic: a cycle would mean that a task waits, directly or indirectly, on itself.
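
To make the idea concrete, here is a minimal sketch in plain Python, without Airflow, of tasks as nodes and dependencies as directed edges. The task names are hypothetical, and the standard-library graphlib module (Python 3.9+) stands in for the ordering work that Airflow's scheduler would do.

```python
from graphlib import TopologicalSorter

# Hypothetical pipeline: each key is a task, each value is the set of
# upstream tasks that must finish before that task can start.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "validate": {"extract"},
    "load": {"transform", "validate"},
}

# static_order() yields a task only after all of its upstream tasks.
# It raises graphlib.CycleError if the graph contains a cycle.
run_order = list(TopologicalSorter(dependencies).static_order())
print(run_order)
```

Here "transform" and "validate" do not depend on each other, so either may come first; only the edges constrain the order.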

Getting started with Airflow in Python Environment

Now that you have an overview of Apache Airflow, this section shows how to get started with it in a Python environment. Later in the article, you will learn more about using Airflow Python Operators. The steps to set up Airflow with Python are listed below:

Step 1: Installing Airflow 

  • Install Apache Airflow using pip with the following command.
pip install apache-airflow
  • After installing Airflow, initialize the metadata database (the database where all Airflow state is stored).
airflow db init
  • Run the command in the terminal to start the webserver.
airflow webserver
  • Now, start the airflow scheduler.
airflow scheduler

Step 2: Inspecting the Airflow UI

  • The scheduler and the webserver are both long-running processes that keep your terminal occupied, so either run them in the background with the -D (daemon) flag, for example airflow webserver -D, or run each of them in its own terminal window.
  • To access Airflow, browse to http://localhost:8080. A login screen will appear.
  • Log in with the username “admin” and password “admin.” If no such user exists yet, create one first with the airflow users create command.

Introducing Python Operators in Apache Airflow

Airflow is now set up and ready to use with Python on your local machine. This section walks through the steps to create a DAG and run a task with a Python Operator. The steps are listed below.

Step 1: Importing the Libraries

Import all necessary libraries.

Step 2: Defining DAG

Now we’ll create a DAG object and pass the dag_id, which is the name of the DAG. Make sure that no existing DAG already uses this name.

Add a description and a schedule interval to the DAG object; the DAG will then run at the specified interval.

Step 3: Defining DAG Arguments

Each DAG can be given one dictionary of default arguments. Here are some of the arguments that you can pass:

  • owner: the name of the workflow owner. It should be alphanumeric and can contain underscores but not spaces.
  • email: the address that is notified if a task fails for whatever reason.
  • start_date: the start date of your workflow.
  • depends_on_past: set it to True if a run of your workflow depends on data from the previous run; otherwise, set it to False.
  • retry_delay: the time to wait before retrying a task that has failed.
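
As an illustration, such a dictionary might look like the sketch below. All values are hypothetical, and the retries key is an assumption added here because retry_delay only matters when a task is retried at least once.

```python
from datetime import datetime, timedelta

# Hypothetical values; adjust the owner, address, and dates to your setup.
default_args = {
    "owner": "data_team",                 # alphanumeric, underscores allowed, no spaces
    "email": ["alerts@example.com"],      # notified when a task fails
    "start_date": datetime(2024, 1, 1),   # start date of the workflow
    "depends_on_past": False,             # each run is independent of the previous run
    "retries": 1,                         # assumption: retry a failed task once
    "retry_delay": timedelta(minutes=5),  # wait five minutes before retrying
}
```

The dictionary would typically be passed to the DAG through its default_args parameter, so that every task in the DAG picks up these settings.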

Step 4: Defining the Python Function

We’ll now write the Python function that the PythonOperator will call later. It takes one argument and returns a string built from it.

def my_function(x):
    return x + " This is a Python function."

Step 5: Defining the Task

Create a PythonOperator object and give it a task_id. Pass the Python function itself to python_callable, the function’s arguments as a dictionary to op_kwargs, and lastly the DAG object.
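
Putting Steps 1 to 5 together, a DAG file might look roughly like the sketch below. It assumes Airflow 2.4 or later, where the DAG parameter is named schedule (earlier 2.x releases call it schedule_interval); the dag_id, task_id, description, and dates are hypothetical. The import guard exists only so the snippet can also be run where Airflow is not installed; a real DAG file would import Airflow unconditionally at the top.

```python
from datetime import datetime, timedelta

try:
    from airflow import DAG
    from airflow.operators.python import PythonOperator
except ImportError:
    # Airflow is not installed; only the plain Python parts below will run.
    DAG = PythonOperator = None


def my_function(x):
    return x + " This is a Python function."


# Hypothetical argument handed to my_function through op_kwargs.
task_kwargs = {"x": "Apache Airflow."}

if DAG is not None:
    dag = DAG(
        dag_id="python_operator_demo",      # hypothetical name, must be unique
        description="Runs my_function once a day",
        schedule=timedelta(days=1),         # schedule_interval before Airflow 2.4
        start_date=datetime(2024, 1, 1),
        catchup=False,                      # do not backfill runs since start_date
    )
    print_task = PythonOperator(
        task_id="print_message",
        python_callable=my_function,        # the function itself, not a call
        op_kwargs=task_kwargs,              # unpacked as my_function(x=...)
        dag=dag,
    )

# What the operator ends up calling when the task runs:
print(my_function(**task_kwargs))
```

Note that my_function returns its string rather than printing it; the final print call here is only for trying the function out locally.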

Step 6: Run the DAG

Refresh the Airflow dashboard and the new DAG will appear. Click the DAG and run it; colors indicate the current status of the workflow. Wait for the dark green border to appear, which indicates that the task has completed successfully.

Step 7: Templating

The PythonOperator is an exception when it comes to templating. Most operators take arguments that Airflow templates with the runtime context. The PythonOperator instead takes a python_callable argument, and the runtime context is applied inside that function when it runs.
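
A minimal sketch of what this means for the callable, assuming Airflow 2 behavior: context variables are passed as keyword arguments, so the function can name the ones it needs (here ds, the run's date as a YYYY-MM-DD string) and collect the rest in **context. The fake_context dictionary is a made-up stand-in for what Airflow would supply at runtime.

```python
def report_run(ds, **context):
    """Return a message for the date of the current run."""
    # ds arrives by name; all other context variables land in **context.
    return f"Processing data for {ds} ({len(context)} other context variables)"


# Stand-in for the runtime context; the values are made up.
fake_context = {"ds": "2024-01-01", "run_id": "manual__2024-01-01", "params": {}}
print(report_run(**fake_context))
```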

Different Python Operators in Airflow

This section covers the Python Operators in Airflow that are widely used for creating, managing, and accessing workflows:

1) Python Operator: airflow.operators.python.PythonOperator

When the callable runs, Airflow passes it a set of keyword arguments that the function can use. This set of kwargs corresponds to the variables available in Jinja templates.

Parameters

  • python_callable (python callable) – a reference to a callable object.
  • op_kwargs – a dictionary of keyword arguments that will be unpacked when calling the function.
  • op_args – a list of positional arguments that will be unpacked when calling the callable.
  • templates_dict (dict[str]) – a dictionary whose values are templates that the Airflow engine renders between __init__ and execute.
  • templates_exts – a list of file extensions to resolve when processing templated fields, such as ['.sql', '.hql'].
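
The unpacking of op_args and op_kwargs can be pictured with plain Python. This is a simplified model rather than Airflow's actual implementation, and the greet function and its arguments are hypothetical.

```python
def greet(greeting, name, punctuation="."):
    return f"{greeting}, {name}{punctuation}"


op_args = ["Hello"]                                   # positional arguments
op_kwargs = {"name": "Airflow", "punctuation": "!"}   # keyword arguments

# Roughly what happens when the task executes: the list is unpacked with *
# and the dictionary with **.
result = greet(*op_args, **op_kwargs)
print(result)
```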

2) Python Operator: airflow.operators.python.task

It is a deprecated function that calls @task.python and allows users to turn a Python function into an Airflow task.

Alternative:

from airflow.decorators import task
@task
def my_task():
    ...

3) Python Operator: airflow.operators.python.BranchPythonOperator

Allows a workflow to “branch”, that is, to follow one particular path after this task has executed.
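
The branch is chosen by the operator's python_callable, which returns the task_id (or a list of task_ids) of the downstream task to follow; the other directly downstream tasks are skipped. Below is a minimal sketch of such a callable, with hypothetical task names and a hypothetical row_count argument. The Airflow wiring is shown only as comments.

```python
def choose_branch(row_count):
    """Return the task_id of the branch to follow (task names are hypothetical)."""
    if row_count > 0:
        return "process_rows"
    return "notify_empty_input"


# In a DAG file, the callable would be wired up roughly like this:
# branch = BranchPythonOperator(
#     task_id="choose_branch",
#     python_callable=choose_branch,
#     op_kwargs={"row_count": 42},
# )
# branch >> [process_rows, notify_empty_input]
print(choose_branch(42))
```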

4) Python Operator: airflow.operators.python.ShortCircuitOperator

It allows a workflow to continue only if a condition is true. Otherwise, the workflow “short-circuits” and the downstream tasks are skipped.
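
The condition is simply the truth value returned by the python_callable. As a sketch, the hypothetical callable below lets the workflow continue only on weekdays; it assumes Airflow passes the run's date as the ds context variable (a YYYY-MM-DD string). The operator wiring is shown as a comment.

```python
from datetime import date


def is_weekday(ds, **context):
    """Return True from Monday to Friday for the run date ds (YYYY-MM-DD)."""
    return date.fromisoformat(ds).weekday() < 5


# In a DAG file, roughly:
# ShortCircuitOperator(task_id="only_on_weekdays", python_callable=is_weekday)
print(is_weekday("2024-01-01"))
```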

5) Python Operator: airflow.operators.python.PythonVirtualenvOperator

Allows users to run a function in a virtualenv that can be created and destroyed automatically. The function here must be defined using def and not as part of a class.

Parameters

  • requirements – a list of requirements, written as they would be in a pip install command.
  • use_dill (bool) – whether to use dill to serialize the args; this allows complex types but requires users to include dill in the requirements.
  • system_site_packages (bool) – whether to include system site packages in the virtualenv.
  • string_args (list[str]) – strings that are present in the global var virtualenv_string_args and are available to python_callable at runtime as a list[str].
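
Because the function runs in a separate environment, it cannot rely on names from the surrounding DAG file, so a common pattern is to keep every import inside the function body. The sketch below shows a hypothetical callable written that way; the operator wiring is shown only as comments and its values are assumptions.

```python
def summarize(values):
    """Self-contained callable: all imports happen inside the function."""
    import statistics

    return {"mean": statistics.mean(values), "max": max(values)}


# In a DAG file, roughly:
# PythonVirtualenvOperator(
#     task_id="summarize",
#     python_callable=summarize,
#     requirements=[],             # add pip specifiers here if the function needs them
#     system_site_packages=False,
#     op_args=[[1, 2, 3]],
# )
print(summarize([1, 2, 3]))
```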

6) Python Operator: airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator

Conclusion 

In this article, you learned about the different Python Operators in Airflow, along with their syntax and parameters. With Python as its foundation, Apache Airflow enables users to schedule and run complex Data Pipelines at regular intervals. Data Pipelines represented as DAGs play an essential role in Airflow for creating flexible workflows.

The rich web interface of Airflow makes it easy to monitor the results of pipeline runs and debug any failures that occur. Because of the dynamic nature and flexibility that Apache Airflow brings to the table, many companies have benefited from it.

Companies need to analyze their business data stored in multiple data sources. The data needs to be loaded into a Data Warehouse to get a holistic view of it. Hevo Data is a No-code Data Pipeline solution that helps transfer data from 100+ sources to the desired Data Warehouse. It fully automates the process of transforming and transferring data to a destination without writing a single line of code.

Want to take Hevo for a spin? Sign Up here for a 14-day free trial and experience the feature-rich Hevo suite first hand.

Share your experience of learning about Python Operator in Airflow in the comments section below!

Freelance Technical Content Writer, Hevo Data

Shravani is a data science enthusiast who loves to delve deeper into complex topics on data science and solve the problems related to data integration and analysis through comprehensive content for data practitioners and businesses.