Managing huge volumes of business data and building scalable applications is essential for companies. Business applications continuously send and receive large numbers of requests across multiple systems, and it is the responsibility of Developers to handle all of these requests effectively and ensure enough resources are available for each one. Building scalable applications and managing data requires scheduled events and Data Pipelines for an efficient workflow.

Apache Airflow is a workflow management platform that allows companies to programmatically author and schedule their Data Pipeline tasks. Airflow allows Developers to manage workflows and wait for defined conditions to be met before executing downstream tasks. Airflow uses special operators, such as the S3KeySensor, to manage and configure these events.

The S3KeySensor checks for a key in an S3 bucket at regular intervals and triggers the defined tasks once it is found. In this article, you will learn about the Airflow S3KeySensor, understand how to define it with the help of its syntax, and implement the S3KeySensor using a simple Python code example.

Prerequisites

This is what you need for this article:

  • Python installed on your local machine.
  • Basic knowledge of Python.
  • Apache Airflow installed on your local machine.

What is Apache Airflow?

Apache Airflow is a popular platform for workflow management. It helps its users to author, schedule, and monitor their workflows programmatically. When users define their workflows in the form of code, they become more versionable, maintainable, testable, and collaborative. Thus, Airflow makes it easy to manage workflows. 

With Airflow, you can author your workflows in the form of directed acyclic graphs (DAGs). This makes it easy to execute tasks in the correct order and allocate the right resources to them. 

Airflow Sensors are an important part of DAGs. They check for the occurrence of particular conditions before downstream tasks in a DAG are executed, which makes DAGs more event-driven. There are different types of Airflow Sensors, each designed to wait for a different kind of event.
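
To make this concrete, here is a minimal, hypothetical DAG sketch. The DAG ID, task IDs, and callables are placeholders, and the import paths follow the same Airflow 1.10.x style as the example later in this article. It defines two Python tasks and wires them so that the second runs only after the first succeeds.

from datetime import datetime

from airflow.models import DAG
from airflow.operators.python_operator import PythonOperator

# A minimal, hypothetical DAG: two Python tasks, "extract" followed by "load".
with DAG(dag_id='example_minimal_dag',          # placeholder DAG ID
         start_date=datetime(2022, 2, 22),
         schedule_interval='@daily',
         catchup=False) as dag:

    extract = PythonOperator(task_id='extract',
                             python_callable=lambda: print('extracting'))
    load = PythonOperator(task_id='load',
                          python_callable=lambda: print('loading'))

    extract >> load   # 'load' runs only after 'extract' succeeds

The >> operator defines the edges of the graph: it tells the scheduler that load depends on extract.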

Key Features of Apache Airflow

Some of the main features of Apache Airflow are given below.

  • Easy Integrations: Airflow comes with many operators that allow users to easily integrate it with applications and cloud platforms such as Google Cloud, AWS, and Azure for developing scalable applications. 
  • Supports Python: Python is an easy-to-learn language that is widely used in the industry for developing modern applications. Airflow uses standard Python for creating DAGs and integrating with other platforms for deployment.
  • Open Source: Airflow is free to use and has a large community of active users, which makes it easier for developers to find resources and support.
  • Graphical UI: Airflow comes with a web application to monitor the real-time status of running tasks and DAGs. 

To know more about Apache Airflow, click here.

Simplify Data Analysis with Hevo’s No-code Data Pipeline

Hevo Data, a No-code Data Pipeline, helps to load data from any data source such as Databases, SaaS applications, Cloud Storage, SDKs, and Streaming Services and simplifies the ETL process. It supports 100+ data sources (including 30+ free data sources), and loading data is a simple 3-step process: select the data source, provide valid credentials, and choose the destination. Hevo not only loads the data onto the desired Data Warehouse/destination but also enriches the data and transforms it into an analysis-ready form without having to write a single line of code.

Get Started with Hevo for Free

Its completely automated pipeline delivers data in real-time from source to destination without any loss. Its fault-tolerant and scalable architecture ensures that the data is handled in a secure, consistent manner with zero data loss and supports different forms of data. The solutions provided are consistent and work with different BI tools as well.

Check out why Hevo is the Best:

  1. Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  2. Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
  3. Minimal Learning: Hevo, with its simple and interactive UI, makes it extremely easy for new customers to get started and perform operations.
  4. Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  5. Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
  6. Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, E-Mail, and support calls.
  7. Live Monitoring: Hevo allows you to monitor the data flow and check where your data is at a particular point in time.
Sign up here for a 14-Day Free Trial!

What is Airflow S3KeySensor?

Apache Airflow Sensors are special operators designed to wait for certain events or conditions to happen. They check for the occurrence of these events at particular intervals. If the sensor finds that the condition has been met, the task is marked as successful and the DAG moves on to the downstream tasks. If the condition is not yet met, the sensor waits for some time before checking again. All sensors inherit from the BaseSensorOperator class.

The S3KeySensor, just as its name suggests, checks for the availability of a key (i.e., a file) in an S3 Bucket. You can configure the sensor to check for the key at intervals of seconds or minutes.

When the DAG is running, the sensor checks whether the key is available. If the key is found, control shifts to the next task within the DAG. If the key is not found, the sensor retries or fails based on its configuration. 

To use an S3KeySensor, Airflow must be granted access to S3. This means an active S3 (AWS) connection must be configured in Airflow. 

S3KeySensor Syntax

The S3KeySensor is used with the following syntax:

class airflow.sensors.s3_key_sensor.S3KeySensor(bucket_key, bucket_name=None, wildcard_match=False, aws_conn_id='aws_default', verify=None, *args, **kwargs)

The above parameters are described below:

  • bucket_key (str): The key (file) being waited on. You can use a full s3:// style URL or specify a relative path from the root level.
  • bucket_name (str): The name of the S3 bucket where the file is stored. 
  • wildcard_match (bool): Specifies whether to interpret the bucket_key as a Unix wildcard pattern. 
  • aws_conn_id (str): The ID of the Airflow connection used to reach S3. 
  • verify (bool or str): Whether or not to verify SSL certificates for the S3 connection. By default, SSL certificates are verified. The following values can be provided:
      • False: SSL certificates will not be validated. SSL will still be used (unless use_ssl is False), but the certificates will not be verified.
      • path/to/cert/bundle.pem: A filename of the CA cert bundle to use. Specify this when you want to use a CA cert bundle other than the one used by botocore.
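
As a quick illustration of how these parameters fit together, here is a minimal, hypothetical sensor definition. The DAG ID, bucket name, key pattern, and connection ID are placeholders, and the import path follows the airflow.sensors.s3_key_sensor module referenced above.

from datetime import datetime

from airflow.models import DAG
from airflow.sensors.s3_key_sensor import S3KeySensor

with DAG(dag_id='s3_key_sensor_syntax_demo',    # placeholder DAG ID
         start_date=datetime(2022, 2, 22),
         schedule_interval=None) as dag:

    wait_for_report = S3KeySensor(
        task_id='wait_for_daily_report',
        bucket_name='my-demo-bucket',       # placeholder S3 bucket
        bucket_key='reports/*.csv',         # placeholder key pattern
        wildcard_match=True,                # interpret bucket_key as a Unix wildcard
        aws_conn_id='aws_default',          # Airflow connection holding AWS credentials
        verify=True,                        # verify SSL certificates (the default behaviour)
    )

With wildcard_match set to True, the sensor succeeds as soon as any key in the bucket matches the reports/*.csv pattern; with the default of False, bucket_key must match an exact key.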

Implementing Airflow S3KeySensor 

We want to run the DAG once a file lands in the S3 Bucket. The DAG needs to perform some fairly complex steps, but the client does not want to use AWS Lambda.

The DAG will check whether the file has arrived every minute and will time out after 180 seconds. If the file is found, it will run the specified Python function. 

The DAG given below uses an S3KeySensor to achieve the above:

# Airflow 1.10.x-style import paths (in Airflow 2.x, PythonOperator lives in
# airflow.operators.python and S3KeySensor in the Amazon provider package
# airflow.providers.amazon.aws.sensors.s3).
from airflow.models import DAG
from datetime import datetime, timedelta
from airflow.operators.python_operator import PythonOperator
from airflow.operators.sensors import S3KeySensor
import boto3

# Default arguments applied to every task in the DAG.
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2022, 2, 22),
    'email': ['nic@enye.tech'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retry_delay': timedelta(minutes=5)
}

# max_active_runs is a DAG-level setting, so it belongs in the DAG constructor
# rather than in default_args (which are passed to individual tasks).
dg = DAG('cloudwalker_s3_sensor',
         schedule_interval='@daily',
         default_args=default_args,
         max_active_runs=1,
         catchup=False
         )
s3_buckname = 'demo1-s3-sensor'   # S3 bucket to watch
s3_locat = 'demo/testfile.txt'    # key (file) the sensor waits for

# The sensor pokes the bucket every 60 seconds and times out after 180 seconds;
# with retries=2 the sensor task is attempted up to 3 times in total.
s3_sensor = S3KeySensor(
    task_id='s3_file_check',
    poke_interval=60,        # seconds between pokes
    timeout=180,             # seconds before an attempt times out
    soft_fail=False,         # fail (rather than skip) the task on timeout
    retries=2,               # retry the sensor task twice after a failure
    bucket_key=s3_locat,
    bucket_name=s3_buckname,
    aws_conn_id='customer_demo',   # Airflow connection that grants S3 access
    dag=dg)


# Downstream task: read the file from S3 with boto3 (boto3 resolves AWS
# credentials from its own configuration, e.g. environment variables).
def processing_func(**kwargs):
    print("Reading the file")
    s3 = boto3.client('s3')
    obj = s3.get_object(Bucket=s3_buckname, Key=s3_locat)
    lin = obj['Body'].read().decode("utf-8")
    print(lin)


func_task = PythonOperator(
    task_id='a_task_using_found_file',
    python_callable=processing_func,
    dag=dg)

# Run the processing task only after the sensor has found the file.
s3_sensor >> func_task

The above code implements a DAG to perform a simple task. It checks for the presence of a file in the S3 Bucket. If the file is found, the DAG will run a specified Python function. 

First, import all the libraries that you will need to implement the S3KeySensor. Then add the default arguments to be used in the code. Next, define the DAG and assign it to a variable named dg. The S3KeySensor will check for the availability of a file named testfile.txt at the specified S3 bucket name and location.

Airflow will make 2 retries, and each attempt will last at most 180 seconds. Within every attempt, the sensor pokes the bucket every 60 seconds, giving a maximum of 4 pokes per attempt. If the file is still not found after all retries, the task will fail. 

Finally, create a function named processing_func, which is the function that will run once the file is found, and set it downstream of the sensor using s3_sensor >> func_task. 

Conclusion

In this article, you learnt that Apache Airflow is a workflow management platform. It helps its users to create, schedule, and monitor workflows as code, making it easy for users to manage their workflows. Apache Airflow uses DAGs to execute tasks in the correct order and allocate the right resources to them. Airflow sensors make DAGs more event-driven by checking for the occurrence of certain conditions before executing downstream tasks. 

There are different types of Airflow sensors, with the S3KeySensor being one of them. It checks for the presence of a key (file) in an S3 bucket. The sensor can be configured to check for the key at intervals of seconds or minutes. If it finds the key, control is passed to the next task within the DAG. If not, it retries or fails based on its configuration. 

Visit our Website to Explore Hevo

Companies need to analyze their business data stored in multiple data sources. The data needs to be loaded into a Data Warehouse to get a holistic view of it. Hevo Data is a No-code Data Pipeline solution that helps to transfer data from 100+ sources to the desired Data Warehouse. It fully automates the process of transforming and transferring data to a destination without writing a single line of code.

Want to take Hevo for a spin? Sign Up here for a 14-day free trial and experience the feature-rich Hevo suite first hand.

Share your experience of learning about Airflow S3KeySensor in the comments section below!

Nicholas Samuel
Freelance Technical Content Writer, Hevo Data

Skilled in freelance writing within the data industry, Nicholas is passionate about unraveling the complexities of data integration and data analysis through informative content for those delving deeper into these subjects. He has written more than 150 blogs on databases, processes, and tutorials that help data practitioners solve their day-to-day problems.
