Apache Airflow is a workflow automation tool. It helps organizations schedule tasks so that they run at the appropriate time.

This relieves employees of repetitive manual work. Apache Airflow also plans and orchestrates data pipelines and workflows.

Managing and analyzing massive amounts of data can be difficult if not properly planned and organized. Most business operations are managed by various apps, services, and websites that generate valuable data.

Amazon S3 is a popular object storage service that can hold large amounts of structured or unstructured data of any type.

This article shows you how to use Airflow to read (download) a file from S3 and explains the concepts involved along the way. Continue reading!

What is Apache Airflow?

Apache Airflow is a batch-oriented pipeline framework for developing and monitoring data workflows. Airflow was created at Airbnb in 2014 to manage the company's increasingly complex data pipelines.

Workflows are written as Python code, and a built-in web interface lets users trigger them and monitor their execution. As the project grew in popularity, it was adopted by the Apache Software Foundation.

What are DAGs?

Airflow uses Directed Acyclic Graphs (DAGs) to define workflows. A DAG describes a set of tasks and the dependencies between them: which tasks depend on others and which are independent. These dependencies determine the order in which tasks can be scheduled.

Consider a DAG that reads data from three sources independently. A Spark job then joins the data on a key and writes the transformed output.

Once the DAG is defined, the scheduler can determine which tasks can run immediately and which must wait for other tasks to finish. Here, the Spark job must wait for all three “read” tasks before writing its output to S3.
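The scheduling rule described above can be illustrated with Python's standard-library graphlib module. This is a sketch of the idea, not Airflow's scheduler: the run_waves helper is hypothetical, and the task names simply mirror the example DAG.

```python
from graphlib import TopologicalSorter

# Each task maps to the set of tasks it depends on (its upstream tasks).
dependencies = {
    "source1_to_s3": set(),
    "source2_to_hdfs": set(),
    "source3_to_s3": set(),
    "spark_task_etl": {"source1_to_s3", "source2_to_hdfs", "source3_to_s3"},
}


def run_waves(graph):
    """Group tasks into waves: every task in a wave can run in parallel
    once all tasks in earlier waves have finished."""
    sorter = TopologicalSorter(graph)
    sorter.prepare()  # raises graphlib.CycleError if the graph is not acyclic
    waves = []
    while sorter.is_active():
        ready = sorted(sorter.get_ready())  # tasks whose upstreams are all done
        waves.append(ready)
        sorter.done(*ready)  # mark them finished, unlocking downstream tasks
    return waves


print(run_waves(dependencies))
# [['source1_to_s3', 'source2_to_hdfs', 'source3_to_s3'], ['spark_task_etl']]
```

The three read tasks land in the first wave because nothing blocks them, while the Spark task only becomes ready after all three are marked done. A cycle would be rejected up front, which is exactly why workflows must be acyclic.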

NOTE:

You should have an S3 bucket set up and at least one file in it.

As an example, consider a bucket named bds-airflow-bucket that contains a single posts.json document.

Airflow is simple (yet restrictive) to install as a single package. Here is a typical file structure for our environment to add DAGs, set them up, and run them.


airflow                  # the root directory.
├── dags                 # root folder for all dags. subfolders are searched for dags too (unless excluded via .airflowignore).
│   ├── my_dag.py        # my dag (definitions of tasks/operators) including precedence.
│   └── ...
├── logs                 # logs for the various tasks that are run
│   └── my_dag           # DAG specific logs
│   │   ├── src1_s3      # folder for task specific logs (log files are created by date of run)
│   │   ├── src2_hdfs
│   │   ├── src3_s3
│   │   └── spark_task_etl
├── airflow.db           # SQLite database used by Airflow internally to track status of each DAG.
├── airflow.cfg          # global configuration for Airflow (this can be overridden by config inside the file.)
└── ...

Configuring Airflow to Read a File from S3

Reading a file from S3 with Airflow takes two steps.

Before diving into them, have a look at the prerequisites first:

Importing Packages

# airflow related (Airflow 2.x import paths)
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
# other packages
from datetime import datetime
from datetime import timedelta

Define the DAG, Tasks, and Operators

Let’s define all of the tasks for our current workflow. We have three tasks that read data from their respective sources and store it in S3 and HDFS. They are defined as Python functions that will be called by our operators.

We can pass parameters to these functions from the operator using op_args and op_kwargs. For example, the function source2_to_hdfs accepts a named parameter config as well as the context parameters ds and **kwargs.
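To see how op_kwargs and the run context end up as function arguments, here is a simplified, pure-Python stand-in for what Airflow 2's PythonOperator does when it calls your function. It is not Airflow's implementation: call_with_context is a hypothetical name, and the context dictionary holds only two illustrative keys.

```python
import inspect


def source2_to_hdfs(config, ds, **kwargs):
    # Stand-in body: report what the function received instead of writing to HDFS.
    return {"config": config, "ds": ds, "extra_context": sorted(kwargs)}


def call_with_context(python_callable, op_kwargs, context):
    """Call python_callable with op_kwargs plus whichever context entries it accepts."""
    params = inspect.signature(python_callable).parameters
    takes_var_kwargs = any(
        p.kind is inspect.Parameter.VAR_KEYWORD for p in params.values()
    )
    if takes_var_kwargs:
        # A **kwargs parameter can absorb every context entry.
        accepted = dict(context)
    else:
        # Otherwise pass only the context entries the signature names.
        accepted = {k: v for k, v in context.items() if k in params}
    # Explicit op_kwargs take precedence over context entries with the same name.
    return python_callable(**{**accepted, **op_kwargs})


context = {"ds": "2018-09-01", "run_id": "scheduled__2018-09-01"}
print(call_with_context(source2_to_hdfs, {"config": None}, context))
# {'config': None, 'ds': '2018-09-01', 'extra_context': ['run_id']}
```

Because source2_to_hdfs declares **kwargs, it receives every context entry; ds is bound by name and the rest (here run_id) lands in kwargs, while config comes from op_kwargs.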

def source1_to_s3():
    # code that writes our data from source 1 to s3
    pass

def source2_to_hdfs(config, ds, **kwargs):
    # code that writes our data from source 2 to hdfs
    # ds: the date of run of the given task.
    # kwargs: keyword arguments containing context parameters for the run.
    pass

def source3_to_s3():
    # code that writes our data from source 3 to s3
    pass

Next, set up the basic configuration. The three classes imported earlier (DAG, BashOperator, and PythonOperator) provide the building blocks, and default_args holds the settings shared by every task:

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 9, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    # schedule_interval is a DAG argument, so pass it to DAG() rather than setting it here
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

These values become the default arguments for every task in the DAG, and an individual operator can override any of them. The Airflow documentation lists the additional parameters that default_args can hold.
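The precedence rule can be pictured as a plain dictionary merge in which values set on an individual task win over the DAG-wide defaults. This is only an illustration built on a hypothetical effective_task_args helper; Airflow itself additionally applies only those defaults that match the operator's parameters.

```python
from datetime import datetime, timedelta

default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2018, 9, 1),
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(seconds=5),
}


def effective_task_args(defaults, **task_args):
    """Merge DAG-wide defaults with task-level arguments; task-level values win."""
    return {**defaults, **task_args}


args = effective_task_args(default_args, task_id="source1_to_s3", retries=3)
print(args["task_id"], args["retries"], args["owner"])  # source1_to_s3 3 airflow
```

The task overrides retries, but it still inherits owner, start_date, and the remaining defaults untouched.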

1) Adding the DAGs to the Airflow Scheduler

Make a new Python file named s3_download.py in the airflow/dags directory. Start with the DAG boilerplate code and library imports. To communicate with the S3 bucket, you’ll need the S3Hook class from the Amazon provider package (apache-airflow-providers-amazon).

Downloading a file essentially comes down to declaring a PythonOperator-based task. The task invokes the download_from_s3() function, which takes three arguments:

  • key – a string containing the name/path of the file on S3. For example, posts.json retrieves that file from the bucket’s root. Paths can also be specified, such as data/posts/posts.json (S3 keys are normally written without a leading slash). Make sure the key matches your bucket’s layout.
  • bucket_name – a string containing the name of the bucket from which you want to download the file.
  • local_path – a string indicating where the file should be saved. Note that this is a directory path, not a file path.
import os

from airflow.providers.amazon.aws.hooks.s3 import S3Hook
...


def download_from_s3(key: str, bucket_name: str, local_path: str) -> str:
    # 's3_conn' is the ID of the Airflow connection holding the AWS credentials
    hook = S3Hook('s3_conn')
    # download_file() returns the full path of the downloaded file
    file_name = hook.download_file(key=key, bucket_name=bucket_name, local_path=local_path)
    return file_name


with DAG(...) as dag:
    # Download a file
    task_download_from_s3 = PythonOperator(
        task_id='download_from_s3',
        python_callable=download_from_s3,
        op_kwargs={
            'key': 'posts.json',
            'bucket_name': 'bds-airflow-bucket',
            'local_path': '/Users/dradecic/airflow/data/'
        }
    )
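If you keep file locations as full S3 URIs, a small helper can split them into the bucket_name and key arguments that download_from_s3() expects. This is a hypothetical helper built on the standard library's urllib.parse, not part of Airflow (the Amazon provider's S3Hook offers its own parse_s3_url() for a similar job).

```python
from urllib.parse import urlparse


def split_s3_uri(uri: str) -> tuple[str, str]:
    """Split 's3://bucket/path/to/key' into (bucket_name, key)."""
    parsed = urlparse(uri)
    if parsed.scheme != "s3" or not parsed.netloc:
        raise ValueError(f"not an S3 URI: {uri!r}")
    # Drop the '/' that separates the bucket from the key in the URI.
    return parsed.netloc, parsed.path.removeprefix("/")


print(split_s3_uri("s3://bds-airflow-bucket/data/posts/posts.json"))
# ('bds-airflow-bucket', 'data/posts/posts.json')
```

The resulting pair can be unpacked straight into op_kwargs as 'bucket_name' and 'key'.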

The download_from_s3() function first creates an instance of the S3Hook class that uses the s3_conn connection, which must already be defined in Airflow (for example, under Admin > Connections in the web UI). It then calls the hook instance’s download_file() method to download the file.

One catch is that S3Hook saves the file to the local_path folder under a randomly generated name with no extension. If that is not what you want, declare another task that renames the file.
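A rename task could look like the following sketch. It assumes the upstream task is the download_from_s3 task defined above, whose return value (the downloaded file path) PythonOperator pushes to XCom by default. The names rename_file and rename_downloaded_file are hypothetical, chosen for this example.

```python
import os


def rename_downloaded_file(downloaded_path: str, new_name: str) -> str:
    """Rename a file within its own directory and return the new path."""
    new_path = os.path.join(os.path.dirname(downloaded_path), new_name)
    os.rename(downloaded_path, new_path)
    return new_path


def rename_file(ti, new_name: str) -> str:
    """Callable for a hypothetical 'rename_file' PythonOperator task."""
    # Airflow 2 passes the TaskInstance as `ti` when the callable asks for it.
    # With a single task_id string, xcom_pull returns that task's return value,
    # i.e. the path that download_from_s3() returned.
    downloaded_path = ti.xcom_pull(task_ids="download_from_s3")
    return rename_downloaded_file(downloaded_path, new_name)


# Hypothetical wiring inside the `with DAG(...) as dag:` block:
#
# task_rename_file = PythonOperator(
#     task_id='rename_file',
#     python_callable=rename_file,
#     op_kwargs={'new_name': 's3_downloaded_posts.json'},
# )
# task_download_from_s3 >> task_rename_file
```

Because the rename happens in a separate task, both tasks need to run where the downloaded file is visible, which holds on a single machine but not necessarily with distributed workers.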

2) Downloading the File from S3

First, test the task that downloads the file from S3:


airflow tasks test s3_download download_from_s3

Airflow saved the file from S3 to /Users/dradecic/airflow/data/airflow_tmp_0xrx7pyi, which is a completely random file name.

The second task renames it to s3_downloaded_posts.json. Test it the same way:


airflow tasks test s3_download rename_file

Once the task completes successfully, the renamed file s3_downloaded_posts.json should appear in the data folder.

Finally, here is the complete code for the example DAG with three sources and a Spark job:

# Airflow 2.x import paths
from datetime import datetime, timedelta

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2018, 9, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(seconds=5),
}

def source1_to_s3():
    # code that writes our data from source 1 to s3
    pass

def source3_to_s3():
    # code that writes our data from source 3 to s3
    pass

def source2_to_hdfs(config, ds, **kwargs):
    # code that writes our data from source 2 to hdfs
    # ds: the date of run of the given task.
    # kwargs: keyword arguments containing context parameters for the run.
    pass

def get_hdfs_config():
    # return HDFS configuration parameters required to store data into HDFS.
    return None  # placeholder until real parameters are filled in

config = get_hdfs_config()

dag = DAG(
    dag_id='my_dag',
    description='Simple tutorial DAG',
    default_args=default_args,
    schedule_interval='@daily',  # a DAG argument, not a default_args entry
    catchup=False,  # don't backfill one run per day since start_date
)

src1_s3 = PythonOperator(
    task_id='source1_to_s3',
    python_callable=source1_to_s3,
    dag=dag,
)

src2_hdfs = PythonOperator(
    task_id='source2_to_hdfs',
    python_callable=source2_to_hdfs,
    op_kwargs={'config': config},
    # Airflow 2 injects context (ds, ...) automatically; provide_context is not needed
    dag=dag,
)

src3_s3 = PythonOperator(
    task_id='source3_to_s3',
    python_callable=source3_to_s3,
    dag=dag,
)

spark_job = BashOperator(
    task_id='spark_task_etl',
    bash_command='spark-submit --master spark://localhost:7077 spark_job.py',
    dag=dag,
)

# setting dependencies: the Spark job waits for all three source tasks
[src1_s3, src2_hdfs, src3_s3] >> spark_job


Advantages of Reading Files from S3 with Airflow

Here are the major advantages of using Airflow to read files from S3:

  • Dynamic Integration: Airflow pipelines are defined in Python, so they can be generated dynamically. Many operators, hooks, and connectors are available for building DAGs and linking them into workflows.
  • Refreshing the Server: Changes you make to DAG files are not reflected in Airflow instantly. The scheduler re-parses the DAG folder periodically, and older Airflow 1.x releases also offered a Refresh button in the web UI for loading the latest version of a DAG.
  • Responsible Scheduler: It might seem logical for the scheduler to start when the web server boots up. Airflow deliberately does not work this way, to keep concerns separate in cluster deployments where the scheduler and web server often run on different machines.
  • Scalable: Airflow is designed to scale out. You can define as many dependent workflows as you need, and with executors such as the CeleryExecutor, Airflow uses a message queue to distribute tasks across an arbitrary number of workers.
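As a rough picture of the queue-based model, the toy sketch below puts task names on a standard-library queue.Queue and lets several worker threads pull from it. It is an analogy only: run_with_workers is a hypothetical helper, and a real CeleryExecutor deployment uses an external message broker and worker processes that may live on separate machines.

```python
import queue
import threading


def run_with_workers(task_names, num_workers):
    """Toy model: a shared queue holds tasks, and any number of workers
    pull from it until it is empty. Returns (worker_id, task) pairs."""
    task_queue = queue.Queue()
    for name in task_names:
        task_queue.put(name)

    completed = []
    lock = threading.Lock()

    def worker(worker_id):
        while True:
            try:
                name = task_queue.get_nowait()
            except queue.Empty:
                return  # no work left for this worker
            with lock:
                completed.append((worker_id, name))  # "run" the task
            task_queue.task_done()

    threads = [threading.Thread(target=worker, args=(i,)) for i in range(num_workers)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return completed


results = run_with_workers(["source1_to_s3", "source2_to_hdfs", "source3_to_s3"], 2)
print(sorted(task for _, task in results))
```

Adding capacity in this model means starting more workers against the same queue; nothing about the tasks themselves has to change.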

Conclusion

Airflow makes downloading files from Amazon S3 as simple as uploading them. Each comes down to a single S3Hook method call: load_file() for uploads or download_file() for downloads.

Writing many custom functions increases the risk of errors when fetching data, so why take that risk? Try Hevo, a no-code automated data pipeline that provides a consistent and reliable way to manage data transfer between a variety of sources and a wide range of destinations with a few clicks.


Hevo Data automates your data transfer process, allowing you to focus on other aspects of your business such as analytics and customer management. Hevo supports 150+ data sources (including 40+ free sources) and 15+ destinations, so you can load data into a destination and analyze it in real time, with transparent pricing and hassle-free data replication.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Do check out the pricing details to understand which plan fulfills all your business needs.


Davor DSouza
Research Analyst, Hevo Data

Davor DSouza is a data analyst with a passion for using data to solve real-world problems. His experience with data integration and infrastructure, combined with his Master's in Machine Learning, equips him to bridge the gap between theory and practical application. He enjoys diving deep into data and emerging with clear and actionable insights.
