With data volumes growing rapidly, managing data can be tricky. Controlling and monitoring every data-related process by hand consumes time and resources, both of which are precious for any organization. To overcome these bottlenecks, businesses rely on Data Pipelines to automate their data collection and transformation tasks. Companies either set up in-house Data Pipelines or hire a third party to manage their data needs.

Beyond managing data, businesses also need to monitor their data and detect errors in their projects. Apache Airflow is a popular tool that addresses both concerns. It is an open-source platform that helps companies automate lengthy workflows, and you can use it to build and run Data Pipelines.

This article will introduce you to Apache Airflow and Data Pipelines and their key features. It will then provide you with 5 easy steps to build Data Pipelines with Apache Airflow. Read along to learn these steps and understand the benefits of using Apache Airflow as a Data Solution!

What is Apache Airflow?

Apache Airflow is a workflow automation platform that is popular for its open-source availability and scheduling capabilities. You can use it to programmatically author, schedule, and monitor any number of workflows. Businesses today use Airflow to organize complex computational workflows, build data processing pipelines, and perform ETL processes. Airflow models each workflow as a DAG (Directed Acyclic Graph): the nodes are tasks, and the directed edges between them are dependencies. Because the graph has no cycles, Airflow can always work out a valid order in which to run the tasks.
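To make the idea of a dependency graph concrete, here is a minimal sketch in plain Python (not Airflow code). The task names are hypothetical, and the standard-library graphlib module (Python 3.9+) is used only to illustrate how a valid run order can be derived from dependencies.

```python
from graphlib import TopologicalSorter  # standard library, Python 3.9+

# Hypothetical workflow: each key is a task, each value is the set of tasks
# that must finish before it can start.
dependencies = {
    "extract": set(),
    "transform": {"extract"},
    "load": {"transform"},
    "report": {"load"},
}


def execution_order(deps):
    """Return one valid run order; raises graphlib.CycleError if deps contains a cycle."""
    return list(TopologicalSorter(deps).static_order())


print(execution_order(dependencies))
```

A graph with a cycle (for example, two tasks that each wait for the other) has no valid order, which is why workflows are required to be acyclic.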

Key Features of Apache Airflow

Apache Airflow contains the following unique features which have led to its immense popularity:

  • Dynamic Integration: Airflow pipelines are defined in Python, so you can generate them dynamically in code. Airflow provides Operators and Connectors that you can use to create DAGs and build workflows from them.
  • Extensible: Because Airflow is open source, you can customize its operators and executors. You can also extend its libraries to fit the level of abstraction that your work requires.
  • Elegant: Airflow pipelines are lean and explicit. Parameterization is built in through the Jinja templating engine, so you can parameterize your scripts in a hassle-free manner.
  • Scalable: Airflow has a modular architecture and uses a message queue to orchestrate an arbitrary number of workers, so you can define and run a large number of dependent workflows.

To learn more about Apache Airflow, visit here.

What is a Data Pipeline?

Your business generates and works with vast quantities of data. Now, to get any real insight from this sea of data, you have to: 

  • Extract relevant data from numerous data sources that are related to your business.
  • Clean and transform the extracted data and make it analysis-ready.
  • Load the huge datasets into a Data Lake or Data Warehouse to create a single source of truth.

Depending on the order of these steps, you carry out either an ETL (Extract, Transform, Load) or an ELT (Extract, Load, Transform) process to make your data fit for analysis. However, many things can go wrong if you perform these tasks manually: your code may throw errors, data may go missing, or you may load inconsistent data.

Businesses use a Data Pipeline tool to automate the ETL/ELT process in a reliable and secure manner. A Data Pipeline consists of a sequence of actions that ingest raw data from multiple sources, transform it, and load it into a storage destination. A Data Pipeline may also provide end-to-end management with features that guard against errors and bottlenecks.
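The extract, transform, and load sequence can be sketched in a few lines of plain Python. This is only an illustration: the CSV layout, the column names (customer, amount), and the list standing in for a warehouse table are all assumptions made for the example.

```python
import csv
import io


def extract(raw_csv):
    """Read rows from CSV text (a stand-in for a real data source)."""
    return list(csv.DictReader(io.StringIO(raw_csv)))


def transform(rows):
    """Drop rows with a missing amount and convert the remaining amounts to float."""
    return [
        {"customer": row["customer"].strip(), "amount": float(row["amount"])}
        for row in rows
        if row["amount"].strip()
    ]


def load(rows, destination):
    """Append cleaned rows to the destination (a list standing in for a warehouse table)."""
    destination.extend(rows)
    return len(rows)


def run_pipeline(raw_csv, destination):
    """Run the three stages in ETL order and return the number of rows loaded."""
    return load(transform(extract(raw_csv)), destination)
```

Swapping the order of transform and load (loading raw rows first and cleaning them inside the destination) would turn the same sketch into an ELT flow.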

Key Features of a Data Pipeline

Data Pipelines have the following features, which define their performance and durability:

  • Accessibility: A Data Pipeline ensures that data scientists and other professionals have real-time access to data for hypothesis evaluation, experimentation, and other tasks.
  • Scalability: Data Pipelines can scale up or down as your business workload requirements change. This way you can ingest more data at a low price.
  • Efficiency: A Data Pipeline lets you have Machine Learning results ready with minimal latency to fulfill your business objectives.
  • Monitoring: You can set up automatic alerts about the condition of both your data and pipeline and provide a proactive response to prevent potential business risks.

To learn more about Data Pipelines, visit our blog.

Steps to Build Data Pipelines with Apache Airflow

Airflow pipelines have many uses, and one primary use is error detection. In this section, you will learn how to build Data Pipelines with Apache Airflow that collect and report errors caused by exceptions in application logs. The following steps will help you create such a pipeline:

Step 1: Install the Docker Files and UI for Apache Airflow

To set up Data Pipelines with Apache Airflow, you first need to get Airflow running in Docker. You can retrieve the Docker Compose file with all its configuration from Puckel’s GitHub repository.

Install the Docker client and run the command below to start the Airflow server:

docker-compose -f ./docker-compose-LocalExecutor.yml up -d

Next, open the Airflow UI in your browser at http://localhost:8080.

From the Airflow UI you can trigger a DAG (Directed Acyclic Graph) and check the status of its current tasks.

Step 2: Create a DAG File

Now that Airflow and its UI are running, you can create a DAG file for your pipeline. First define the default arguments, and then instantiate the DAG class with a DAG name, say monitor_errors, using the following code:

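from datetime import datetime, timedelta

from airflow import DAG

# Defaults applied to every task in the DAG.
default_args = {
    "owner": "airflow",
    "depends_on_past": False,
    "start_date": datetime(2020, 8, 13),
    "email": ["airflow@airflow.com"],
    "email_on_failure": False,
    "email_on_retry": False,
    "retries": 1,
    "retry_delay": timedelta(minutes=5),
}

# catchup is a DAG-level argument, so it is passed to DAG() rather than default_args.
dag = DAG(
    "monitor_errors",
    default_args=default_args,
    schedule_interval=timedelta(days=1),
    catchup=False,
)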

Next, you need to fetch all the log files stored on the server. A major advantage of building Data Pipelines with Apache Airflow is that it can run tasks concurrently. This means you can create one download task per log file, collect the tasks in one common list, and let Airflow run them in parallel.

The log files live on a remote server, and you can fetch them over SFTP. To do that, create one SFTPOperator per file, each referring to an SSH connection id (log_server here) that you will configure in the Airflow portal:

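# Airflow 1.10 import path; Airflow 2.x moved this operator into a provider package.
from airflow.contrib.operators.sftp_operator import SFTPOperator, SFTPOperation

# base_folder (local target directory) and remote_path (directory on the log
# server) are assumed to be defined earlier in the DAG file.
log_list = ['securityApp.log', 'mainApp.log', 'extApp.log', 'timeApp.log', 'tokenApp.log',
            'bridgeApp.log', 'daemonApp.log', 'notificationApp.log', 'messageApp.log']

# One download task per log file; Airflow can run these in parallel.
dl_tasks = []
for file in log_list:
    op = SFTPOperator(task_id=f"download_{file}",
                ssh_conn_id="log_server",
                local_filepath=f"{base_folder}/{file}",
                remote_filepath=f"{remote_path}/{file}",
                operation=SFTPOperation.GET,
                create_intermediate_dirs=True,
                dag=dag)
    dl_tasks.append(op)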

After that, refresh the Airflow UI and it will load your DAG file. You will be able to see the new DAG – monitor_errors on the list. Next, click on the DAG name, and it will present a graph.

Before triggering a DAG run, configure the SSH connection so that the SFTP operator can use it. Go to the Admin menu and click on Connections to create a new SSH connection.

This example uses key-based (passwordless) SSH access. It assumes that the public key has already been set up on the server and that your private key is located at:

/usr/local/airflow/.ssh/id_rsa

You must leave the Password field empty, and input the below JSON data into the Extra field:

{
  "key_file": "/usr/local/airflow/.ssh/id_rsa",
  "timeout": "10",
  "compress": "false",
  "no_host_key_check": "false",
  "allow_host_key_change": "false"
}

Next, turn the DAG on and trigger it. Some tasks will turn green, meaning that they are running, while the remaining tasks stay grey, meaning that they are still queued. This way you can track your workflow after creating Data Pipelines with Apache Airflow.

Step 3: Extract Lines Containing Exceptions

In this step, you collect all the lines containing “Exception” from the log files and write them into a file (errors.txt) in the same folder.

The grep command can search for specific text across a collection of files, provided that all the files are in one folder.

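from airflow.operators.bash_operator import BashOperator  # Airflow 1.10 import path

# Recursively search the *.log files under base_folder, write the matching
# lines to errors.txt, then list and print that file in the task log.
bash_command = """
    grep -E 'Exception' --include=*.log -rnw '{{ params.base_folder }}' > {{ params.base_folder }}/errors.txt
    ls -l {{ params.base_folder }}/errors.txt && cat {{ params.base_folder }}/errors.txt
"""
grep_exception = BashOperator(task_id="grep_exception",
                        bash_command=bash_command,
                        params={'base_folder': base_folder},
                        dag=dag)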
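For readers who want to check what this step produces without a shell, the search can be approximated in plain Python. This is a sketch rather than a drop-in replacement: the function name collect_exception_lines is hypothetical, and it does a simple substring match, whereas grep's -w flag matches whole words only.

```python
from pathlib import Path


def collect_exception_lines(base_folder, keyword="Exception"):
    """Gather lines containing `keyword` from every .log file under base_folder."""
    matches = []
    for log_path in sorted(Path(base_folder).rglob("*.log")):
        with open(log_path, errors="replace") as handle:
            for line_number, line in enumerate(handle, start=1):
                if keyword in line:
                    # Same "path:line:text" layout that grep -rn prints.
                    matches.append(f"{log_path}:{line_number}:{line.rstrip()}")
    return matches
```

Writing the returned lines to errors.txt would give the later parsing step the same kind of input as the grep command.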

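Make sure the new task runs after the downloads, for example by declaring dl_tasks >> grep_exception in the DAG file. Then refresh the DAG and trigger it again; the graph view will be updated.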

Step 4: Extract the Required Fields

Now you can parse the errors file line by line to extract the required fields. A PythonOperator that applies a regular expression can do this with the code below:

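import re

from airflow.operators.python_operator import PythonOperator  # Airflow 1.10 import path

def parse_log(logString):
    r = r".+/(?P<file>.+):(?P<line>\d+):\[\[\]\] (?P<date>.+)/(?P<time>\d{2}:\d{2}:\d{2},\d{3}) ERROR ?(?:SessionId : )?(?P<session>.+)? \[(?P<app>\w+)\] .+ (?:Error :|service Exception) (?P<module>(?=[\w.-]+ : )[\w.-]+)?(?: : )?(?P<errMsg>.+)"
    group = re.match(r, logString)
    # Lines that do not fit the expected layout yield None instead of raising.
    return group.groups() if group else None

def parse_log_file(filepath, tablename):
    with open(filepath) as fp:
        records = []
        for line in fp:
            record = parse_log(line)
            if record is not None:
                records.append(record)
        # save_to_database is a helper assumed to be defined elsewhere in the DAG file.
        save_to_database(tablename, records)

# Named parse_log_task so the operator does not shadow the parse_log function.
parse_log_task = PythonOperator(task_id='parse_log',
                        python_callable=parse_log_file,
                        op_kwargs={'filepath': f'{base_folder}/errors.txt',
                                   'tablename': f'{table_name}'},
                        dag=dag)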
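The technique at work here is named capture groups. The sketch below shows it on a deliberately simplified, hypothetical line layout ("file:line: time ERROR [app] message"), not the layout of the logs in this tutorial; the pattern and the function name parse_simple are assumptions made for illustration.

```python
import re

# Hypothetical, simplified layout: "<file>:<line>: <time> ERROR [<app>] <message>"
SIMPLE_PATTERN = re.compile(
    r"(?P<file>[^:]+):(?P<line>\d+): "
    r"(?P<time>\d{2}:\d{2}:\d{2}) ERROR "
    r"\[(?P<app>\w+)\] (?P<errMsg>.+)"
)


def parse_simple(line):
    """Return the named fields as a dict, or None when the line does not match."""
    match = SIMPLE_PATTERN.match(line)
    return match.groupdict() if match else None


print(parse_simple("mainApp.log:42: 13:23:19 ERROR [mainApp] Read timed out"))
```

Returning a dict keyed by group name, rather than a positional tuple, can make it easier to map fields to database columns, at the cost of depending on the group names staying stable.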

The extracted fields are saved in a database (Postgres DB is used for this example) and you can perform queries on them later. 

To use the Postgres database, configure a Postgres connection in the Airflow portal. The simplest way is to modify the existing postgres_default connection.

Trigger the DAG again and check that all tasks ran successfully and that all the data from the logs was parsed and sent to the database.

You can now click on Ad Hoc Query present under the Data Profiling menu and type the required SQL query statement.

Now that you know how to create Data Pipelines with Apache Airflow, it’s time to query the table and extract the error details.

Step 5: Query the Table to Generate Error Records

The steps until now were about building Data Pipelines with Apache Airflow. This final step will show how to use the built pipeline to detect and monitor errors. You can query the saved table to count the number of errors and their type. Moreover, you can use another Python operator and query the database to get 2 report files:

  • One will contain the records for all the errors in the database.
  • The second one will be a statistics table to represent all the various types of errors in descending order of occurrence.

Use the following code to develop the above 2 reports:

from airflow.hooks.postgres_hook import PostgresHook  # Airflow 1.10 import path

def gen_error_reports(statfile, logfile, tablename, **kwargs):
    # database hook
    db_hook = PostgresHook(postgres_conn_id='postgres_default', schema='airflow')
    db_conn = db_hook.get_conn()
    db_cursor = db_conn.cursor()

    # Report 1: every error record stored in the table.
    sql = f"SELECT * FROM {tablename}"
    sql_output = "COPY ({0}) TO STDOUT WITH CSV HEADER".format(sql)
    with open(logfile, 'w') as f_output:
        db_cursor.copy_expert(sql_output, f_output)

    # Report 2: error types in descending order of occurrence.
    sql = f"SELECT error, count(*) as occurrence FROM {tablename} group by error ORDER BY occurrence DESC"
    sql_output = "COPY ({0}) TO STDOUT WITH CSV HEADER".format(sql)
    with open(statfile, 'w') as f_output:
        db_cursor.copy_expert(sql_output, f_output)


gen_reports = PythonOperator(task_id='gen_reports',
                        python_callable=gen_error_reports,
                        op_kwargs={'statfile': f'{base_folder}/error_stats.csv',
                                   'logfile': f'{base_folder}/error_logs.csv',
                                   'tablename': f'{table_name}'},
                        provide_context=True,
                        dag=dag)

Now, trigger the DAG again and check the report files generated.

The error_logs.csv file will contain all the exception records present in the database, and the error_stats.csv file will hold the different types of errors with their number of occurrences.
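To see what the statistics report computes, the same grouping can be reproduced in plain Python. This sketch assumes each record is a dict with an error key, mirroring the error column used in the query; the function name error_statistics is hypothetical.

```python
from collections import Counter


def error_statistics(records):
    """Count records per error message, most frequent first.

    Mirrors: SELECT error, count(*) ... GROUP BY error ORDER BY occurrence DESC
    """
    counts = Counter(record["error"] for record in records)
    return counts.most_common()
```

For large tables, letting the database do the aggregation, as the query above does, is usually the better choice; the Python version is mainly useful for spot checks.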

That’s it! You are now ready to build Data Pipelines with Apache Airflow on your own.

Benefits of Data Pipelines with Apache Airflow

The previous section taught you how to develop Data Pipelines with Apache Airflow. Now, it’s time to learn the following benefits that you can get by implementing this pipeline in your work:

  • Because pipelines are written in Python code, you can build arbitrarily complex pipelines that carry out your desired tasks. Python also makes it easy to integrate other tools with Airflow, and the Airflow community already maintains a rich collection of extensions that let you connect to a multitude of databases, cloud services, and much more.
  • Airflow’s rich scheduling semantics let you run pipelines at regular intervals. You can also build Data Pipelines with Apache Airflow that process data incrementally and avoid unnecessary, expensive recomputation.
  • Airflow features such as backfilling let you reprocess historical data easily. This way you can build Data Pipelines with Apache Airflow that recompute derived data sets even after you modify your code.
  • Airflow’s web interface simplifies monitoring the results of your running pipelines and debugging any failures that may hinder their progress.
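The ideas of interval-based scheduling, incremental processing, and backfilling can be illustrated with a small plain-Python sketch. It is not Airflow's scheduler: the function names and the ts field on each row are assumptions made for the example.

```python
from datetime import datetime, timedelta


def daily_intervals(start, end):
    """List the [interval_start, interval_end) windows a daily schedule would cover."""
    intervals = []
    cursor = start
    while cursor + timedelta(days=1) <= end:
        intervals.append((cursor, cursor + timedelta(days=1)))
        cursor += timedelta(days=1)
    return intervals


def rows_for_interval(rows, interval):
    """Incremental processing: keep only rows whose timestamp falls inside one window."""
    interval_start, interval_end = interval
    return [row for row in rows if interval_start <= row["ts"] < interval_end]
```

Each scheduled run handles one window, so no run reprocesses data that an earlier run already covered. Backfilling then amounts to running the same logic again over past windows, for instance after a code change.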

Conclusion

This article introduced you to Apache Airflow and Data Pipelines along with their key features. It also explained an easy process for building Data Pipelines with Apache Airflow on your own. Furthermore, the blog listed the benefits of setting up a Pipeline using Apache Airflow.

Apache Airflow is a great tool; however, at times you need to transfer its data to a Data Warehouse for further analysis. Building an in-house solution for this process could be an expensive and time-consuming task. Hevo Data, on the other hand, offers a No-code Data Pipeline that can automate your data transfer process, allowing you to focus on other aspects of your business like Analytics, Customer Management, etc. This platform allows you to transfer data from 150+ sources like Airflow to Cloud-based Data Warehouses like Snowflake, Google BigQuery, Amazon Redshift, etc. It will provide you with a hassle-free experience and make your work life much easier.

FAQ

How to create a data pipeline using Airflow?

To create a data pipeline using Apache Airflow, first, install Airflow and configure the environment. Next, define a Directed Acyclic Graph (DAG), which represents your workflow. Within the DAG, create tasks using Python functions or pre-built operators, such as data extraction, transformation, or loading steps. Airflow schedules and monitors these tasks, ensuring they run in the correct order with dependencies defined in the DAG.

How to orchestrate an ETL data pipeline with Apache Airflow?

To orchestrate an ETL pipeline with Apache Airflow, define a DAG representing the Extract, Transform, and Load phases. Use Airflow’s operators (such as PythonOperator, BashOperator, or PostgresOperator) to write each ETL step. Set task dependencies to ensure extraction runs before transformation, and transformation runs before loading. Schedule the DAG to run at desired intervals, and Airflow will automate and monitor the ETL process.

Does Apache Airflow do ETL?

Yes, Apache Airflow can be used to perform ETL tasks by orchestrating the flow of data between different systems. While Airflow is not an ETL tool itself, it helps manage and schedule the individual components of an ETL process (e.g., extraction from source, data transformation, and loading into the destination) using custom code or pre-built operators.

Abhinav Chola
Research Analyst, Hevo Data

Abhinav Chola, a data science enthusiast, is dedicated to empowering data practitioners. After completing his Master’s degree in Computer Science from NITJ, he joined Hevo as a Research Analyst and works towards solving real-world challenges in data integration and infrastructure. His research skills and ability to explain complex technical concepts allow him to analyze complex data sets, identify trends, and translate his insights into clear and engaging articles.