Managing huge volumes of business data transactions and building scalable applications is essential for companies. Business applications continuously send and receive requests from multiple endpoints, and it is the responsibility of Developers to handle all these requests effectively and make sure enough resources are available for every request. Building scalable applications and managing data requires scheduled events and Data Pipelines for an efficient workflow.
Apache Airflow is a workflow management platform that allows companies to programmatically stage their Data Pipeline tasks. Airflow lets Developers orchestrate workflows and wait on certain events until a defined condition is met. It uses special operators, such as the S3KeySensor, to manage and configure these events.
The S3KeySensor checks for a key in an S3 bucket at regular intervals and, once the key is found, triggers the defined downstream tasks. In this article, you will learn about the Airflow S3KeySensor, understand how to define it with the help of its syntax, and implement the S3KeySensor using a simple Python example.
Prerequisites
This is what you need for this article:
- Python installed on your local machine.
- Brief knowledge of Python.
- Apache Airflow installed on your local machine.
What is Apache Airflow?
Apache Airflow is a popular platform for workflow management. It helps its users to author, schedule, and monitor their workflows programmatically. When users define their workflows in the form of code, they become more versionable, maintainable, testable, and collaborative. Thus, Airflow makes it easy to manage workflows.
With Airflow, you can author your workflows in the form of directed acyclic graphs (DAGs). This makes it easy to execute tasks in the correct order and allocate the right resources to them.
Airflow Sensors are very important to DAGs. They check for the occurrence of particular conditions before the downstream tasks in a DAG are executed, which makes DAGs more event-driven. There are different types of Airflow Sensors, and each performs a different task.
To learn more about building robust data pipelines with Airflow, check out Hevo’s resources.
Key Features of Apache Airflow
Some of the main features of Apache Airflow are given below.
- Easy Integrations: Airflow comes with many operators that allow users to easily integrate it with applications and cloud platforms such as Google Cloud, AWS, and Azure for developing scalable applications.
- Supports Python: Python is easy to learn and is widely used in the industry for developing modern applications. Airflow uses standard Python for creating DAGs and for integrating with other platforms for deployment (see the short sketch after this list).
- Open Source: Airflow is free to use and has a large community of active users, which makes it easier for developers to find resources and support.
- Graphical UI: Airflow comes with a web application to monitor the real-time status of running tasks and DAGs.
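To illustrate the "Supports Python" point above, here is a minimal sketch of a DAG defined in plain Python (the DAG name, schedule, and task are illustrative, not tied to any specific project):
from datetime import datetime
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

def say_hello():
    # A trivial task callable used only for illustration
    print("Hello from Airflow")

with DAG('hello_airflow',
         start_date=datetime(2022, 2, 22),
         schedule_interval='@daily',
         catchup=False) as dag:
    hello_task = PythonOperator(task_id='say_hello', python_callable=say_hello)
Saved into the DAGs folder, a file like this is picked up by the Airflow scheduler and shows up in the web UI like any other DAG.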
To know more about Apache Airflow, check out its official site.
Hevo Data, a No-code Data Pipeline, helps to load data from any data source such as Databases, SaaS applications, Cloud Storage, SDKs, and Streaming Services and simplifies the ETL process. It supports 150+ data sources (including 60+ free data sources), and setting it up is a 3-step process: select the data source, provide valid credentials, and choose the destination. Hevo not only loads the data onto the desired Data Warehouse/destination but also enriches the data and transforms it into an analysis-ready form without having to write a single line of code.
Check out why Hevo is the Best:
- Secure & Reliable: Hevo’s fault-tolerant architecture ensures secure, consistent data handling with zero loss and automatic schema management.
- User-Friendly & Scalable: Hevo’s simple UI makes it easy for new users, while its horizontal scaling manages growing data volumes with minimal latency.
- Efficient Data Transfer: Hevo supports real-time, incremental data loads, optimizing bandwidth usage for both ends.
- Live Monitoring & Support: Hevo provides live data flow monitoring and 24/5 customer support via chat, email, and calls.
Sign up here for a 14-Day Free Trial!
What is Airflow S3KeySensor?
Apache Airflow Sensors are special operators designed to wait for certain events or conditions to happen. They check for the occurrence of these events at particular intervals. If the condition has been met, the sensor task is marked as successful and the DAG moves on to the downstream tasks. If the condition is not yet met, the sensor waits for some time before checking again. All sensors inherit from the BaseSensorOperator.
The S3KeySensor, just as its name suggests, checks for the availability of a key (i.e., a file) in an S3 Bucket. You can configure the sensor to run this check at intervals of seconds or minutes.
When the DAG is running, the sensor checks whether the key is available. If the key is found, control shifts to the next task within the DAG. If the key is not found, the sensor retries or fails based on its configuration.
To use an S3KeySensor, Airflow must be granted access to S3. This means there should be an active S3 (AWS) connection.
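As a quick, hedged sanity check (assuming an Airflow 1.10-style installation and an AWS connection already configured; the conn_id, bucket, and key below are placeholders), you can use the S3Hook to confirm that Airflow can actually see the key before wiring up the sensor:
from airflow.hooks.S3_hook import S3Hook

# Uses the AWS credentials stored in the Airflow connection 'aws_default'
hook = S3Hook(aws_conn_id='aws_default')

# Returns True if the key exists in the bucket, False otherwise
print(hook.check_for_key('path/to/file.txt', bucket_name='my-bucket'))
If this returns False or raises a credentials error, the connection (rather than the sensor) is what needs fixing.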
S3KeySensor Syntax
The S3KeySensor is used with the following syntax:
class airflow.sensors.s3_key_sensor.S3KeySensor(bucket_key, bucket_name=None, wildcard_match=False, aws_conn_id='aws_default', verify=None, *args, **kwargs)
The above parameters are described below:
- bucket_key (str): The key (file) being waited on. You can pass a full s3:// style URL or a path relative to the root level of the bucket.
- bucket_name (str): The name of the S3 bucket where the file is stored.
- wildcard_match (bool): Specifies whether to interpret the bucket_key as a Unix wildcard pattern (see the sketch after this list).
- aws_conn_id (str): The Airflow connection ID used for the S3 connection.
- verify (bool or str): Whether or not to verify SSL certificates for the S3 connection. By default, SSL certificates are verified. The following values can be provided:
  - False: SSL certificates will not be validated. SSL will still be used (unless use_ssl is False), but the certificates will not be verified.
  - path/to/cert/bundle.pem: A filename for the CA cert bundle to use. Specify this argument when you want to use a different CA cert bundle than the one used by botocore.
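As a hedged illustration of the wildcard_match parameter referenced in the list above (the DAG name, bucket name, and key pattern are illustrative), the sensor can wait for any file matching a Unix-style pattern rather than one exact key:
from datetime import datetime
from airflow.models import DAG
from airflow.sensors.s3_key_sensor import S3KeySensor

with DAG('s3_wildcard_sensor_example',
         start_date=datetime(2022, 2, 22),
         schedule_interval='@daily',
         catchup=False) as dag:
    wait_for_any_csv = S3KeySensor(
        task_id='wait_for_any_csv',
        bucket_key='incoming/*.csv',   # interpreted as a Unix wildcard pattern
        bucket_name='my-bucket',
        wildcard_match=True,
        aws_conn_id='aws_default',
        poke_interval=60,
        timeout=600)
With wildcard_match=False (the default), the sensor only matches the exact key given in bucket_key.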
Implementing Airflow S3KeySensor
We want to run the DAG once a file lands in the S3 Bucket. The DAG then performs some further processing steps, but the client is not interested in using AWS Lambda for this.
The DAG will check whether the file has arrived every minute and will time out after 180 seconds. If the file is found, it will run the specified Python function.
The DAG given below uses an S3KeySensor to achieve the above:
from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import S3KeySensor  # Airflow 1.10-style import path
from datetime import datetime, timedelta
import boto3

# Default arguments applied to every task in the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 2, 22),
    'email': ['nic@enye.tech'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5)
}

# max_active_runs is a DAG-level setting, so it is passed to the DAG itself
dg = DAG('cloudwalker_s3_sensor',
         schedule_interval='@daily',
         default_args=default_args,
         max_active_runs=1,
         catchup=False
         )

s3_buckname = 'demo1-s3-sensor'
s3_locat = 'demo/testfile.txt'

# Wait for demo/testfile.txt to land in the bucket, poking every 60 seconds
# and timing out after 180 seconds per attempt (with 2 retries)
s3_sensor = S3KeySensor(
    task_id='s3_file_check',
    poke_interval=60,
    timeout=180,
    soft_fail=False,
    retries=2,
    bucket_key=s3_locat,
    bucket_name=s3_buckname,
    aws_conn_id='customer_demo',
    dag=dg)

# Read the file from S3 and print its contents once the sensor has succeeded
def processing_func(**kwargs):
    print("Reading the file")
    s3 = boto3.client('s3')
    obj = s3.get_object(Bucket=s3_buckname, Key=s3_locat)
    lin = obj['Body'].read().decode("utf-8")
    print(lin)

func_task = PythonOperator(
    task_id='a_task_using_found_file',
    python_callable=processing_func,
    dag=dg)

# Run the processing task only after the sensor finds the file
s3_sensor >> func_task
The above code implements a DAG that performs a simple task: it checks for the presence of a file in the S3 Bucket and, if the file is found, runs a specified Python function.
First, import all the libraries needed to implement the S3KeySensor. Then define the default arguments and create the DAG, assigning it to the variable "dg". The S3KeySensor checks for the availability of a file named testfile.txt at the specified S3 bucket name and location.
Airflow will make 2 retries, and each attempt will last at most 180 seconds. Within every attempt, the sensor pokes every 60 seconds, so there is a maximum of 4 pokes per attempt. If the file is still not found after all retries, the task fails.
Finally, the function named processing_func is defined; it is the function that the PythonOperator runs once the file is found.
Conclusion
In this article, you learnt that Apache Airflow is a workflow management platform. It helps its users create, schedule, and monitor workflows by writing code, making it easy to manage them. Apache Airflow uses DAGs to execute tasks in the correct order and allocate the right resources to them. Airflow sensors make DAGs more event-driven by checking for the occurrence of certain conditions before executing tasks.
There are different types of Airflow sensors, with the S3KeySensor being one of them. It checks for the presence of a key or a file in an S3 bucket. The sensor can be configured to check for the presence of the key at intervals of seconds or minutes. If it finds the key, control is passed to the next task within the DAG. If not, it retries or fails based on its configuration.
Companies need to analyze their business data stored in multiple data sources. The data needs to be loaded to the Data Warehouse to get a holistic view of the data. Hevo Data is a No-code Data Pipeline solution that helps to transfer data from 150+ sources to the desired Data Warehouse. It fully automates the process of transforming and transferring data to a destination without writing a single line of code.
Want to take Hevo for a spin? Sign Up here for a 14-day free trial and experience the feature-rich Hevo suite first hand.
Share your experience of learning about Airflow S3KeySensor in the comments section below!
FAQs
1. What is the use of S3KeySensor?
The S3KeySensor in Apache Airflow is used to monitor and detect the presence of a specific file in an S3 bucket. It allows workflows to wait until the file is available before proceeding with downstream tasks, ensuring the required data is present before execution.
2. What is an S3 sensor?
An S3 sensor in Apache Airflow is a type of sensor used to monitor Amazon S3 for specific events or conditions, such as the existence of a file or object in a bucket. It helps automate workflows by waiting for the presence or change of data in S3 before triggering downstream tasks.
3. What is the difference between poke and reschedule airflow sensor?
In Apache Airflow, a poke sensor checks a condition repeatedly at fixed intervals and holds the task in a “running” state until the condition is met. A reschedule sensor, on the other hand, periodically checks the condition but releases the task back to the scheduler when it doesn’t meet the condition, allowing it to be rescheduled later, which is more resource-efficient.
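As a hedged sketch of the difference (assuming Airflow 1.10.2 or later, where sensors accept a mode argument, and reusing illustrative names from the example above), the same S3KeySensor can be switched to reschedule mode so the worker slot is released between checks:
from datetime import datetime
from airflow.models import DAG
from airflow.sensors.s3_key_sensor import S3KeySensor

with DAG('s3_sensor_reschedule_example',
         start_date=datetime(2022, 2, 22),
         schedule_interval='@daily',
         catchup=False) as dag:
    wait_for_file = S3KeySensor(
        task_id='s3_file_check_reschedule',
        bucket_key='demo/testfile.txt',
        bucket_name='demo1-s3-sensor',
        aws_conn_id='customer_demo',
        mode='reschedule',     # free the worker slot between checks
        poke_interval=300,     # check every 5 minutes
        timeout=60 * 60)       # give up after 1 hour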
4. What is S3 in airflow?
In Apache Airflow, S3 refers to integration with Amazon S3 (Simple Storage Service), enabling workflows to interact with S3 buckets. Airflow provides operators like the S3FileTransformOperator and sensors like the S3KeySensor to upload, download, transform, and monitor files stored in S3, making it easier to automate data transfers in cloud-based workflows.
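As a hedged illustration (the conn_id, file paths, key, and bucket name are placeholders), the S3Hook can be used inside a PythonOperator callable to upload a file to S3 and read it back:
from airflow.hooks.S3_hook import S3Hook

def copy_report_to_s3():
    hook = S3Hook(aws_conn_id='aws_default')
    # Upload a local file to the bucket (placeholder paths)
    hook.load_file(filename='/tmp/report.csv',
                   key='reports/report.csv',
                   bucket_name='my-bucket',
                   replace=True)
    # Read it back as a string and print it
    print(hook.read_key(key='reports/report.csv', bucket_name='my-bucket'))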
Nicholas Samuel is a technical writing specialist with a passion for data, having more than 14 years of experience in the field. With his skills in data analysis, data visualization, and business intelligence, he has delivered over 200 blogs. In his early years as a systems software developer at Airtel Kenya, he developed applications using Java and the Android platform, as well as web applications with PHP. He also performed Oracle database backups, recovery operations, and performance tuning. Nicholas was also involved in projects that demanded in-depth knowledge of Unix system administration, specifically with HP-UX servers. Through his writing, he intends to share the hands-on experience he gained to make the lives of data practitioners better.