Build Data Pipelines with Apache Airflow: 5 Easy Steps

• February 17th, 2022


Today, with the rapid growth in data volumes, managing data can be tricky. Controlling and monitoring every data-related process also consumes time and resources, both of which are precious for any organization. To overcome these bottlenecks, businesses now rely on Data Pipelines to automate their data collection and transformation tasks. Companies either set up in-house Data Pipelines or hire a third party to manage their data needs.

Apart from managing data, businesses also need to monitor their data and detect errors in their projects. Apache Airflow is a popular tool that addresses both of these concerns. It is an open-source platform that helps companies automate lengthy workflows, and it can also be used to build Data Pipelines.

This article will introduce you to Apache Airflow and Data Pipelines along with their key features. It will then walk you through 5 easy steps to build Data Pipelines with Apache Airflow. Read along to learn these steps and understand the benefits of using Apache Airflow as a Data Solution!

What is Apache Airflow?

Apache Airflow is an open-source workflow automation platform known for its scheduling capabilities. You can use it to programmatically author, schedule, and monitor workflows. Businesses use Airflow to organize complex computational workflows, build data processing pipelines, and run ETL processes. Airflow represents each workflow as a DAG (Directed Acyclic Graph). The nodes of a DAG are tasks, and the directed connectors between them are dependencies. Together they form a dependency tree that tells Airflow which tasks must finish before others can start.
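To make the idea of a dependency graph concrete, here is a minimal sketch in plain Python. It does not use Airflow at all: the task names and the `execution_order` helper are hypothetical, and the function simply shows how an order of execution can be derived from "task depends on task" relationships.

```python
from collections import deque

# Hypothetical tasks: each key is a task, each value lists the tasks it depends on.
dependencies = {
    "extract": [],
    "transform": ["extract"],
    "load": ["transform"],
    "report": ["load"],
}

def execution_order(deps):
    """Return tasks in an order that respects every dependency (Kahn's algorithm)."""
    remaining = {task: set(upstream) for task, upstream in deps.items()}
    # Tasks with no upstream dependencies can run immediately.
    ready = deque(sorted(t for t, up in remaining.items() if not up))
    order = []
    while ready:
        task = ready.popleft()
        order.append(task)
        # Finishing a task may unblock the tasks that were waiting on it.
        for other, upstream in remaining.items():
            if task in upstream:
                upstream.remove(task)
                if not upstream:
                    ready.append(other)
    if len(order) != len(deps):
        raise ValueError("cycle detected: not a DAG")
    return order

print(execution_order(dependencies))
```

The cycle check is the "acyclic" part of the name: if two tasks wait on each other, no valid order exists and the graph cannot be scheduled.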

Key Features of Apache Airflow

Apache Airflow offers the following features, which have contributed to its popularity:

  • Dynamic: Airflow pipelines are defined in Python code, which lets you generate pipelines dynamically. You use Operators and Connectors written in Python to create DAGs and build workflows from them.
  • Extensible: Because Airflow is open source, you can customize its operators and executors. You can also extend its libraries to fit the level of abstraction that your work requires.
  • Elegant: Airflow pipelines are lean and explicit. Parameterizing your scripts is built in through the Jinja templating engine, so you can do it in a hassle-free manner.
  • Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers, so you can define as many dependent workflows as you need.

To learn more about Apache Airflow, visit here.

What is a Data Pipeline?

Your business generates and works with vast quantities of data. Now, to get any real insight from this sea of data, you have to: 

  • Extract relevant data from numerous data sources that are related to your business.
  • Clean and transform the extracted data and make it analysis-ready.
  • Load the huge datasets into a Data Lake or Data Warehouse to create a single source of truth.

Depending on the order of these steps, you carry out either an ETL (Extract, Transform, Load) or an ELT (Extract, Load, Transform) process to make your data fit for analysis. However, if you perform these tasks manually, a lot can go wrong. Your code may throw errors, data may go missing, or you may load inconsistent data. Bottlenecks like these are bound to happen in a manual ETL/ELT approach.

Businesses use a Data Pipeline tool to automate the ETL/ELT process in a reliable and secure manner. A Data Pipeline consists of a sequence of actions that ingest raw data from multiple sources, transform it, and load it into a storage destination. A Data Pipeline may also provide end-to-end management, with features that guard against errors and bottlenecks.
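The extract, transform, and load sequence can be sketched in a few lines of standard-library Python. This is only an illustration under assumptions: the CSV content, the `sales` table, and the three function names are made up for the example, and an in-memory SQLite database stands in for a real Data Warehouse.

```python
import csv
import io
import sqlite3

# Hypothetical raw input; a real pipeline would read from files, APIs, or databases.
RAW_CSV = "name,amount\n alice ,10\nBOB,5\n,7\n"

def extract(text):
    """Extract: read raw rows from a CSV source."""
    return list(csv.DictReader(io.StringIO(text)))

def transform(rows):
    """Transform: drop rows without a name, normalize names, cast amounts to int."""
    return [
        (row["name"].strip().lower(), int(row["amount"]))
        for row in rows
        if row["name"].strip()
    ]

def load(records, conn):
    """Load: write the cleaned records to the destination table."""
    conn.execute("CREATE TABLE IF NOT EXISTS sales (name TEXT, amount INTEGER)")
    conn.executemany("INSERT INTO sales VALUES (?, ?)", records)
    conn.commit()

conn = sqlite3.connect(":memory:")
load(transform(extract(RAW_CSV)), conn)
rows = conn.execute("SELECT name, amount FROM sales ORDER BY name").fetchall()
print(rows)
```

Swapping the order of `transform` and `load` (loading raw rows first and cleaning them inside the database) would turn the same sketch into an ELT flow.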

Key Features of a Data Pipeline

Data Pipelines have the following features, which define their performance and durability:

  • Accessibility: A Data Pipeline ensures that Data Scientists and other professionals have real-time access to data for hypothesis evaluation, experimentation, and other tasks.
  • Scalability: Data Pipelines can scale up or down as your business workload changes, so you can ingest more data at a low cost.
  • Efficiency: A Data Pipeline lets you have Machine Learning results ready with minimal latency to meet your business objectives.
  • Monitoring: You can set up automatic alerts about the condition of both your data and your pipeline, and respond proactively to prevent potential business risks.

To learn more about Data Pipelines, visit our blog.

Steps to Build Data Pipelines with Apache Airflow

Airflow pipelines have many uses, and one primary use is error detection. In this section, you will learn how to build a Data Pipeline with Apache Airflow that collects the errors raised by exceptions in application logs. The following steps show how to create the pipeline and extract those errors:

Step 1: Install the Docker Files and UI for Apache Airflow

To set up Data Pipelines with Apache Airflow, you first need to install its Docker files and User Interface. You can retrieve the Docker file with all its configuration from Puckel’s GitHub repository.

Install the Docker client, then run the command below to start the Airflow server:

docker-compose -f ./docker-compose-LocalExecutor.yml up -d

It will provide the following output:

Data Pipelines with Apache Airflow: Log Files

Next, open the Airflow UI in your browser at http://localhost:8080.

From the Airflow UI you can trigger a DAG (Directed Acyclic Graph) and view the status of its current tasks, as shown in the image below.

Data Pipelines with Apache Airflow: DAG Setup

Step 2: Create a DAG File

Now that you have installed the Docker files and UI, you can create a DAG file for your Data Pipeline. First define the default arguments, then instantiate the DAG class with a DAG name, say monitor_errors, using the following code:

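from datetime import datetime, timedelta

from airflow import DAG

# Defaults applied to every task in the DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 8, 13),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# catchup is a DAG-level argument, so it is passed to DAG() rather than default_args.
dag = DAG("monitor_errors", default_args=default_args,
          schedule_interval=timedelta(days=1), catchup=False)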
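The retries and retry_delay arguments describe a simple policy: if a task fails, wait and try again, up to the configured number of extra attempts. The sketch below imitates that policy in plain Python so the behavior is easy to see. It is not Airflow's implementation; `run_with_retries` and `flaky_task` are hypothetical names, and the delay is set to zero so the example finishes instantly.

```python
import time

def run_with_retries(task, retries=1, retry_delay_seconds=0.0):
    """Call task(); on failure, retry up to `retries` more times, pausing between attempts."""
    attempts = 0
    while True:
        attempts += 1
        try:
            return task(), attempts
        except Exception:
            if attempts > retries:
                raise  # retries exhausted: the failure is final
            time.sleep(retry_delay_seconds)

# Hypothetical task that fails on its first call and succeeds on the second.
calls = {"count": 0}

def flaky_task():
    calls["count"] += 1
    if calls["count"] == 1:
        raise RuntimeError("temporary failure")
    return "ok"

result, attempts = run_with_retries(flaky_task, retries=1)
print(result, attempts)
```

With retries set to 1, a task gets at most two attempts in total: the original run plus one retry.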

Next, you need to fetch all the log files stored on the server. A major advantage of building Data Pipelines with Apache Airflow is that it can run tasks concurrently. This means you can create one download task per log file, run all of them in parallel, and collect the tasks in one common list.

The log files live on a remote server, and you can fetch them over SFTP. To do that, create one SFTP operator per file and point each operator at an SSH connection id that you will configure in the Airflow portal:
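As a rough analogy for "one download job per file, all running side by side", here is a small standard-library sketch. In Airflow the parallelism is handled by its executor, not by a thread pool, and the `download` function here is a hypothetical stand-in that only returns the local path a real transfer would write.

```python
from concurrent.futures import ThreadPoolExecutor

log_list = ["securityApp.log", "mainApp.log", "extApp.log"]

def download(file_name):
    """Hypothetical stand-in for an SFTP transfer; returns the local path it would write."""
    return f"/tmp/logs/{file_name}"

# One independent job per file; the pool runs them concurrently
# and pool.map returns the results in the order of the input list.
with ThreadPoolExecutor(max_workers=3) as pool:
    downloaded = list(pool.map(download, log_list))

print(downloaded)
```

Because the jobs do not depend on each other, a slow or failed transfer for one file does not hold up the others.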

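# Airflow 1.10 import path; in Airflow 2 the operator lives in airflow.providers.sftp.operators.sftp.
from airflow.contrib.operators.sftp_operator import SFTPOperator, SFTPOperation

# base_folder (local target folder) and remote_path (log folder on the server)
# must be defined before this point; the article does not show their values.
log_list = ['securityApp.log', 'mainApp.log', 'extApp.log', 'timeApp.log', 'tokenApp.log',
            'bridgeApp.log', 'daemonApp.log', 'notificationApp.log', 'messageApp.log']

# Create one download task per log file and keep them in a common list.
dl_tasks = []
for file in log_list:
    op = SFTPOperator(task_id=f"download_{file}",
                ssh_conn_id="log_server",
                local_filepath=f"{base_folder}/{file}",
                remote_filepath=f"{remote_path}/{file}",
                operation=SFTPOperation.GET,
                create_intermediate_dirs=True,
                dag=dag)
    dl_tasks.append(op)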

Once the DAG file is saved, refresh the Airflow UI and it will load your DAG file. The new DAG, monitor_errors, appears in the list. Click on the DAG name to open its graph view, where you can check all the download tasks as shown below.

Data Pipelines with Apache Airflow: DAG Processing

Before triggering the DAG, configure the SSH connection so that your SFTP operators can use it. Go to the Admin menu and click on Connections to create a new SSH connection, as shown in the image below.

Data Pipelines with Apache Airflow: Connections

To access the SSH server without a password, this example assumes that your public key has already been installed on the server and that your private key is stored at:

/usr/local/airflow/.ssh/id_rsa

Leave the Password field empty, and enter the JSON below in the Extra field:

{
  "key_file": "/usr/local/airflow/.ssh/id_rsa",
  "timeout": "10",
  "compress": "false",
  "no_host_key_check": "false",
  "allow_host_key_change": "false"
}

Next, turn the DAG on and trigger it. Some tasks turn green, which shows that they are running, while the remaining tasks stay grey because they are still queued. This is how you track the progress of your workflow, as shown in the image below.

Data Pipelines with Apache Airflow: DAG Processing

Step 3: Extract Lines Containing Exceptions

In this step, you collect every line that contains the word “Exception” from the downloaded log files and write those lines to a file (errors.txt) in the same folder.

The grep command can search for specific text across a collection of files, provided that all the files are in one folder. Here it runs recursively over the *.log files and prefixes each match with its file name and line number:

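from airflow.operators.bash_operator import BashOperator  # Airflow 1.10 import path

# -E: extended regex, -r: recurse, -n: print line numbers, -w: match whole words only.
# {{ params.base_folder }} is filled in by Airflow's Jinja templating at run time.
bash_command = """
    grep -E 'Exception' --include=*.log -rnw '{{ params.base_folder }}' > {{ params.base_folder }}/errors.txt
    ls -l {{ params.base_folder }}/errors.txt && cat {{ params.base_folder }}/errors.txt
"""
grep_exception = BashOperator(task_id="grep_exception",
                        bash_command=bash_command,
                        params={'base_folder': base_folder},
                        dag=dag)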
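If it helps to see what the grep step produces, the following plain-Python sketch approximates it: it scans every *.log file under a folder, keeps lines that contain "Exception" as a whole word, and records them as path:line_number:text. The `grep_exceptions` function and the sample log content are hypothetical, and the word-boundary regex is only an approximation of grep's -w option.

```python
import re
import tempfile
from pathlib import Path

def grep_exceptions(base_folder, word="Exception"):
    """Return 'path:line_number:text' for each *.log line containing `word` as a whole word."""
    pattern = re.compile(rf"\b{re.escape(word)}\b")
    matches = []
    for log_file in sorted(Path(base_folder).rglob("*.log")):
        with open(log_file, encoding="utf-8", errors="replace") as fp:
            for number, line in enumerate(fp, start=1):
                if pattern.search(line):
                    matches.append(f"{log_file}:{number}:{line.rstrip()}")
    return matches

# Demonstration on a throwaway folder with one hypothetical log file.
with tempfile.TemporaryDirectory() as folder:
    Path(folder, "mainApp.log").write_text(
        "INFO started\nERROR service Exception: timeout\nINFO done\n", encoding="utf-8"
    )
    errors = grep_exceptions(folder)
    # Write the matches next to the logs, mirroring errors.txt in the step above.
    Path(folder, "errors.txt").write_text("\n".join(errors) + "\n", encoding="utf-8")
    print(errors)
```

Only files ending in .log are scanned, so errors.txt itself is never picked up on a later run.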

Refresh the DAG and trigger it again. The graph view will be updated as shown below.

Data Pipelines with Apache Airflow: DAG

Step 4: Extract the Required Fields

Now you can parse errors.txt line by line to extract the required fields. A Python operator that applies a regular expression with named groups handles this task, using the code below:

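import re

from airflow.operators.python_operator import PythonOperator  # Airflow 1.10 import path

def parse_log(logString):
    # Named groups capture the file, line number, date, time, session, app, module, and message.
    r = r".+/(?P<file>.+):(?P<line>\d+):\[\[\]\] (?P<date>.+)/(?P<time>\d{2}:\d{2}:\d{2},\d{3}) ERROR ?(?:SessionId : )?(?P<session>.+)? \[(?P<app>\w+)\] .+ (?:Error :|service Exception) (?P<module>(?=[\w.-]+ : )[\w.-]+)?(?: : )?(?P<errMsg>.+)"
    group = re.match(r, logString)
    # re.match returns None when a line does not fit the pattern.
    return group.groups() if group else None

def parse_log_file(filepath, tablename):
    with open(filepath) as fp:
        records = []
        for line in fp:
            record = parse_log(line)
            if record is not None:
                records.append(record)
        # save_to_database is a helper that the article does not show.
        save_to_database(tablename, records)

# Named parse_log_task so the operator does not shadow the parse_log function above.
parse_log_task = PythonOperator(task_id='parse_log',
                        python_callable=parse_log_file,
                        op_kwargs={'filepath': f'{base_folder}/errors.txt',
                                   'tablename': f'{table_name}'},
                        dag=dag)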
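The production pattern is long, so here is a reduced sketch of the same technique on a deliberately simplified, hypothetical log layout. It shows how named groups turn one matched line into a dictionary of fields, and how a line that does not fit the pattern is reported as None. The layout, the `LOG_PATTERN` name, and the sample line are assumptions made for this illustration only.

```python
import re

# Simplified, hypothetical layout: "<file>:<line>:<date> <time> ERROR [<app>] <message>"
LOG_PATTERN = re.compile(
    r"(?P<file>[^:]+):(?P<line>\d+):"
    r"(?P<date>\d{4}-\d{2}-\d{2}) (?P<time>\d{2}:\d{2}:\d{2},\d{3}) "
    r"ERROR \[(?P<app>\w+)\] (?P<errMsg>.+)"
)

def parse_line(text):
    """Return the named fields as a dict, or None when the line does not match."""
    match = LOG_PATTERN.match(text)
    return match.groupdict() if match else None

sample = "/tmp/logs/mainApp.log:42:2020-08-13 10:15:30,123 ERROR [mainApp] Read timed out"
fields = parse_line(sample)
print(fields)
```

Using `groupdict()` instead of `groups()` keeps the field names attached to the values, which can make the later database insert easier to read.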

The extracted fields are saved in a database (Postgres is used for this example), so you can run queries on them later.

To use the Postgres database, you need to configure a Postgres connection in the Airflow portal. To do this, modify the existing postgres_default connection as shown in the image below.
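The article does not show how the records are written, so the following is only a hedged sketch of what such a helper could look like. It uses an in-memory SQLite database as a stand-in for Postgres, takes the connection as an extra argument, and assumes a four-column table; the column names and sample records are hypothetical.

```python
import sqlite3

def save_to_database(conn, tablename, records):
    """Insert (file, line, app, error) tuples into `tablename`, creating it if needed."""
    if not tablename.isidentifier():
        raise ValueError(f"unsafe table name: {tablename!r}")
    conn.execute(
        f"CREATE TABLE IF NOT EXISTS {tablename} "
        "(file TEXT, line INTEGER, app TEXT, error TEXT)"
    )
    # Values go through placeholders; only the validated table name is interpolated.
    conn.executemany(f"INSERT INTO {tablename} VALUES (?, ?, ?, ?)", records)
    conn.commit()

conn = sqlite3.connect(":memory:")
records = [
    ("mainApp.log", 42, "mainApp", "Read timed out"),
    ("extApp.log", 7, "extApp", "Connection refused"),
]
save_to_database(conn, "error_logs", records)
saved = conn.execute("SELECT COUNT(*) FROM error_logs").fetchone()[0]
print(saved)
```

Inserting all rows with one `executemany` call keeps the write in a single transaction, so a failure does not leave the table half filled.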

Data Pipelines with Apache Airflow: Configuration

Now, on triggering the DAG again, you will get the following output:

Data Pipelines with Apache Airflow: DAG

This indicates that all tasks ran successfully and that the data from the logs was parsed and sent to the database.

You can now click on Ad Hoc Query under the Data Profiling menu and type the required SQL query, as shown below.

Data Pipelines with Apache Airflow: Adhoc Query

Now that you know how to create Data Pipelines with Apache Airflow, it’s time to query the table and extract the error details.

Step 5: Query the Table to Generate Error Records

The steps so far were about building the Data Pipeline. This final step shows how to use it to detect and monitor errors. You can query the saved table to count the errors by type. To do so, add another Python operator that queries the database and produces 2 report files:

  • One will contain the records for all the errors in the database.
  • The second one will be a statistics table to represent all the various types of errors in descending order of occurrence.

Use the following code to develop the above 2 reports:

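from airflow.hooks.postgres_hook import PostgresHook  # Airflow 1.10 import path

def gen_error_reports(statfile, logfile, tablename, **kwargs):
    # database hook
    db_hook = PostgresHook(postgres_conn_id='postgres_default', schema='airflow')
    db_conn = db_hook.get_conn()
    db_cursor = db_conn.cursor()

    # Report 1: error types ordered by number of occurrences.
    sql = f"SELECT error, count(*) as occurrence FROM {tablename} group by error ORDER BY occurrence DESC"
    sql_output = "COPY ({0}) TO STDOUT WITH CSV HEADER".format(sql)
    with open(statfile, 'w') as f_output:
        db_cursor.copy_expert(sql_output, f_output)

    # Report 2: every record in the table. The original snippet omits this step;
    # it is added here as an assumption so that logfile is actually written.
    sql_output = f"COPY (SELECT * FROM {tablename}) TO STDOUT WITH CSV HEADER"
    with open(logfile, 'w') as f_output:
        db_cursor.copy_expert(sql_output, f_output)


gen_reports = PythonOperator(task_id='gen_reports',
                        python_callable=gen_error_reports,
                        op_kwargs={'statfile': f'{base_folder}/error_stats.csv',
                                   'logfile': f'{base_folder}/error_logs.csv',
                                   'tablename': f'{table_name}'},
                        provide_context=True,
                        dag=dag)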
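To see what the statistics query returns without a running Postgres instance, the sketch below runs the same kind of GROUP BY aggregation against an in-memory SQLite table and writes the result as CSV with a header row. The table contents are hypothetical, and the csv module stands in for Postgres COPY, which SQLite does not have.

```python
import csv
import io
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE error_logs (app TEXT, error TEXT)")
conn.executemany(
    "INSERT INTO error_logs VALUES (?, ?)",
    [
        ("mainApp", "Read timed out"),
        ("extApp", "Connection refused"),
        ("mainApp", "Read timed out"),
    ],
)

# One row per error type, most frequent first.
cursor = conn.execute(
    "SELECT error, COUNT(*) AS occurrence FROM error_logs "
    "GROUP BY error ORDER BY occurrence DESC"
)
header = [column[0] for column in cursor.description]
stats = cursor.fetchall()

# Write the result as CSV with a header row (an in-memory buffer stands in for a file).
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(header)
writer.writerows(stats)
print(buffer.getvalue())
```

Ordering by the count in descending order puts the most frequent error at the top of the report, which is usually the first one worth investigating.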

Now, trigger the DAG again to get the following output:

Data Pipelines with Apache Airflow: DAG

Two report files are generated in the folder, as shown in the image below:

Data Pipelines with Apache Airflow: Error Files

The error_logs.csv file contains all the exception records present in the database, as shown below:

Data Pipelines with Apache Airflow: Number of Errors

The error_stats.csv file lists the different types of errors with their number of occurrences, as shown below:

Data Pipelines with Apache Airflow: Type of Errors

That’s it! You are now ready to build Data Pipelines with Apache Airflow on your own.

Benefits of Data Pipelines with Apache Airflow

The previous section showed you how to develop Data Pipelines with Apache Airflow. Implementing such a pipeline in your work brings the following benefits:

  • Because Airflow pipelines are written in Python, you can build arbitrarily complex pipelines that carry out your desired tasks. Python also makes it easy to integrate Airflow with other tools. The Airflow community already maintains a rich collection of extensions that let you connect to a multitude of databases, cloud services, and much more.
  • Airflow’s rich scheduling semantics let you run pipelines at regular intervals. You can also build pipelines that process data incrementally and avoid unnecessary, expensive recomputations.
  • Airflow features such as backfilling allow you to reprocess historical data easily. This way, you can recompute derived data sets even after modifying your code.
  • Airflow’s web interface simplifies monitoring the results of your pipeline runs and debugging any failures that may occur.
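Interval-based scheduling and backfilling can be pictured as splitting time into fixed windows, with one pipeline run responsible for each window. The sketch below is a rough model of that idea in plain Python, not Airflow's scheduler: `schedule_intervals` is a hypothetical helper that lists the daily windows that are already complete between a start date and a given point in time.

```python
from datetime import datetime, timedelta

def schedule_intervals(start_date, end_date, interval):
    """List the (interval_start, interval_end) windows that are fully complete by end_date."""
    windows = []
    window_start = start_date
    while window_start + interval <= end_date:
        windows.append((window_start, window_start + interval))
        window_start += interval
    return windows

# Daily schedule starting 2020-08-13, evaluated on 2020-08-16: three complete windows.
windows = schedule_intervals(datetime(2020, 8, 13), datetime(2020, 8, 16), timedelta(days=1))
for start, end in windows:
    print(start.date(), "->", end.date())
```

Processing only the data that falls inside one window is what makes a run incremental, and re-running the pipeline for past windows is the essence of a backfill.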

Conclusion

This article introduced you to Apache Airflow and Data Pipelines along with their key features. It also explained an easy process for building Data Pipelines with Apache Airflow on your own. Finally, it listed the benefits of setting up a pipeline using Apache Airflow.

Visit our Website to Explore Hevo

Apache Airflow is a great tool. However, at times, you need to transfer its data to a Data Warehouse for further analysis. Building an in-house solution for this process could be an expensive and time-consuming task. Hevo Data, on the other hand, offers a No-code Data Pipeline that can automate your data transfer process, allowing you to focus on other aspects of your business like Analytics, Customer Management, etc. This platform allows you to transfer data from 150+ sources like Airflow to Cloud-based Data Warehouses like Snowflake, Google BigQuery, Amazon Redshift, etc. It will provide you with a hassle-free experience and make your work life much easier.

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. 

Share your views on the Data Pipelines with Apache Airflow in the comments section!