Airflow is a workflow management tool that lets you express data engineering pipelines as Python code. Airflow represents workflows as Directed Acyclic Graphs, or DAGs. It provides an intuitive user interface for configuring, scheduling, and monitoring workflow tasks.
Airflow is generally used to fetch data from various sources, transform it, and push it to different destinations. To do this, Airflow provides a number of hooks and operators for fetching and transforming data. This post explains the Airflow S3 Hook and how to use it.
Prerequisites
To successfully set up the Airflow S3 Hook, you need to meet the following requirements:
- Python 3.6 or above.
- Airflow installed and configured.
- An AWS account with S3 permissions.
Understanding Airflow Hooks
The purpose of Airflow Hooks is to facilitate integration with external systems. Hooks help Airflow users connect to different data sources and targets, and are used both for fetching data and for pushing it. Airflow Hooks are the building blocks for implementing Airflow operators, which abstract the full functionality of extracting, transforming, and loading data for various source and destination combinations.
Hooks save users from writing the boilerplate code they would otherwise need to connect to various systems. Airflow provides a number of built-in hooks to interface with systems like MySQL, PostgreSQL, S3, etc. Airflow also provides an interface for developing custom hooks, in case the built-in hooks are not enough for you.
The Airflow S3 Hook provides methods to retrieve keys and buckets, check for the presence of keys, list all keys, load a file to S3, download a file from S3, and more. You can find a complete list of the functionality supported by the S3 Hook here; a few of these methods are sketched below.
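As a quick illustration, here is a minimal sketch of some common S3 Hook methods, assuming a connection named my_s3_conn and a bucket named my-bucket (both placeholder names) have already been set up in Airflow:

from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# "my_s3_conn" is a placeholder; use the name of the connection you configured in Airflow.
s3 = S3Hook(aws_conn_id="my_s3_conn")

# Check whether a key exists in a bucket.
exists = s3.check_for_key("raw/2022/02/12/data.csv", bucket_name="my-bucket")

# List all keys under a prefix.
keys = s3.list_keys(bucket_name="my-bucket", prefix="raw/2022/")

# Upload a local file to S3, overwriting the key if it already exists.
s3.load_file("/tmp/report.csv", key="reports/report.csv", bucket_name="my-bucket", replace=True)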
As you learn about Airflow, it’s important to know about the best platforms for data integration as well. Hevo Data, a No-code Data Pipeline platform, helps to replicate data from any data source such as Databases, SaaS applications, Cloud Storage, SDKs, and Streaming Services, and simplifies the ETL process.
It supports 150+ data sources (including 60+ free data sources) like Asana, and setup is an easy 3-step process. With Hevo’s transformation feature, you can modify the data and make it analysis-ready.
Check out some of the cool features of Hevo:
- Live Monitoring: Track data flow and status in real-time.
- Completely Automated: Set up in minutes with minimal maintenance.
- Real-Time Data Transfer: Get analysis-ready data with zero delays.
- 24/5 Live Support: Round-the-clock support via chat, email, and calls.
- Schema Management: Automatic schema detection and mapping.
Get Started with Hevo for Free
Steps to Set Up Airflow S3 Hook
The goal of this post is to help you get familiar with the concept of Airflow Hooks and build your first DAG using the Airflow S3 Hook. You will cover the following points in this article:
- Work with the Airflow UI
- Configure the Airflow S3 Hook and its connection parameters
- Use the Airflow S3 Hook to implement a DAG
Follow the steps below to get started with Airflow S3 Hook:
Step 1: Set up Airflow S3 Hook
Once Airflow and the Airflow S3 Hook are installed, you can use the below command to start the Airflow Webserver:
airflow webserver -p 8080
Then, access localhost:8080 in your favorite browser to view the Airflow UI.
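Note that the S3 Hook ships with the Amazon provider package, and a running scheduler is needed for Airflow to parse and execute DAGs. If either is missing from your setup, the following commands (run in the same environment as Airflow) install the provider and start the scheduler:
pip install apache-airflow-providers-amazon
airflow scheduler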
Step 2: Set Up the Airflow S3 Hook Connection
Click the Admin tab in the Airflow user interface and click Connections as shown below:
Set up the S3 connection object by clicking the ‘+’ button. Add a relevant name, and ensure you select Connection Type as S3. You will have to use the name of this connection in your code, so keep a note of the name you enter here.
In the Extra section, add your AWS secret credentials in the below format. Leave the rest of the fields blank.
{"aws_access_key_id":"_your_aws_access_key_id_", "aws_secret_access_key": "_your_aws_secret_access_key_"}
Step 3: Implement the DAG
Use the below code snippet to implement the DAG.
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

# Change these to your identifiers, if needed.
# The connection ID must match the name of the connection created in the Airflow UI.
AWS_S3_CONN_ID = "S3_default"

def s3_extract():
    source_s3_key = "YOUR_S3_KEY"
    source_s3_bucket = "YOUR_S3_BUCKET"
    # Make sure this directory exists on the machine running the task.
    dest_file_path = "/home/user/airflow/data/s3_extract.txt"
    # Initialize the S3 Hook with the connection configured in the Airflow UI.
    source_s3 = S3Hook(AWS_S3_CONN_ID)
    # Fetch the S3 object by key and bucket name, then download it to the local path.
    s3_object = source_s3.get_key(source_s3_key, source_s3_bucket)
    with open(dest_file_path, "wb") as dest_file:
        s3_object.download_fileobj(dest_file)

with DAG(
    dag_id="s3_extract",
    start_date=datetime(2022, 2, 12),
    schedule_interval=timedelta(days=1),
    catchup=False,
) as dag:
    t1 = PythonOperator(
        task_id="s3_extract_task",
        python_callable=s3_extract,
    )
    t1
The first part of the DAG contains the required import statements. The core part of the DAG is the s3_extract function. This function uses the Airflow S3 Hook to initialize a connection to AWS, fetches the object using its key and bucket name, and then downloads the file using the download_fileobj method exposed by the underlying boto3 S3 object.
The DAG is configured to run this extract every day starting from a specific date.
Now save the DAG as s3_extract_dag.py in the DAG directory. The DAG directory is specified as the dags_folder parameter in the airflow.cfg file that is located in your installation directory.
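Before scheduling it, you can optionally run the task once in isolation to verify that the connection and S3 permissions work. The date argument is just an example logical date, and a test run does not record state in the Airflow database:
airflow tasks test s3_extract s3_extract_task 2022-02-12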
Step 4: Run the DAG
After saving the file in the DAG directory, the Airflow scheduler will parse it the next time it scans the dags_folder. You can run the below command to confirm that Airflow has picked up the DAG:
airflow dags list
Go to the Airflow UI and find the DAG.
You can then trigger the DAG using the ‘play’ button in the top right corner.
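Alternatively, the DAG can be triggered from the command line:
airflow dags trigger s3_extract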
Once it has executed successfully, head to the destination path configured in the DAG and you will find the downloaded file.
That concludes the walkthrough of using the Airflow S3 Hook to download a file. Notice how simple the download was: the S3 Hook abstracts away all the boilerplate code and provides a simple method you can call to download the file.
Challenges faced with Airflow S3 Hooks
While the above Airflow S3 Hook connection may appear easy, it does not reflect the actual requirements that ETL developers face in production. Let us now look at some of the typical challenges faced while executing such a requirement in production.
- The above method works well for a single one-off extract. S3 is used as a data lake in many ETL pipelines, and downloading a file may involve more than just fetching it and saving it to the local system. There will be additional complexities such as recognizing when a file has arrived and acting on it, checking for duplicate files, etc. In such cases, the complex logic will have to be implemented as custom Airflow operators (see the sketch after this list).
- While Airflow provides a set of built-in operators and hooks, more often than not they are not sufficient, especially for organizations that use many SaaS offerings.
- The DAG definition still has to be written as code. There are ETL tools like Hevo that can help avoid coding altogether and let you focus only on the business logic.
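As an illustration of the first point, a custom operator that wraps the S3 Hook might look like the hypothetical sketch below. The class name, parameters, and logic are made up for this example and would need to be adapted to your own file-handling rules:

from airflow.models import BaseOperator
from airflow.providers.amazon.aws.hooks.s3 import S3Hook

class S3ToLocalOperator(BaseOperator):
    """Hypothetical operator that downloads an S3 object to a local path."""

    def __init__(self, *, s3_key, s3_bucket, dest_path, aws_conn_id="S3_default", **kwargs):
        super().__init__(**kwargs)
        self.s3_key = s3_key
        self.s3_bucket = s3_bucket
        self.dest_path = dest_path
        self.aws_conn_id = aws_conn_id

    def execute(self, context):
        # Reuse the S3 Hook so the connection details stay outside the operator code.
        hook = S3Hook(self.aws_conn_id)
        obj = hook.get_key(self.s3_key, self.s3_bucket)
        with open(self.dest_path, "wb") as dest_file:
            obj.download_fileobj(dest_file)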
Conclusion
Airflow S3 Hooks provide an excellent way of abstracting all the complexities involved in interacting with S3 from Airflow. They also provide a clean way of configuring credentials outside the code through the connection configuration. The intuitive user interface helps to configure the periodic runs and monitor them. That said, Airflow still needs you to write code, since DAG definitions have to be written as code.
However, extracting complex data from a diverse set of data sources like CRMs, Project Management Tools, Streaming Services, and Marketing Platforms can be quite challenging. If you are looking for a no-code way of extracting data from S3 and transforming it, check out Hevo. Try a 14-day free trial and experience the feature-rich Hevo suite firsthand. Also, check out our unbeatable pricing to choose the best plan for your organization.
FAQ on Setting Up Airflow S3 Hook
1. How do I connect my S3 to airflow?
– Install Airflow and AWS Dependencies
– Set Up AWS Credentials
– Configure Airflow Connection
– Use the Connection in Your DAG
2. How does airflow S3 sensor work?
The S3KeySensor in Airflow is used to wait for a specific file to appear in an S3 bucket before proceeding with the next task in a DAG.
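A minimal sketch of the sensor, placed inside a DAG definition like the one above, assuming the same S3 connection and placeholder bucket and key names:

from airflow.providers.amazon.aws.sensors.s3 import S3KeySensor

wait_for_file = S3KeySensor(
    task_id="wait_for_s3_file",
    bucket_name="YOUR_S3_BUCKET",  # placeholder bucket
    bucket_key="YOUR_S3_KEY",      # placeholder key
    aws_conn_id="S3_default",      # the connection configured earlier
    poke_interval=60,              # check every 60 seconds
    timeout=60 * 60,               # give up after an hour
)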
3. What is a hook in airflow?
In Airflow, a hook is an interface to external systems or services, like databases, cloud storage, or APIs. Hooks provide methods to interact with these systems and are used by operators to perform actions.
4. How do you install airflow hooks?
Airflow hooks are typically distributed as part of provider packages. For example, you can install the S3 Hook by installing the ‘apache-airflow-providers-amazon’ package.
5. How do you connect to S3?
To connect to S3 from Airflow, we use the S3Hook provided by the Amazon provider package, together with an Airflow connection that stores the AWS credentials.
Talha is a Software Developer with over eight years of experience in the field. He is currently driving advancements in data integration at Hevo Data, where he has been instrumental in shaping a cutting-edge data integration platform for the past four years. Prior to this, he spent 4 years at Flipkart, where he played a key role in projects related to their data integration capabilities. Talha loves to explain complex information related to data engineering to his peers through writing. He has written many blogs related to data integration, data management aspects, and key challenges data practitioners face.