Apache Airflow is an open-source workflow management platform for building Data Pipelines. It lets users schedule and run Data Pipelines using flexible Python operators and a Python-based framework. Expressing pipelines as code helps teams automate and streamline a range of business processes. Because Airflow is built on Python, it is also easy to extend and to integrate with many different systems.
As a result, many data-driven organizations use Airflow to orchestrate crucial data activities. This article shows you how to install Apache Airflow in a Python environment and then walks through the different Python Operators used in Airflow.
Prerequisites
- Fundamental knowledge of Data Pipelines.
Introduction to Apache Airflow
Apache Airflow is an open-source, batch-oriented, pipeline-building framework for developing and monitoring data workflows. In 2014, Airbnb developed Airflow to solve big data and complex Data Pipeline problems. They used a built-in web interface to write and schedule processes and monitor workflow execution. The increasing success of the Airflow project led to its adoption in the Apache Software Foundation.
Airflow enables users to efficiently build scheduled Data Pipelines using standard Python features, such as the datetime format for scheduling tasks. It also provides numerous building blocks that allow users to stitch together the many technologies present in today’s technological landscapes.
Another key feature of Airflow is backfilling, which lets users easily reprocess historical data. It also allows users to recompute any dataset after modifying the code. As an analogy, Airflow can be compared to a spider in a web: it sits at the center of your data processes, coordinating work across several distributed systems.
Key Features of Apache Airflow
- Extensible: You can easily define your own operators and extend libraries to fit the level of abstraction that suits your requirements.
- Dynamic: Airflow pipelines are configured as code (Python), allowing for dynamic pipeline generation. Airflow also lets users restart from the point of failure instead of rerunning the entire workflow.
- Elegant: Airflow pipelines are explicit and straightforward. The powerful Jinja templating engine is incorporated into the core of Airflow, allowing you to parameterize your scripts. In addition, the rich scheduling semantics enable users to run pipelines at regular intervals.
- Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers. At its core, Airflow is a general-purpose orchestration framework with a manageable set of features to learn.
Introducing Data Pipelines as a Graph
In Airflow, a DAG (Directed Acyclic Graph) is a collection of all the tasks you want to run, organized in a way that reflects their relationships and dependencies. The structure of a DAG (tasks and their dependencies) is represented as code in a Python script.
Drawing the Data Pipeline as a graph is one method to make task relationships more apparent. Tasks are nodes in the graph, whereas directed edges represent dependencies between tasks. The direction of the edge denotes the dependency.
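For example, an edge pointing from task A to task B implies that task A must be finished before task B can begin. Because its edges have a direction, the graph is called a directed graph. It is acyclic because it must not contain cycles: a task can never depend, directly or indirectly, on itself.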
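To make the graph idea concrete, here is a minimal sketch that uses only Python's standard library (graphlib, available from Python 3.9), not Airflow itself. The task names are hypothetical. It derives a valid run order from the dependencies and shows how a circular dependency is detected.

```python
from graphlib import CycleError, TopologicalSorter

# Each key is a task; its value is the set of tasks that must finish first.
# The task names are hypothetical.
pipeline = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}


def execution_order(dependencies):
    """Return one valid run order, with upstream tasks before downstream ones."""
    return list(TopologicalSorter(dependencies).static_order())


def is_acyclic(dependencies):
    """Return True if the graph contains no circular dependencies."""
    try:
        execution_order(dependencies)
    except CycleError:
        return False
    return True


print(execution_order(pipeline))  # ['extract', 'transform', 'load', 'report']
print(is_acyclic({"a": {"b"}, "b": {"a"}}))  # False
```

Airflow performs a comparable check when it loads a DAG, which is why a pipeline with circular dependencies cannot be scheduled.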
Getting started with Airflow in Python Environment
Now that you understand what Apache Airflow is, this section shows you how to get started with it in a Python environment. Later in the article, you will learn more about using Airflow Python Operators. Follow the steps below to set up Airflow with Python:
Step 1: Installing Airflow
- Install Apache Airflow using pip with the following command.
pip install apache-airflow
- After installing Airflow, initialize the metadata database (the database where all Airflow state is stored). In Airflow 2.7 and later, this command is deprecated in favor of airflow db migrate.
airflow db init
- Run the command in the terminal to start the webserver.
airflow webserver
- Now, start the airflow scheduler.
airflow scheduler
Step 2: Inspecting the Airflow UI
- Because the scheduler and webserver are both long-running processes that occupy your terminal, you can either run them in the background (for example, with the daemon flag, as in airflow webserver -D) or run each one in its own terminal window.
- To access Airflow, browse to http://localhost:8080. A login screen will appear.
- Log in with the username “admin” and password “admin”. If this account does not exist yet, create it first with the airflow users create command.
Introducing Python Operators in Apache Airflow
Airflow is now set up for use with Python on your local machine. In this section, you will create a DAG and run a task with the PythonOperator. By the end, you will understand how Python Operators work and how to create a DAG and run a task using Airflow and Python. The steps are listed below.
Step 1: Importing the Libraries
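Import the necessary libraries: the DAG class from airflow, the PythonOperator from airflow.operators.python, and datetime and timedelta from Python’s datetime module for the start date and retry delay.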
Step 2: Defining DAG
Now we’ll create a DAG object and pass it the dag_id, which is the name of the DAG. Make sure no existing DAG already uses this name.
Add a description and a schedule interval to the DAG object you just created; the DAG will then execute at the specified interval.
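The import and DAG definition steps can be sketched as follows, assuming Airflow 2.4 or later. The dag_id, description, schedule, and start date are hypothetical values chosen for illustration. Airflow is imported inside the function, and the DAG is built only when Airflow is installed; both choices exist purely so that the snippet also runs in an environment without Airflow. In a real DAG file you would normally import Airflow at the top of the module.

```python
import importlib.util
from datetime import datetime

DAG_ID = "python_operator_demo"  # hypothetical name; must be unique among your DAGs


def build_dag():
    """Create the DAG object (Airflow is imported lazily, see the note above)."""
    from airflow import DAG

    return DAG(
        dag_id=DAG_ID,
        description="Runs a single Python function",  # hypothetical description
        schedule="@daily",  # this parameter was named schedule_interval before Airflow 2.4
        start_date=datetime(2024, 1, 1),  # hypothetical start date
        catchup=False,  # do not create runs for past intervals
    )


# Airflow discovers DAG objects that are defined at module level.
if importlib.util.find_spec("airflow") is not None:
    dag = build_dag()
```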
Step 3: Defining DAG Arguments
Each DAG can take a dictionary of default arguments (default_args) that is applied to all of its tasks. Here are some of the arguments that you can pass:
- owner: the name of the workflow owner. It should be alphanumeric; it can contain underscores but not spaces.
- email: the address (or list of addresses) that is notified when a task fails, provided email alerts are enabled.
- start_date: the start date of your workflow.
- depends_on_past: set it to True if each run of a task should wait for the same task in the previous run to succeed; otherwise, set it to False.
- retry_delay: the time to wait before retrying a task that has failed.
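A default arguments dictionary covering the keys above might look like the sketch below. The owner and email address are hypothetical. The email_on_failure and retries keys are not in the list above; they are included as an assumption because email and retry_delay only take effect when alerts and retries are enabled.

```python
from datetime import datetime, timedelta

default_args = {
    "owner": "data_team",  # hypothetical owner: alphanumeric plus underscores
    "email": ["alerts@example.com"],  # hypothetical address
    "email_on_failure": True,  # assumption: enable failure emails
    "start_date": datetime(2024, 1, 1),
    "depends_on_past": False,  # each run is independent of the previous one
    "retries": 1,  # assumption: retry once so retry_delay applies
    "retry_delay": timedelta(minutes=5),
}
```

You would then pass this dictionary to the DAG through its default_args parameter.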
Step 4: Defining the Python Function
We’ll now write the Python function that the PythonOperator will call later. It takes one argument and returns a string built from it.
def my_function(x):
    return x + " This is a Python function."
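Because the callable is ordinary Python, you can check it on its own before handing it to Airflow. The sketch below simply calls the function with a sample argument (the input string is a hypothetical value).

```python
def my_function(x):
    return x + " This is a Python function."


# Call the function directly, exactly as the operator will at run time.
result = my_function("Hello from Airflow.")
print(result)  # Hello from Airflow. This is a Python function.
```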
Step 5: Defining the Task
Create a PythonOperator object and pass it a task_id. Pass the name of the Python function as python_callable, the function’s arguments as a dictionary through the op_kwargs parameter, and finally the DAG object through dag.
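Put together, the task definition could look like the sketch below, again assuming Airflow 2.4 or later. The dag_id, task_id, and argument value are hypothetical. As before, Airflow is imported lazily and the DAG is built only when Airflow is installed, so that the snippet also runs without it.

```python
import importlib.util
from datetime import datetime


def my_function(x):
    return x + " This is a Python function."


def build_dag():
    """Create a DAG with one PythonOperator task."""
    from airflow import DAG
    from airflow.operators.python import PythonOperator

    dag = DAG(
        dag_id="python_operator_demo",  # hypothetical name
        schedule="@daily",  # named schedule_interval before Airflow 2.4
        start_date=datetime(2024, 1, 1),
        catchup=False,
    )
    PythonOperator(
        task_id="print_message",  # hypothetical task name
        python_callable=my_function,  # the function itself, not a call to it
        op_kwargs={"x": "Apache Airflow."},  # passed as my_function(x=...)
        dag=dag,
    )
    return dag


# Airflow discovers DAG objects that are defined at module level.
if importlib.util.find_spec("airflow") is not None:
    dag = build_dag()
```

Note that python_callable receives the function object; writing my_function() there would call the function while the file is being parsed instead of when the task runs.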
Step 6: Run the DAG
Once you refresh the Airflow dashboard, the new DAG will appear. Click the DAG and trigger it; it will begin to execute, and colors will indicate the current status of each task. Wait for the dark green border to appear, which indicates that the task has completed successfully.
Step 7: Templating
The PythonOperator is an exception to the usual templating rules. Most operators take arguments that are templated with the runtime context. The PythonOperator instead takes a python_callable, and Airflow passes the runtime context to that function as keyword arguments when the task runs.
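A minimal sketch of a callable that reads the runtime context is shown below, assuming Airflow 2 behavior, where context variables are passed as keyword arguments. The function name is hypothetical; ds and run_id are context variables that Airflow provides. The call at the end simulates the context by hand so the function can be tried without Airflow.

```python
def report_run_date(ds, **context):
    """Build a message from the run's logical date.

    Airflow supplies `ds` (the logical date as a YYYY-MM-DD string) and the
    remaining context variables as keyword arguments when the task runs.
    """
    return f"Processing data for {ds}"


# Inside a DAG, the function would be wired up without any templated arguments:
#   PythonOperator(task_id="report_run_date", python_callable=report_run_date)

# Outside Airflow, the context can be simulated with plain keyword arguments.
message = report_run_date(ds="2024-01-01", run_id="manual_test_run")
print(message)  # Processing data for 2024-01-01
```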
Different Python Operators in Airflow
This section covers the Python Operators in Airflow that are widely used for creating, managing, and accessing workflows:
1) Python Operator: airflow.operators.python.PythonOperator
When the callable runs, Airflow passes it a set of keyword arguments that can be used in the function. This set of kwargs corresponds to the variables available in Jinja templates.
Parameters
- python_callable (python callable) – a reference to an object that is callable.
- op_kwargs – a dictionary of keyword arguments that will be unpacked when calling the callable.
- op_args – a list of positional arguments that will be unpacked when calling the callable.
- templates_dict (dict[str]) – a dictionary whose values are templates that the Airflow engine renders sometime between __init__ and execute; the rendered dictionary is available in the callable’s context.
- templates_exts – a list of file extensions to resolve while processing templated fields, such as ['.sql', '.hql'].
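The sketch below illustrates how op_args, op_kwargs, and templates_dict reach the callable. It is a plain-Python simulation rather than Airflow's actual execution code, and the function name, table name, and values are hypothetical. In a DAG, templates_dict would hold a template such as {"run_date": "{{ ds }}"}, which Airflow renders before the function runs.

```python
def build_query(table, limit=10, templates_dict=None, **context):
    """Return a SQL string that selects one day's rows from `table`."""
    run_date = (templates_dict or {}).get("run_date", "unknown")
    return f"SELECT * FROM {table} WHERE day = '{run_date}' LIMIT {limit}"


# Values you would hand to the operator (all hypothetical).
op_args = ["events"]
op_kwargs = {"limit": 100}
# Stands in for templates_dict after Airflow has rendered "{{ ds }}".
rendered_templates = {"run_date": "2024-01-01"}

# Roughly what happens at execution time: the positional and keyword
# arguments are unpacked into the callable.
query = build_query(*op_args, **op_kwargs, templates_dict=rendered_templates)
print(query)  # SELECT * FROM events WHERE day = '2024-01-01' LIMIT 100
```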
2) Python Operator: airflow.operators.python.task
It is a deprecated function that calls @task.python and allows users to turn a Python function into an Airflow task.
Alternative:
from airflow.decorators import task
@task
def my_task():
    ...
3) Python Operator: airflow.operators.python.BranchPythonOperator
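Allows a workflow to “branch”, that is, to follow one of several paths after this task executes. The python_callable returns the task_id (or a list of task_ids) of the downstream task or tasks to run; the other branches are skipped.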
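A branching callable can be as simple as the sketch below. The function name, the row_count argument, and the two task_ids are hypothetical; in a real DAG the returned strings must match the task_id values of tasks directly downstream of the branching task.

```python
def choose_branch(row_count):
    """Return the task_id of the branch that should run next."""
    if row_count > 0:
        return "process_rows"
    return "skip_processing"


print(choose_branch(25))  # process_rows
print(choose_branch(0))  # skip_processing
```

You would pass choose_branch as the python_callable of a BranchPythonOperator, supplying row_count through op_kwargs.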
4) Python Operator: airflow.operators.python.ShortCircuitOperator
It allows a workflow to continue only if a condition is true. Otherwise, the workflow “short-circuits” and the downstream tasks are skipped. The condition is the value returned by the python_callable.
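As an illustration, the condition below lets a workflow continue only on weekdays. The function name and the idea of gating on the day of the week are assumptions made for this example; any callable that returns a true or false value would work the same way.

```python
from datetime import date


def is_weekday(day):
    """Return True from Monday to Friday, False on Saturday and Sunday."""
    return day.weekday() < 5  # weekday() is 0 for Monday and 6 for Sunday


print(is_weekday(date(2024, 1, 1)))  # True (a Monday)
print(is_weekday(date(2024, 1, 6)))  # False (a Saturday)
```

Used as the python_callable of a ShortCircuitOperator, a False result would cause the downstream tasks to be skipped.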
5) Python Operator: airflow.operators.python.PythonVirtualenvOperator
Allows users to run a function in a virtualenv that can be created and destroyed automatically. The function here must be defined using def and not as part of a class.
Parameters
- requirements – a list of requirements as you would specify them in a pip install command.
- use_dill (bool) – whether to use dill to serialize the args and result; this allows more complex types but requires you to include dill in the requirements.
- system_site_packages (bool) – whether to include the system site-packages in the virtualenv.
- string_args (list[str]) – strings that are made available to python_callable at runtime as a list[str] in the global variable virtualenv_string_args.
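The sketch below shows one way to use these parameters, assuming Airflow 2 with the virtualenv package installed. The function names, task_id, and pinned requirement are hypothetical. The callable keeps its import inside the function body because it runs in a separate interpreter. The helper that creates the operator is only defined, not called, so the snippet runs without Airflow.

```python
def summarize_numbers():
    """Runs inside the virtualenv, so its imports live in the function body."""
    import statistics

    return statistics.mean([2, 4, 6])


def add_virtualenv_task(dag):
    """Attach a PythonVirtualenvOperator task to an existing DAG object."""
    from airflow.operators.python import PythonVirtualenvOperator

    return PythonVirtualenvOperator(
        task_id="summarize_in_virtualenv",  # hypothetical task name
        python_callable=summarize_numbers,
        # Hypothetical extra package to install; this particular function
        # only needs the standard library.
        requirements=["requests==2.31.0"],
        system_site_packages=False,  # start from a clean environment
        dag=dag,
    )


# The callable can still be checked locally before it is used in a DAG.
print(summarize_numbers())  # 4
```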
6) Python Operator: airflow.contrib.operators.dataflow_operator.DataFlowPythonOperator
This operator launches a Python pipeline as a job on Google Cloud Dataflow. The airflow.contrib namespace belongs to Airflow 1.x; in Airflow 2, the equivalent functionality is provided by the Google provider package.
Conclusion
In this article, you learned about the different Python Operators in Airflow, along with their syntax and parameters. Overall, this blog is a complete walk-through guide on Python Operators in Airflow. With the strong foundation of the Python framework, Apache Airflow enables users to effortlessly schedule and run complex Data Pipelines at regular intervals. Data Pipelines represented as DAGs play an essential role in Airflow for creating flexible workflows.
The rich web interface of Airflow makes it easy to monitor the results of pipeline runs and debug any failures that occur. Today, many companies benefit from the dynamic nature and flexibility that Apache Airflow brings to the table.
Shravani is a passionate data science enthusiast interested in exploring complex topics within the field. She excels in data integration and analysis, skillfully solving intricate problems and crafting comprehensive content tailored for data practitioners and businesses. Shravani’s analytical prowess and dedication to delivering high-quality, informative material make her a valuable asset in data science.