Airflow Lineage using OpenLineage Simplified

Isola Saheed Ganiyu • Last Modified: December 29th, 2022


Sometimes, you may encounter an error while processing data. To determine the root cause of this error, you may need to track the origin of the dataset where the error occurred. If you already have a complex data system, it might be challenging to investigate the source of a dataset. 

Data lineage is the process of analyzing data in terms of its origin, how it has been transformed, and why it has moved. With the metadata provided by a data lineage system, you can easily track where an error occurred. 

A widely adopted open-source standard for data lineage is OpenLineage. OpenLineage offers tools that track the metadata of data sources and operators, records this information, and makes it available to engineers to help them fix underlying errors in datasets.

The benefits of OpenLineage multiply when it is used with a centralized orchestrator like Apache Airflow. Unlike traditional setups, where pipeline logic is scattered across multiple systems, Airflow defines and runs everything in one place, and related work is grouped so it is easy to observe. 

Using OpenLineage with Apache Airflow means you can access all the metadata about your datasets from a single spot. 

This tutorial will show you how to run data lineage with Apache Airflow and OpenLineage. 


Introduction to Data Lineage


Data lineage is a technique that tracks the origin of a dataset and how it has changed over time to attain its current state. It is a map of the data’s life cycle: from its creation to its transformation and consumption. Businesses use data lineage to track the cause of errors, understand changes in data, and make effective business decisions. 

Other uses of data lineage are:

  • System Migrations: Businesses once had to break complex data into chunks before uploading it to a new system to keep the migration from failing. Data lineage eases this process by telling you every operation your dataset has undergone, which keeps you from loading data into an incompatible system.
  • Regulatory Compliance: Data lineage visualizes your data's journey, helping you determine whether it complies with government regulations.

The software that provides a standard for data lineage is OpenLineage.

What is OpenLineage?


OpenLineage is software that provides metadata about datasets. It comes with tools, such as integrations and extractors, that collect this metadata and store it. OpenLineage then visualizes the data's journey across three layers: Producers (sources), a Backend, and Consumers. 
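To make those three layers concrete, here is a minimal sketch of a producer emitting a single lineage event with the openlineage-python client, assuming a backend such as Marquez is listening on http://localhost:5000. The namespace, job, and dataset names are hypothetical and used only for illustration.

# Minimal sketch: a producer emits one run event to a lineage backend.
from datetime import datetime, timezone
from uuid import uuid4

from openlineage.client import OpenLineageClient
from openlineage.client.run import Dataset, Job, Run, RunEvent, RunState

client = OpenLineageClient(url="http://localhost:5000")  # assumed backend URL

event = RunEvent(
    eventType=RunState.COMPLETE,                          # the run finished
    eventTime=datetime.now(timezone.utc).isoformat(),
    run=Run(runId=str(uuid4())),                          # unique id for this run
    job=Job(namespace="example", name="daily_job"),       # hypothetical job name
    producer="my-pipeline",                               # who produced this metadata
    inputs=[Dataset(namespace="example", name="source_table")],   # hypothetical
    outputs=[Dataset(namespace="example", name="target_table")],  # hypothetical
)

client.emit(event)  # the backend stores the event; a consumer visualizes it

A consumer such as Marquez then stitches these events together into the lineage graph you will see later in this tutorial.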

Why Apache Airflow Data Lineage?

 


Sometimes, your datasets may have passed through different departments in an organization. Without a central system, you can waste a lot of time finding the origin of your data. Central systems like Apache Airflow eliminate this issue because they compile data pipelines from across departments in one place. 

Apache Airflow organizes work based on relationships: related tasks and the datasets they touch are grouped into DAGs (Directed Acyclic Graphs). If you use OpenLineage with Airflow, the previous forms of your datasets are tracked alongside the new datasets they produce.

The software lets you schedule and monitor your tasks. Thanks to OpenLineage, you can also trace errors easily if any of your tasks fail.
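As a rough sketch of that grouping (the DAG and task names below are hypothetical, not part of this tutorial's setup), two related tasks defined in one DAG look like this:

# Minimal sketch: two related tasks grouped into a single DAG (hypothetical names).
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    "example_grouping",                  # hypothetical DAG id
    start_date=datetime(2022, 1, 1),
    schedule_interval="@daily",
    catchup=False,
) as dag:
    extract = BashOperator(task_id="extract", bash_command="echo extracting")
    load = BashOperator(task_id="load", bash_command="echo loading")

    extract >> load                      # related work stays in one observable graph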

You need to understand the following terms while performing airflow lineage with OpenLineage:

  • Dataset: The representation of each piece of data in your data lineage graphs.
  • Integrations: These are elements that generate metadata about your data’s source.
  • Operators: Operators are clients that process datasets. Examples are the PostgresOperator and the SnowflakeOperator.
  • Extractors: Extractors are elements that collect metadata about operators. However, not all operators have extractors (see the sketch after this list for how a custom one can be defined). 
  • Job: A job is a task that creates or consumes a dataset. 
  • Run: A situation where a job generates lineage metadata. For instance, every job performed with OpenLineage on Airflow is a run.
  • Facet: A facet is a unit of lineage metadata. 
  • Lineage Frontend: This lets you see and interact with your lineage metadata, often by visualizing it as graphs. A good example is Marquez.
  • Lineage Backend: A lineage backend processes and stores lineage metadata.
  • Lineage Metadata: This provides information about your Airflow jobs and datasets. 
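For an operator that has no built-in extractor, openlineage-airflow supports custom extractors. The skeleton below is a hedged sketch: the operator name and module path are hypothetical, and the exact base-class interface can differ between openlineage-airflow versions, so check the version you install before relying on it.

# Hedged sketch of a custom extractor (hypothetical operator and module names).
from openlineage.airflow.extractors.base import BaseExtractor, TaskMetadata


class MyOperatorExtractor(BaseExtractor):
    @classmethod
    def get_operator_classnames(cls):
        # Operator class names this extractor should handle (hypothetical).
        return ["MyOperator"]

    def extract(self) -> TaskMetadata:
        # Build lineage metadata from the wrapped operator's attributes.
        return TaskMetadata(name=f"{self.operator.dag_id}.{self.operator.task_id}")

You would then point openlineage-airflow at it, for example with OPENLINEAGE_EXTRACTORS=include.my_extractor.MyOperatorExtractor (a hypothetical module path).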

Now, let’s show you how to run Airflow Lineage with OpenLineage.

Scale your data integration effortlessly with Hevo’s Fault-Tolerant No Code Data Pipeline

As the ability of businesses to collect data explodes, data teams have a crucial role to play in fueling data-driven decisions. Yet, they struggle to consolidate the scattered data in their warehouse to build a single source of truth. Broken pipelines, data quality issues, bugs and errors, and lack of control and visibility over the data flow make data integration a nightmare.

1000+ data teams rely on Hevo's Data Pipeline Platform to integrate data from 150+ sources in a matter of minutes. Billions of data events from sources as varied as SaaS apps, Databases, File Storage and Streaming sources can be replicated in near real-time with Hevo's fault-tolerant architecture. What's more, Hevo puts complete control in the hands of data teams with intuitive dashboards for pipeline monitoring, auto-schema management, and custom ingestion/loading schedules. 

All of this combined with transparent pricing and 24×7 support makes us the most loved data pipeline software on review sites.

Take our 14-day free trial to experience a better way to manage data pipelines.

Get started for Free with Hevo!

Running Apache Airflow Data Lineage with OpenLineage

To run Airflow lineage with OpenLineage, you need a lineage frontend. Here, we'll use Marquez as our lineage frontend. After running Marquez, we'll use the PostgresOperator to produce lineage data that we can analyze. 

Follow these steps to perform these operations:

Step 1: Download and Install Docker and Astro CLI to Your System

Docker is a platform used to build and run applications; the Astro CLI uses it to run Airflow locally. 

  • Install Docker by following the instructions at https://docs.docker.com/docker-for-windows/install. 
  • You'll also need the Astro CLI, a command-line tool that lets you run Apache Airflow jobs locally. On macOS, you can install it with Homebrew by running the following in your terminal:
brew install astro

Step 2: Start Running Marquez

  • Go back to your browser and input https://github.com/MarquezProject/marquez#quickstart.
  • Then, copy the quickstart commands from that page into your terminal to get Marquez running.
  • Start a new Astro project by creating a directory and running astrocloud dev init in your command line.
  • Give your directory a name. Here, we’ll use openlineage.
  • Next, open your requirements.txt file and add openlineage-airflow to it. 
  • Now, click on the .env file in the directory and enter the following variables:
OPENLINEAGE_URL=http://host.docker.internal:5000
OPENLINEAGE_NAMESPACE=example
AIRFLOW__LINEAGE__BACKEND=openlineage.lineage_backend.OpenLineageBackend
  • Click on the config.yaml file in your Astro CLI directory and modify it to choose a port for Postgres:
project:
  name: openlineage
postgres:
  port: 5438
  • Run Apache Airflow by writing this command:
    astrocloud dev start
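If you want to confirm that Airflow picked up the lineage settings from .env, a quick check like the one below can help. It is a hedged sketch: it assumes you run it from a Python shell inside the Airflow scheduler container (for example, via docker exec) and that the environment variables are named exactly as above.

# Hedged sanity check that the OpenLineage backend and variables are configured.
import os

from airflow.configuration import conf

print(conf.get("lineage", "backend"))           # expect openlineage.lineage_backend.OpenLineageBackend
print(os.environ.get("OPENLINEAGE_URL"))        # expect http://host.docker.internal:5000
print(os.environ.get("OPENLINEAGE_NAMESPACE"))  # expect example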

Step 3: Generate and View Your Airflow Lineage Data

  • Create a database for Postgres with psql:
psql -h localhost -p 5438 -U postgres
(enter the password 'postgres' when prompted)
create database lineagedemo;
\c lineagedemo
(connect to the new database so the tables in the next step are created inside it)
  • Create two source tables for your DAGs by running these queries:
CREATE TABLE IF NOT EXISTS new_student_1
(date DATE, type VARCHAR, name VARCHAR, age INTEGER);

CREATE TABLE IF NOT EXISTS new_student_2
(date DATE, type VARCHAR, name VARCHAR, age INTEGER);

INSERT INTO new_student_1 (date, type, name, age)
VALUES
('2005-10-11', 'freshman', 'James', 10),
('2005-08-05', 'freshman', 'Kate', 10);

INSERT INTO new_student_2 (date, type, name, age)
VALUES
('2004-01-07', 'junior', 'Samuel', 12),
('2004-03-08', 'junior', 'Elizabeth', 12);
  • Set up an Airflow Connection for the Postgres database created earlier:
Host: localhost
Schema: lineagedemo
Login: (Your Astro username)
Password: blank
Port: 5438
Extras: n/a
  • Now, start running your Airflow DAGs. Here, you'll need to run two DAGs. The first will create a table that combines the data from the two source tables (new_student_1 and new_student_2). We will name this table new_students_combined.

On the other hand,  the second DAG will create a reporting table for the aggregated data.

Below is the code for the first DAG:

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

create_table_query = '''
CREATE TABLE IF NOT EXISTS new_students_combined (date DATE, type VARCHAR, name VARCHAR, age INTEGER);
'''

combine_data_query = '''
INSERT INTO new_students_combined (date, type, name, age)
SELECT *
FROM new_student_1
UNION
SELECT *
FROM new_student_2;
'''

with DAG('lineage-combine-postgres',
         start_date=datetime(2005, 10, 11),
         max_active_runs=1,
         schedule_interval='@daily',
         default_args={
             'retries': 1,
             'retry_delay': timedelta(minutes=1)
         },
         catchup=False
         ) as dag:

    # Create the combined table if it does not exist yet.
    create_table = PostgresOperator(
        task_id='create_table',
        postgres_conn_id='postgres_default',
        sql=create_table_query
    )

    # Insert the union of the two source tables into the combined table.
    insert_data = PostgresOperator(
        task_id='combine',
        postgres_conn_id='postgres_default',
        sql=combine_data_query
    )

    create_table >> insert_data

Now, use the second DAG to set up a reporting table for the data in the aggregated table (new_students_combined):

from airflow import DAG
from airflow.providers.postgres.operators.postgres import PostgresOperator
from datetime import datetime, timedelta

aggregate_reporting_query = '''
INSERT INTO student_reporting_long (date, type, number)
SELECT c.date, c.type, COUNT(c.type)
FROM new_students_combined c
GROUP BY date, type;
'''

with DAG('lineage-reporting-postgres',
         start_date=datetime(2005, 10, 11),
         max_active_runs=1,
         schedule_interval='@daily',
         default_args={
             'retries': 1,
             'retry_delay': timedelta(minutes=1)
         },
         catchup=False
         ) as dag:

    # Create the reporting table if it does not exist yet.
    create_table = PostgresOperator(
        task_id='create_reporting_table',
        postgres_conn_id='postgres_default',
        sql='CREATE TABLE IF NOT EXISTS student_reporting_long (date DATE, type VARCHAR, number INTEGER);'
    )

    # Aggregate the combined table into the reporting table.
    insert_data = PostgresOperator(
        task_id='reporting',
        postgres_conn_id='postgres_default',
        sql=aggregate_reporting_query
    )

    create_table >> insert_data

After running these DAGs in Airflow, open Marquez on your system. All four jobs you created in Airflow will appear there. If you click on a job, you'll see the complete lineage graph. 

The lineage graph will contain:

  • The two original datasets (new_student_1 and new_student_2)
  • The tasks that aggregated the datasets, thus creating new datasets: reporting and combine
  • The datasets created by the combine and reporting tasks.

With this lineage graph, you can determine the origin of your datasets and how the dataset changed. 
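If you would rather inspect the same metadata programmatically, Marquez also exposes a REST API on the port used for OPENLINEAGE_URL. The snippet below is a hedged sketch: the endpoint path and response fields follow the Marquez API as documented, but verify them against the Marquez version you run.

# Hedged sketch: list the jobs Marquez has recorded for the 'example' namespace.
import requests

MARQUEZ_API = "http://localhost:5000/api/v1"   # adjust if Marquez runs elsewhere
NAMESPACE = "example"                          # the namespace set in .env

response = requests.get(f"{MARQUEZ_API}/namespaces/{NAMESPACE}/jobs")
response.raise_for_status()

for job in response.json().get("jobs", []):
    outputs = [dataset["name"] for dataset in job.get("outputs", [])]
    print(job["name"], "->", outputs)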

Conclusion

Now that you have seen the benefits OpenLineage provides, keep using it to manage your data pipelines. In no time, you'll be able to track errors and trace data changes quickly.

Do you really want to track your errors and process data changes manually? An Automated Data Pipeline helps you solve this issue and this is where Hevo comes into the picture.

Visit our Website to Explore Hevo

Hevo Data will automate your data transfer process, allowing you to focus on other aspects of your business like Analytics and Customer Management. Hevo supports 150+ Data Sources (including 40+ Free Sources) and loads data into 15+ Destinations, so you can analyze real-time data with transparent pricing and hassle-free Data Replication.

Want to take Hevo for a ride? Sign Up for a 14-day free trial and simplify your Data Integration process. Do check out the pricing details to understand which plan fulfills all your business needs.

Share your thoughts on Airflow Lineage using OpenLineage in the comments section below.
