Airflow is a Task Automation application. It helps organizations in scheduling tasks so that they can be completed at the appropriate time.

This relieves employees of repeating tasks. Additionally, Apache Airflow plans and orchestrates data pipelines or workflows.

Managing and analyzing massive amounts of data can be difficult if not properly planned and organized. Most business operations are managed by various apps, services, and websites that generate valuable data.

Amazon S3 is one of the best places to store large amounts of structured or unstructured data. It is a popular storage service for storing any type of data.

This article will teach you how to Download Airflow Read File from S3. You will also gain a comprehensive understanding of how to take an Airflow Read File from S3. Continue reading!

What is Apache Airflow?

Apache Airflow is a batch-oriented pipeline framework for developing and monitoring data workflows. Airflow was founded in 2014 by Airbnb to address big data and complex Data Pipeline issues.

They use a built-in web interface to write and schedule processes as well as monitor workflow execution. The Apache Software Foundation has adopted the Airflow project due to its growing popularity.

Simplify ETL Using Hevo’s No-code Data Pipeline

Hevo Data, a No-code Data Pipeline, helps load data from any data source such as databases, SaaS applications, cloud storage, SDK, and streaming services and simplifies the ETL process. It supports 150+ data sources and loads the data onto the desired Data Warehouse, enriches the data, and transforms it into an analysis-ready form without writing a single line of code.

  • Auto-Schema Management: Correcting improper schema after the data is loaded into your warehouse is challenging. Hevo automatically maps source schema with destination warehouse so that you don’t face the pain of schema errors.
  • Customer Support: With Hevo you get more than just a platform, you get a partner for your pipelines. Discover peace with round the clock “Live Chat” within the platform. What’s more, you get 24×5 support even during the 14-day full-feature free trial.
  • Reliability at Scale: With Hevo, you get a world-class fault-tolerant architecture that scales with zero data loss and low latency. 

Explore Hevo’s features and discover why it is rated 4.3 on G2 and 4.7 on Software Advice for its seamless data integration. Try out the 14-day free trial today to experience hassle-free data integration.

Get Started with Hevo for Free

What are DAGs?

Directed Acyclic Graphs (DAGs) are used in Airflow to create workflows. DAGs are high-level diagrams that define the dependent and exclusive tasks that can be ordered and scheduled.

Here are the DAGs that read data from three sources independently. After that, we run a Spark job to join the data on a key and write the transformation output.

By defining a DAG, the scheduler can determine which tasks can be run immediately and which must wait for other tasks to finish. The Spark job must wait for the three “read” tasks before populating the data into S3.

NOTE:

You should have an S3 bucket set up and at least one file in it.

As an example, consider bds-airflow-bucket with a single post.JSON document:

Airflow is simple (yet restrictive) to install as a single package. Here is a typical file structure for our environment to add DAGs, set them up, and run them.


airflow                  # the root directory.
├── dags                 # root folder for all dags. files inside folders are not searched for dags.
│   ├── my_dag.py        # my dag (definitions of tasks/operators) including precedence.
│   └── ...
├── logs                 # logs for the various tasks that are run
│   └── my_dag           # DAG specific logs
│   │   ├── src1_s3      # folder for task specific logs (log files are created by date of run)
│   │   ├── src2_hdfs
│   │   ├── src3_s3
│   │   └── spark_task_etl
├── airflow.db           # SQLite database used by Airflow internally to track status of each DAG.
├── airflow.cfg          # global configuration for Airflow (this can be overriden by config inside the file.)
└── ...
  • The directory structure represents the layout of an Airflow project, with the root directory containing all essential files and folders.
  • The dags folder holds DAGs (Directed Acyclic Graphs) that define task workflows; my_dag.py is one such DAG file.
  • The logs folder stores task execution logs, organized by DAG and task, with specific folders for each task like src1_s3 and spark_task_etl.
  • The airflow.db is an internal SQLite database that tracks the status and history of all DAGs and tasks run by Airflow.
  • The airflow.cfg file contains global configurations for the Airflow instance, which can be customized.

Configuring Airflow Read File from S3

Here are the two steps on how to Download Airflow Read File from S3:

Before diving into them, have a look at the prerequisites first:

1. Importing Packages

# airflow related
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.operators.bash_operator import BashOperator
# other packages
from datetime import datetime
from datetime import timedelta

2. Define the DAG, Tasks, and Operators

Let’s define all of the tasks for our current workflow. We have three tasks that read data from their respective sources and store it in S3 and HDFS. They are defined as Python functions that will be called by our operators.

We can pass parameters to the function using **args and **kwargs from our operator. For example, the function source2 to hdfs accepts a named parameter config as well as two context parameters ds and **kwargs.

#Airflow Read File from S3
def source1_to_s3():
 # code that writes our data from source 1 to s3
def source2_to_hdfs(config, ds, **kwargs):
 # code that writes our data from source 2 to hdfs
 # ds: the date of run of the given task.
 # kwargs: keyword arguments containing context parameters for the run.
def source3_to_s3():
 # code that writes our data from source 3 to s3
  • The code defines three functions, each handling data transfer from different sources.
  • source1_to_s3() is responsible for writing data from source 1 to an S3 bucket.
  • source2_to_hdfs(config, ds, **kwargs) writes data from source 2 to HDFS, using ds (the task run date) and kwargs (additional context parameters).
  • source3_to_s3() is similar to source1_to_s3() but handles data transfer from source 3 to S3.
  • Each function encapsulates logic for reading from a source and writing to the respective storage system (S3 or HDFS).

We will define our basic setup by importing three classes: DAG, BashOperator, and PythonOperator. Also, configure default_args.

#Airflow Read File from S3
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 9, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'schedule_interval': '@daily',
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}
  • The code defines default_args, a dictionary containing default settings for an Airflow DAG.
  • owner: Specifies the owner of the DAG, in this case, ‘airflow’.
  • start_date: Sets the start date for the DAG, starting from September 1, 2018.
  • schedule_interval: Runs the DAG on a daily schedule (@daily).
  • retries and retry_delay: Defines the number of retries (1) and the delay (5 seconds) between retries if a task fails.

This aids in configuring the DAG’s default configuration. This link contains more information on configuring default args and the additional parameters available.

Integrate Amazon Ads to PostgreSQL
Integrate BigCommerce to Redshift
Integrate Hive to Snowflake

Step 1: Adding the DAGs to the Airflow Scheduler

Make a new Python file in the /airflow/dags directory. The file is called s3 download.py. Start with the DAG boilerplate code and library imports. To communicate with the S3 bucket, you’ll need the S3Hook class:

Downloading a file is essentially the same as declaring a PythonOperator based task. It will invoke the download from s3() function, which takes three arguments:

  • key – a string containing the name/path of the file on S3. For example, posts.json will retrieve that file from the bucket’s root. Paths can also be specified, such as /data/posts/posts.json. Make certain that it corresponds to your case.
  • bucket_name – string, the name of the bucket from which you want to download the file.
  • local_path – a string indicating the location of the file to be saved. It’s important to note that this is a directory path, not a file path.
#Airflow Read File from S3
import os
...


def download_from_s3(key: str, bucket_name: str, local_path: str) -> str:
    hook = S3Hook('s3_conn')
    file_name = hook.download_file(key=key, bucket_name=bucket_name, local_path=local_path)
    return file_name
    
    
 with DAG(...) as dag:
    # Download a file
    task_download_from_s3 = PythonOperator(
        task_id='download_from_s3',
        python_callable=download_from_s3,
        op_kwargs={
            'key': 'posts.json',
            'bucket_name': 'bds-airflow-bucket',
            'local_path': '/Users/dradecic/airflow/data/'
        }
    )
#Airflow Read File from S3
  • The code imports the necessary Airflow modules and sets up a function to download a file from S3.
  • The download_from_s3 function uses the S3Hook to connect to S3 and download a file to a specified local path.
  • In the DAG context (with DAG(...) as dag), a PythonOperator is created to execute the download_from_s3 function.
  • The task downloads a file named posts.json from the S3 bucket bds-airflow-bucket and saves it to the local directory /Users/dradecic/airflow/data/.
  • The op_kwargs passes the required parameters (key, bucket_name, local_path) to the function during the task execution.

The issue is that S3Hook downloads a file to the local path folder and names it arbitrarily with no extension. If that is not what designers want, then we will declare another task that renames the file.

The same function first creates an instance of the S3Hook class and then connects to the previously established connection. The hook instance’s download_file() method is then called to download the file.

Step 2: Downloading Airflow Read File from S3

First, we’ll need to get the file from S3:

#Airflow Read File from S3

airflow tasks test s3_download download_from_s3

As you can see, Airflow saved the file from S3 to /Users/dradecic/airflow/data/airflow_tmp_0xrx7pyi, which is a completely random file name.

The second task is to rename it to S3 downloaded posts.json:

#Airflow Read File from S3

airflow tasks test s3_download rename_file

The task was completed successfully, so we should see the following file in the data folder

Before leaving this spot, here’s the final view of the entire code!

{
'email_on_failure': False,
'email_on_retry': False,
'schedule_interval': '@daily',
'retries': 1,
'retry_delay': timedelta(seconds=5),
}

def source1_to_s3():
# code that writes our data from source 1 to s3
def source3_to_s3():
# code that writes our data from source 3 to s3
def source2_to_hdfs(config, ds, **kwargs):
# code that writes our data from source 2 to hdfs
# ds: the date of run of the given task.
# kwargs: keyword arguments containing context parameters for the run.

def get_hdfs_config():
#return HDFS configuration parameters required to store data into HDFS.
return None #Return to none

config = get_hdfs_config()

dag = DAG(
dag_id='my_dag',
description='Simple tutorial DAG',
default_args=default_args)

src1_s3 = PythonOperator(
task_id='source1_to_s3',
python_callable=source1_to_s3,
dag=dag)

src2_hdfs = PythonOperator(
task_id='source2_to_hdfs',
python_callable=source2_to_hdfs,
op_kwargs = {'config' : config},
provide_context=True,
dag=dag
)

src3_s3 = PythonOperator(
task_id='source3_to_s3',
python_callable=source3_to_s3,
dag=dag)

spark_job = BashOperator(
task_id='spark_task_etl',
bash_command='spark-submit --master spark://localhost:7077 spark_job.py',
dag = dag)

# setting dependencies
src1_s3 >> spark_job
src2_hdfs >> spark_job
src3_s3 >> spark_job


#Airflow Read File from S3

  • Task Settings: The default_args section tells Airflow how often to run tasks (every day) and how to behave if something goes wrong (retry once after 5 seconds if it fails).
  • Data Functions: There are three functions: two move data from source 1 and source 3 to S3, and one moves data from source 2 to HDFS (another storage system). A helper function is used to get the necessary HDFS setup.
  • Creating a Workflow (DAG): A DAG (workflow) named my_dag is created, which organizes tasks and controls when they run.
  • Defining Tasks: Tasks are created to handle data transfer and a Spark job. The PythonOperator runs Python functions, and the BashOperator runs a Spark job (a big data processing job).
  • Task Order: All data transfer tasks (src1_s3, src2_hdfs, src3_s3) must finish before running the Spark job (spark_job).

Advantages of Downloading Airflow Read File from S3

Here are the major advantages of Airflow Read File from S3

  • Dynamic Integration: To generate dynamic pipelines, Airflow employs Python as the backend programming language. There are several operators, hooks, and connectors available to create DAG and link them to workflows.
  • Refreshing the Server: Any changes you make to DAGs or tasks are not automatically reflected in Airflow. To load the most recent configurations, you should use the web server’s Refresh button.
  • Responsible Scheduler: It appears logical to start the scheduler when the web server boots up. Airflow does not operate in this manner for an important reason: to provide separation of concerns when deployed in a cluster environment where the scheduler and web server are not always on the same machine.
  • Scalable: The airflow is intended to be infinitely scalable. You can define as many dependent workflows as you want. Airflow creates a message queue to orchestrate an arbitrary number of workers.

Conclusion

Airflow makes downloading files from Amazon S3 as simple as uploading them. It all comes down to one function call: load_file() or download_file().

While writing too many functions there can be a chance of potential risk while fetching data, why take the risk! Try Hevo, A No-code Automated Data Pipeline provides you with a consistent and reliable solution to manage data transfer between a variety of sources and a wide variety of Desired Destinations with a few clicks.

Try a 14-day free trial to explore all features, and check out our unbeatable pricing for the best plan for your needs.

Frequently Asked Questions

1. Does Airflow run on AWS?

Yes, Airflow can be operated on AWS. There are also services like Amazon EC2, which is perfect for your custom setting, and there’s Amazon Managed Workflows for Apache Airflow (MWAA), a fully managed service that helps in running and scaling Airflow workflows on AWS.

2. What is the command to get files from S3?

To get files from an S3 bucket using the AWS CLI, you can use the aws s3 cp command. For example:
aws s3 cp s3://bucket-name/file-key local-file-path

3. Is Airflow good for ETL?

Yes, Airflow is suited well to ETL (extract, transform, load) processes because it gives users the ability to orchestrate complex workflows, schedule data pipelines, and manage dependencies between tasks-which is a highly desirable attribute toward automation and real-time monitoring of ETL jobs across scale.

Davor DSouza
Research Analyst, Hevo Data

Davor DSouza is a data analyst with a passion for using data to solve real-world problems. His experience with data integration and infrastructure, combined with his Master's in Machine Learning, equips him to bridge the gap between theory and practical application. He enjoys diving deep into data and emerging with clear and actionable insights.