Airflow Snowflake ETL Setup: 2 Easy Steps

on Data Integration, ETL, Tutorials • July 29th, 2020

In this blog post, you will learn about Airflow and how to use the Airflow Snowflake combination for efficient ETL. Automating pipelines is an important task in data analytics, and which automation tool suits the purpose is a point of discussion in every architecture design. An ETL involves several processes, and executing them manually would be cumbersome. A pipeline can also have several dependencies on internal and external factors, such as the status of the previous job or the value of an environment variable, that affect the execution of the next set of tasks.

A fundamental example of a pipeline is online ordering. When you order a product or service online, an automated pipeline runs continuously at the backend to:

  • Check the payment status.
  • Notify the seller to pack the product once payment succeeds.
  • Alert the courier service to ship the order.
  • Notify the customer about their order status.
  • Update the order quantity at the backend, and so on.

Let’s see how this blog is structured for you:

  1. Introduction to Airflow
  2. Introduction to Snowflake
  3. Prerequisites
  4. Steps for Airflow Snowflake ETL Setup
  5. Conclusion

Introduction to Airflow

Apache Airflow is an open-source workflow automation and scheduling platform that lets you programmatically author, schedule, and monitor workflows. Organizations use Airflow to orchestrate complex computational workflows, create data processing pipelines, and perform ETL processes. Airflow uses DAGs (Directed Acyclic Graphs) to construct workflows; each DAG contains nodes and connectors, and nodes connect to other nodes via connectors to generate a dependency tree.
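
As a minimal illustrative sketch, a three-node dependency tree can be written as shown below. The DAG id, schedule, and task names are placeholders, and the DummyOperator only stands in for real work:

from datetime import datetime

from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

# Three nodes (tasks) joined by connectors (dependencies) form a small DAG.
with DAG(
    dag_id="example_dependency_tree",
    start_date=datetime(2020, 7, 1),
    schedule_interval="@daily",
) as dag:
    extract = DummyOperator(task_id="extract")
    transform = DummyOperator(task_id="transform")
    load = DummyOperator(task_id="load")

    # extract runs first, then transform, then load
    extract >> transform >> load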

Key Features of Airflow

  • Dynamic Integration: Airflow uses Python as its backend programming language to generate dynamic pipelines. Several operators, hooks, and connectors are available to create DAGs and tie tasks together into workflows (a short sketch follows this list).
  • Extensible: Airflow is an open-source platform, so it allows users to define their own custom operators, executors, and hooks. You can also extend the libraries to fit the level of abstraction that suits your environment.
  • Elegant User Interface: Airflow uses Jinja templates to parameterize pipelines, so pipelines stay lean and explicit. Parameterizing your scripts is straightforward.
  • Scalable: Airflow is designed to scale to infinity. You can define as many dependent workflows as you want, and Airflow uses a message queue to orchestrate an arbitrary number of workers.
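
To illustrate the dynamic aspect mentioned above, here is a small hypothetical sketch that generates a chain of tasks in a loop; the DAG id, table names, and commands are placeholders:

from datetime import datetime

from airflow import DAG
from airflow.operators.bash_operator import BashOperator

# Hypothetical sketch: tasks are generated dynamically from a Python list.
with DAG(
    dag_id="dynamic_tasks_example",
    start_date=datetime(2020, 7, 1),
    schedule_interval=None,
) as dag:
    previous = None
    for table in ["orders", "payments", "shipments"]:
        task = BashOperator(
            task_id="process_{}".format(table),
            bash_command="echo processing {}".format(table),
        )
        # chain each generated task after the previous one
        if previous is not None:
            previous >> task
        previous = task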

Airflow can easily integrate with all the modern systems for orchestration. Some of them are as follows:

  1. Google Cloud Platform
  2. Amazon Web Services
  3. Microsoft Azure
  4. Apache Druid
  5. Snowflake
  6. Hadoop ecosystem
  7. Apache Spark
  8. PostgreSQL, SQL Server
  9. Google Drive
  10. JIRA
  11. Slack
  12. Databricks

You can find the complete list here.

Introduction to Snowflake

Snowflake is completely cloud-based and a true SaaS (Software-as-a-Service) offering, used to load, transform, and report on massive data volumes. Snowflake uses AWS, Azure, or GCP to host its services and provides an intuitive user interface that allows customers to perform analytics. You query the data with SQL, which Snowflake executes on its virtual warehouses.
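
For example, outside Airflow you can run SQL against Snowflake from Python with the snowflake-connector-python package. This is only a sketch; the account, credentials, warehouse, and database names are placeholders:

import snowflake.connector

# Sketch: connect and run a simple query. All values below are placeholders.
conn = snowflake.connector.connect(
    user="<YOUR_SNOWFLAKE_USERNAME>",
    password="<YOUR_SNOWFLAKE_PASSWORD>",
    account="<YOUR_SNOWFLAKE_ACCOUNT_NAME>",
    warehouse="<YOUR_SNOWFLAKE_WAREHOUSE_NAME>",
    database="<YOUR_SNOWFLAKE_DB_NAME>",
    schema="<YOUR_SNOWFLAKE_SCHEMA>",
)
try:
    cur = conn.cursor()
    cur.execute("select current_version()")
    print(cur.fetchone())
finally:
    conn.close()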

Key Features of Snowflake

  • Scalable: Snowflake provides a secure, scalable architecture that can spin up an unlimited number of independent virtual warehouses in a few minutes. You can execute several parallel jobs without worrying about performance and memory management.
  • Pay Per Use Model: Snowflake is available as pay per usage; i.e., you only have to pay for the time you use the services offered by Snowflake. You can shut down the warehouses once the execution gets completed, to save costs.
  • High Processing Speed: Each virtual warehouse in Snowflake is backed by an MPP (Massively Parallel Processing) cluster that executes a job in parallel without degrading the performance of other clusters.
  • Separate Storage and Compute Layers: Snowflake uses separate storage and compute layers, each of which can scale up or down without affecting the other.
  • Disaster Recovery: In Snowflake, data is replicated three times (by default) across availability zones and regions, providing a fail-safe and fault-tolerant system.

Hevo Data: Load your Data Conveniently in Snowflake

Hevo is a No-code Data Pipeline. It supports pre-built data integrations from 100+ data sources, including Snowflake, at a reasonable price.

Let’s discuss some key features of Hevo Data: 

  1. Fully Managed: Hevo Data is a fully managed service and is straightforward to set up. It has a minimal learning curve.
  2. Pre-Built Integrations: Hevo Data has various connectors incorporated with it, which can connect to multiple sources with ease. 
  3. Schema Management: Hevo Data automatically maps the source schema so you can perform analysis without worrying about schema changes.
  4. Real-Time: Hevo Data supports both batch and real-time data transfer, so your data is always ready for analysis.
  5. Fault-Tolerant: Hevo Data can resume the ingestion from the point of failure if it occurs. 
  6. Advanced Monitoring: Advanced monitoring gives you a one-stop view to watch all the activity that occurs within pipelines.
  7. Live Support: With 24/7 support, Hevo provides customer-centric solutions to the business use case.

Give Hevo a try by signing up for a 14-day free trial today.

Prerequisites

  • Python 3.6 or later, and working knowledge of Python.
  • An Amazon AWS account with read/write access to buckets.
  • A Snowflake account with read and write access.
  • Access to Apache Airflow 1.10 or later, with dependencies installed. To install Apache Airflow, you can have a look here.
  • A basic understanding of workflows and a programming language.

Steps for Airflow Snowflake ETL Setup

Here is the outline of the steps you will cover:

  1. Connection to Snowflake
  2. Creation of DAG

1. Connection to Snowflake 

  1. To connect to Snowflake, you have to create a connection in Airflow. On the Admin page of Apache Airflow, click on Connections, and in the dialog box, fill in the details as shown below (assuming Snowflake uses AWS as its cloud provider).

    Conn Id: <CONNECTION_ID>
    Conn Type: Snowflake
    Host: <YOUR_SNOWFLAKE_HOSTNAME>
    Schema: <YOUR_SNOWFLAKE_SCHEMA>
    Login: <YOUR_SNOWFLAKE_USERNAME>
    Password: <YOUR_SNOWFLAKE_PASSWORD>
    Extra: {
            "account": "<YOUR_SNOWFLAKE_ACCOUNT_NAME>",
            "warehouse": "<YOUR_SNOWFLAKE_WAREHOUSE_NAME>",
            "database": "<YOUR_SNOWFLAKE_DB_NAME>",
            "region": "<YOUR_SNOWFLAKE_HOSTED_REGION>"
        }
Airflow Snowflake: Connection Details
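
Once the connection is saved, you can do a quick sanity check from Python. This is a sketch that assumes the Conn Id you entered above is snowflake_conn, the same id used in the DAG later in this post:

from airflow.contrib.hooks.snowflake_hook import SnowflakeHook

# Sketch: verify the Airflow connection by running a trivial query.
# Assumes the Conn Id created above is "snowflake_conn".
hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
print(hook.get_first("select current_version()"))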

2. Creation of DAG

DAG stands for Directed Acyclic Graph, and it represents the collection of tasks that you want to run. Each task can run on a different worker at a different point in time. A DAG contains several operators that perform tasks on the workers, such as the PythonOperator to run Python code, the BashOperator to run Bash commands, and so on.

To create a DAG that performs operations on Snowflake, you'll need to use the Snowflake operator and Snowflake hook provided by Airflow:

Snowflake operators are used when you want to perform a task without expecting output. They can execute create, insert, merge, update, delete, copy into, and truncate operations, where no result set is required.

The Snowflake hook is used when you expect a result from a query. Hooks are mostly used with select queries, as they extract the results from Snowflake and pass them to Python for further processing.
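
As a short sketch of the difference (assuming the snowflake_conn connection id used throughout this post and the public.test_employee table created later), the operator fires SQL without returning results, while the hook fetches results back into Python:

from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator

# Operator: run a statement for its side effect; no result is returned to Python.
truncate_table = SnowflakeOperator(
    task_id="truncate_test_employee",
    sql="truncate table public.test_employee;",
    snowflake_conn_id="snowflake_conn",
)

# Hook: run a select and use the result in Python.
count = SnowflakeHook(snowflake_conn_id="snowflake_conn").get_first(
    "select count(*) from public.test_employee"
)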

Let’s create a sample DAG to automate the tasks in Snowflake via Airflow:

  1. Organize the Python imports by using the following code.
Airflow Snowflake: Organize Python imports
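
These are the imports used by the DAG (they also appear in the complete example at the end of this section):

import logging

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator

# Basic logging setup so the row count shows up in the task logs
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
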
  2. Initialize a DAG object. This is simple, as it requires only a DAG id and the default parameters with a schedule interval. Airflow provides many optional parameters for additional functionality; you can refer to the complete list of parameters here.
Airflow Snowflake: Initialize DAG Object
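
The default arguments and DAG object from the example look like this:

# Default arguments and the DAG object; schedule_interval=None means
# the DAG runs only when triggered manually.
args = {"owner": "Airflow", "start_date": airflow.utils.dates.days_ago(2)}

dag = DAG(
    dag_id="snowflake_automation", default_args=args, schedule_interval=None
)
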
  3. Define the SQL statements that create the table and insert records, and a Python function that gets the row count from Snowflake:
Airflow Snowflake: Create python function
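
From the complete example, the SQL statements for the Snowflake operator and the Python function that uses the Snowflake hook are:

# SQL statements executed by the Snowflake operator
snowflake_query = [
    """create table public.test_employee (id number, name string);""",
    """insert into public.test_employee values(1, 'Sam'),(2, 'Andy'),(3, 'Gill');""",
]


# Python function that uses the Snowflake hook to fetch the row count
def get_row_count(**context):
    dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
    result = dwh_hook.get_first("select count(*) from public.test_employee")
    logging.info("Number of rows in `public.test_employee` - %s", result[0])
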
  4. Create the DAG tasks with the SnowflakeOperator and PythonOperator to incorporate the queries and function created above.
Airflow Snowflake: Create DAG
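
The corresponding tasks from the example are:

# SnowflakeOperator runs the create/insert statements; PythonOperator runs get_row_count
with dag:
    create_insert = SnowflakeOperator(
        task_id="snowflake_create",
        sql=snowflake_query,
        snowflake_conn_id="snowflake_conn",
    )

    get_count = PythonOperator(task_id="get_count", python_callable=get_row_count)
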
  5. Now that you have created the tasks, connect them with the (>>) operator to create a pipeline.
Airflow Snowflake: Connect DAG to form pipeline
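
The dependency is a single line:

# Run the Snowflake create/insert first, then the row-count task
create_insert >> get_count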

After completing the above script, save it in the dags folder of your Airflow home directory. After a refresh, the DAG will appear on the user interface and will look as shown:

Airflow Snowflake: DAG

Below is the complete example of the DAG:

import logging

import airflow
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from airflow.contrib.hooks.snowflake_hook import SnowflakeHook
from airflow.contrib.operators.snowflake_operator import SnowflakeOperator

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

args = {"owner": "Airflow", "start_date": airflow.utils.dates.days_ago(2)}

dag = DAG(
    dag_id="snowflake_automation", default_args=args, schedule_interval=None
)

snowflake_query = [
    """create table public.test_employee (id number, name string);""",
    """insert into public.test_employee values(1, 'Sam'),(2, 'Andy'),(3, 'Gill');""",
]


def get_row_count(**context):
    dwh_hook = SnowflakeHook(snowflake_conn_id="snowflake_conn")
    result = dwh_hook.get_first("select count(*) from public.test_employee")
    logging.info("Number of rows in `public.test_employee` - %s", result[0])

with dag:
    create_insert = SnowflakeOperator(
        task_id="snowflake_create",
        sql=snowflake_query,
        snowflake_conn_id="snowflake_conn",
    )

    get_count = PythonOperator(task_id="get_count", python_callable=get_row_count)

create_insert >> get_count

In the above DAG, the Snowflake operator creates the table and inserts data into it. The Python operator then calls get_row_count, which uses the Snowflake hook to query the table created by the operator and log the row count to the console.

Conclusion

In this blog, you have learned about Airflow, Snowflake, and how to use the Airflow Snowflake combination for efficient ETL. Airflow with Snowflake helps automate data transfer by forming an automated ETL pipeline.
If you want to consolidate your data in Snowflake, you can use Hevo. Hevo is a No-code Data Pipeline. It supports pre-built data integrations from 100+ data sources. It offers data transformation without writing any custom code.

Give Hevo a try by signing up for a 14-day free trial today.

Share your experience of using Airflow Snowflake ETL in the comment section below.
