Well-established Cloud Data Warehouses offer scalability and manageability, while Cloud Data Lakes offer better storage for all types of data formats, including Unstructured Data. A Data Lakehouse like Databricks is a Cloud platform that incorporates the functionality of both these Cloud solutions, and Airflow Databricks Integration becomes a must for efficient Workflow Management.

Databricks is a scalable Cloud Data Lakehouse solution with better metadata handling, a high-performance query engine, and optimized access to numerous built-in Data Science and Machine Learning tools. To efficiently manage, schedule, and run jobs with multiple tasks, you can utilize the Airflow Databricks Integration.

Using this robust integration, you can describe your workflow in a Python file and let Airflow handle the management, scheduling, and execution of your Data Pipelines. In this article, you will learn how to successfully set up the Apache Airflow Databricks Integration for your business.

Prerequisites

  • An active Databricks account.
  • Knowledge of Python.

Steps to Set up Apache Airflow Databricks Integration

In the Airflow Databricks connection, each ETL Pipeline is represented as a DAG (Directed Acyclic Graph), where dependencies are encoded by its edges, i.e., a downstream task is only scheduled if its upstream task completes successfully.

Each task in Airflow is an instance of an "operator" class and is executed as a small Python script. In the example given below, spark_jar_task will only be triggered after notebook_task completes successfully.

notebook_task = DatabricksSubmitRunOperator(
    task_id='notebook_task',
    …)

spark_jar_task = DatabricksSubmitRunOperator(
    task_id='spark_jar_task',
    …)
notebook_task.set_downstream(spark_jar_task)
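
Equivalently, you can declare the same dependency with Airflow's bitshift syntax, which many DAG authors find more readable:

# Equivalent to notebook_task.set_downstream(spark_jar_task)
notebook_task >> spark_jar_task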

You can set up the Apache Airflow Databricks Integration in 2 easy steps:

  • Configure the Airflow Databricks Connection
  • Create a DAG

A) Configure the Airflow Databricks Connection

To begin setting up the Apache Airflow Databricks Integration, follow the simple steps given below:

  • Step 1: Open a terminal and run the following commands to install Airflow and the Databricks provider package. Here, version 2.1.0 of apache-airflow is installed.
mkdir airflow
cd airflow
pipenv --python 3.8
pipenv shell
export AIRFLOW_HOME=$(pwd)
pipenv install apache-airflow==2.1.0
pipenv install apache-airflow-providers-databricks
mkdir dags
airflow db init
airflow users create --username admin --firstname <firstname> --lastname <lastname> --role Admin --email your@email.com
  • Step 2: Open your Databricks Workspace, navigate to User Settings, and click on the Access Tokens tab.
  • Step 3: Click on the Generate New Token button and save the token for later use.
  • Step 4: Go to your Airflow UI (see the commands after this list if the webserver is not yet running), click on the Admin option at the top, and then click on the "Connections" option from the dropdown menu. A list of all your current connections will be displayed.
  • Step 5: By default, the databricks_conn_id parameter is set to "databricks_default," so the same value is used in this example. Type the URL of your Databricks Workspace into the Host field. In the Extra field, enter the Personal Access Token saved earlier as a JSON object, for example {"token": "<your-token>"}. You can also create this connection from the Airflow CLI, as shown after this list.
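
Note that Step 4 assumes the Airflow webserver is already running; if it is not, you can start the webserver and scheduler from the same virtual environment. You can also create the Databricks connection from Step 5 with the Airflow CLI instead of the UI. The commands below are a sketch with placeholder values for the Workspace URL and token:

# Start the Airflow UI (http://localhost:8080) and the scheduler
airflow webserver --port 8080 &
airflow scheduler &

# Alternative to Step 5: create the Databricks connection from the CLI.
# If a databricks_default connection already exists, delete it first with
# "airflow connections delete databricks_default" or edit it in the UI instead.
airflow connections add databricks_default \
    --conn-type databricks \
    --conn-host https://<your-workspace>.cloud.databricks.com \
    --conn-extra '{"token": "<your-personal-access-token>"}'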

B) Create a DAG

Airflow provides an operator named DatabricksSubmitRunOperator for a smooth Airflow Databricks Integration. This operator calls the "Create and trigger a one-time run" (POST /jobs/runs/submit) API endpoint to submit the job specification and trigger a run.

You can also use the DatabricksRunNowOperator, but it requires an existing Databricks job and uses the "Trigger a new job run" (POST /jobs/run-now) API endpoint. For simplicity, this example uses the DatabricksSubmitRunOperator.
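
For reference, a minimal DatabricksRunNowOperator task looks like the sketch below; the job_id value is hypothetical and must refer to a job that already exists in your Databricks Workspace:

from airflow.providers.databricks.operators.databricks import DatabricksRunNowOperator

# Inside a DAG definition (e.g. a `with DAG(...)` block):
run_existing_job = DatabricksRunNowOperator(
    task_id='run_existing_job',
    job_id=12345,  # hypothetical ID of an existing Databricks job
    notebook_params={'source': 'airflow'},  # optional parameters passed to the job
)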

For creating a DAG, you need:

  • A cluster configuration (Spark version and node size).
  • A Python script specifying the job.

In this example, AWS keys stored as Airflow Variables are passed into the environment variables of the Databricks Cluster so that it can access files in Amazon S3. After running the following code, your Airflow DAG will call into your Databricks account and run a job based on a script you have stored in S3.

from airflow import DAG
from airflow.providers.databricks.operators.databricks import DatabricksSubmitRunOperator
from airflow.utils.dates import days_ago
from airflow.models import Variable

default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 0,
}

with DAG(
        dag_id='my_first_databricks_operator',
        default_args=default_args,
        schedule_interval=None,
        start_date=days_ago(1),
) as dag:
    # Cluster specification for the one-time run; the AWS keys stored as
    # Airflow Variables are exposed to Spark as environment variables
    my_cluster = {
        'spark_version': '8.2.x-scala2.12',
        'node_type_id': 'r3.xlarge',
        'aws_attributes': {'availability': 'ON_DEMAND'},
        'num_workers': 1,
        'spark_env_vars': {'AWS_SECRET_ACCESS_KEY': Variable.get("AWS_SECRET_ACCESS_KEY"),
                           'AWS_ACCESS_KEY_ID': Variable.get("AWS_ACCESS_KEY_ID"),
                           'AWS_REGION': 'us-east-1'}
    }

    # Job specification: run a PySpark script stored in S3 on the new cluster
    task_params = {
        'new_cluster': my_cluster,
        'spark_python_task': {
            'python_file': 's3://some-wonderful-bucket/my_pyspark_script.py',
        },
    }

    # Submit the one-time run; the operator uses the databricks_default
    # connection configured earlier
    first_databricks_job = DatabricksSubmitRunOperator(
        task_id='my_first_job_job',
        json=task_params,
    )

    first_databricks_job
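
To run the DAG, save the file in the dags/ folder created in Step 1, set the two AWS Variables it reads, and trigger it while the scheduler and webserver are running. The commands below are a sketch; the key values are placeholders:

airflow variables set AWS_ACCESS_KEY_ID <your-access-key-id>
airflow variables set AWS_SECRET_ACCESS_KEY <your-secret-access-key>
airflow dags trigger my_first_databricks_operator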

Conclusion

In this article, you have learned how to effectively set up the Apache Airflow Databricks Integration. This effortless and fluid integration combines the optimized Spark engine offered by Databricks with the scheduling features of Airflow.

Airflow provides you with a powerful Workflow Engine to orchestrate your Data Pipelines. Standard Python features empower you to write code for dynamic Pipeline generation. Setting up the Integration allows you to use the Databricks Runs Submit API to trigger your Python scripts and start the computation on the Databricks platform.

Managing and monitoring jobs on Databricks becomes efficient and smooth using Airflow. However, as your business grows, massive amounts of data are generated at an exponential rate.

Effectively handling all this data across the applications used by various departments in your business can be a time-consuming and resource-intensive task. You would need to devote a portion of your Engineering Bandwidth to integrate, clean, transform, and load your data into a Data Warehouse or a destination of your choice for further Business Analysis.

Share with us your experience of setting up Airflow Databricks Integration. Let us know in the comments section below!  

Sanchit Agarwal
Research Analyst, Hevo Data

Sanchit Agarwal is an Engineer turned Data Analyst with a passion for data, software architecture and AI. He leverages his diverse technical background and 2+ years of experience to write content. He has penned over 200 articles on data integration and infrastructures, driven by a desire to empower data practitioners with practical solutions for their everyday challenges.
