Airflow is a task automation tool. It helps organizations schedule their tasks so that they are executed at the right time, relieving employees from performing repetitive work manually.

Apache Airflow is widely used to schedule and orchestrate data pipelines or workflows, and when working with it you will often want to connect it to other tools and services.

In this article, you will gain a holistic understanding of Apache Airflow, Amazon Redshift, their key features, Amazon AWS Operators in Airflow, and the different Airflow Redshift Operators. Read along to find in-depth information about Airflow Redshift Operators.

What is Airflow?


Airflow is a platform that enables its users to automate tasks through scripts. It comes with a scheduler that executes tasks on an array of workers while following a set of defined dependencies.

Airflow also comes with rich command-line utilities that make it easy for its users to work with directed acyclic graphs (DAGs). The DAGs simplify the process of ordering and managing tasks for companies. 

Airflow also has a rich user interface that makes it easy to monitor progress, visualize pipelines running in production, and troubleshoot issues when necessary. 

Key Features of Airflow

  • Dynamic Integration: Airflow uses Python as the backend programming language to generate dynamic pipelines. Several operators, hooks, and connectors are available to create DAGs and tie them together into workflows.
  • Extensible: Airflow is an open-source platform, so it allows users to define their own custom operators, executors, and hooks. You can also extend its libraries to fit the level of abstraction that suits your environment.
  • Elegant User Interface: Airflow uses Jinja templates to create pipelines, and hence the pipelines are lean and explicit. Parameterizing your scripts is a straightforward process in Airflow (see the sketch after this list).
  • Scalable: Airflow is designed to scale: you can define as many dependent workflows as you want, and Airflow uses a message queue to orchestrate an arbitrary number of workers.
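As a quick, hedged illustration of the parameterization mentioned in the list above, the minimal sketch below passes a value into a task through Jinja templating. It assumes Airflow 2.x; the DAG name, schedule, and echoed message are illustrative, not taken from the examples later in this article.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="templating_demo",  # hypothetical DAG name used for illustration
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
) as dag:
    # Jinja renders {{ ds }} (the run's logical date) and {{ params.color }} at runtime.
    print_params = BashOperator(
        task_id="print_params",
        bash_command="echo 'run date: {{ ds }}, color: {{ params.color }}'",
        params={"color": "Red"},
    )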

Airflow can easily integrate with all the modern systems for orchestration. Some of these modern systems are as follows:

  1. Google Cloud Platform
  2. Amazon Web Services
  3. Microsoft Azure
  4. Apache Druid
  5. Snowflake
  6. Hadoop ecosystem
  7. Apache Spark
  8. PostgreSQL, SQL Server
  9. Google Drive
  10. JIRA
  11. Slack
  12. Databricks

What is Amazon Redshift?


Amazon Web Services (AWS) is a subsidiary of Amazon responsible for providing a cloud computing platform and APIs to individuals, corporations, and enterprises. AWS offers high computing power, efficient content delivery, and database storage with increased flexibility, scalability, and reliability, all at a relatively low cost.

Amazon Redshift, a part of AWS, is a Cloud-based Data Warehouse service designed by Amazon to handle large data and make it easy to discover new insights from them. Its operations enable you to query and combine exabytes of structured and semi-structured data across various Data Warehouses, Operational Databases, and Data Lakes.

Amazon Redshift is built on industry-standard SQL with functionalities to manage large datasets, support high-performance analysis, provide reports, and perform large-scale database migrations.

Amazon Redshift also lets you save query results to your S3 Data Lake in open formats such as Apache Parquet, so that you can run further analysis on that data from other Amazon Web Services such as EMR, Athena, and SageMaker.

For further information on Amazon Redshift, you can follow the Official Documentation.

Key Features of Amazon Redshift

The key features of Amazon Redshift are as follows:

1) Massively Parallel Processing (MPP)

  • Massively Parallel Processing (MPP) is a distributed design approach in which the divide and conquer strategy is applied by several processors to large data jobs. A large processing job is broken down into smaller jobs which are then distributed among a cluster of Compute Nodes.
  • These Nodes perform their computations parallelly rather than sequentially. As a result, there is a considerable reduction in the amount of time Redshift requires to complete a single, massive job.

2) Fault Tolerance

  • Data Accessibility and Reliability are of paramount importance for any user of a database or a Data Warehouse. Amazon Redshift monitors its Clusters and Nodes around the clock.
  • When any Node or Cluster fails, Amazon Redshift automatically replicates all data to healthy Nodes or Clusters.

3) Redshift ML

Amazon Redshift houses a functionality called Redshift ML that gives data analysts and database developers the ability to create, train and deploy Amazon SageMaker models using SQL seamlessly.

4) Column-Oriented Design

  • Amazon Redshift is a Column-oriented Data Warehouse. This makes it a simple and cost-effective solution for businesses to analyze all their data using their existing Business Intelligence tools.
  • Amazon Redshift achieves optimum query performance and efficient storage by leveraging Massively Parallel Processing (MPP), Columnar Data Storage, along with efficient and targeted Data Compression Encoding schemes.

What are Amazon AWS Operators in Airflow?

  • Airflow can be deployed in AWS using services such as ECS/Fargate for running the scheduler and webserver processes, EFS/S3 for storage, and Amazon RDS for the Airflow metastore.
  • Airflow provides many AWS-specific hooks and operators that allow you to integrate with different services on the AWS cloud platform.

The different Amazon AWS Operators in Airflow are:

  • Amazon Athena Operator
  • AWS DataSync Operator
  • AWS Database Migration Service Operators
  • ECS Operator
  • Amazon Elastic Kubernetes Service (EKS) Operators
  • Amazon EMR Operators
  • Amazon EMR on EKS Operators
  • Amazon Glacier Operator
  • Google API To S3 Transfer
  • Imap Attachment To S3 Operator
  • Redshift cluster management operators
  • RedshiftSQLOperator
  • Amazon S3 Operators
  • S3 To Redshift Transfer Operator
  • Salesforce To S3 Operator
  • SQS Publish Operator
  • Amazon Transfer Operators

For further information on Apache Airflow Providers in integration with Amazon AWS, you can visit here.

What are the Airflow Redshift Operators?

The different Airflow Redshift Operators are as follows:

1) Redshift Cluster Management Operators

The Airflow Redshift Operators which come under the category of Redshift Cluster Management operators are as follows:

A) Resume a Redshift Cluster

The RedshiftResumeClusterOperator is the Airflow Redshift Operator used to resume a "paused" AWS Redshift Cluster. This operator leverages the AWS CLI resume-cluster API.
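A minimal, hedged sketch of how this operator might be used inside a DAG is given below. It assumes the apache-airflow-providers-amazon package with the redshift_cluster module layout; the cluster identifier and connection ID are illustrative placeholders, not values from the original example.

from airflow.providers.amazon.aws.operators.redshift_cluster import RedshiftResumeClusterOperator

# Resume a paused cluster; "redshift-cluster-1" is a hypothetical identifier.
resume_cluster = RedshiftResumeClusterOperator(
    task_id="resume_redshift_cluster",
    cluster_identifier="redshift-cluster-1",
    aws_conn_id="aws_default",
)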

B) Pause a Redshift Cluster

The RedshiftPauseClusterOperator is the Airflow Redshift Operator used to pause an "available" AWS Redshift Cluster. This operator leverages the AWS CLI pause-cluster API.
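Pausing works symmetrically. A hedged sketch, under the same module-layout assumption and with the same illustrative cluster identifier:

from airflow.providers.amazon.aws.operators.redshift_cluster import RedshiftPauseClusterOperator

# Pause an available cluster; "redshift-cluster-1" is a hypothetical identifier.
pause_cluster = RedshiftPauseClusterOperator(
    task_id="pause_redshift_cluster",
    cluster_identifier="redshift-cluster-1",
    aws_conn_id="aws_default",
)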

2) RedshiftSQLOperator

The RedshiftSQLOperator is used to execute statements against an Amazon Redshift cluster. Under the hood, this Airflow Redshift Operator works with RedshiftSQLHook to connect to Amazon Redshift.
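The connection details (host, port, database, user, password) are read from an Airflow connection, which defaults to one named redshift_default. The hedged sketch below shows how a non-default connection ID could be passed; the connection name is an illustrative assumption.

from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

# "my_redshift_conn" is a hypothetical Airflow connection pointing at your cluster.
check_connection = RedshiftSQLOperator(
    task_id="check_connection",
    redshift_conn_id="my_redshift_conn",
    sql="SELECT 1;",
)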

A) Example

example_redshift.py

This example showcases the RedshiftSQLOperator in action.

I) Purpose

This is a simple example DAG for executing statements against an Amazon Redshift cluster using RedshiftSQLOperator.

II) Create a table

A table named “fruit” is created in the code below.

Source file: airflow/providers/amazon/aws/example_dags/example_redshift.py

    setup__task_create_table = RedshiftSQLOperator(
        task_id='setup__create_table',
        sql="""
            CREATE TABLE IF NOT EXISTS fruit (
            fruit_id INTEGER,
            name VARCHAR NOT NULL,
            color VARCHAR NOT NULL
            );
        """,
    )
III) Insert data into a table

A few sample rows are inserted into the “fruit” table in the code below.

Source File: airflow/providers/amazon/aws/example_dags/example_redshift.py

    task_insert_data = RedshiftSQLOperator(
        task_id='task_insert_data',
        sql=[
            "INSERT INTO fruit VALUES ( 1, 'Banana', 'Yellow');",
            "INSERT INTO fruit VALUES ( 2, 'Apple', 'Red');",
            "INSERT INTO fruit VALUES ( 3, 'Lemon', 'Yellow');",
            "INSERT INTO fruit VALUES ( 4, 'Grape', 'Purple');",
            "INSERT INTO fruit VALUES ( 5, 'Pear', 'Green');",
            "INSERT INTO fruit VALUES ( 6, 'Strawberry', 'Red');",
        ],
    )
IV) Fetching records from a table

A new table named “more_fruit” is created from the existing “fruit” table.

Source File: airflow/providers/amazon/aws/example_dags/example_redshift.py

    task_get_all_table_data = RedshiftSQLOperator(
        task_id='task_get_all_table_data', sql="CREATE TABLE more_fruit AS SELECT * FROM fruit;"
    )
V) Passing Parameters into RedshiftSQLOperator

The parameters attribute of RedshiftSQLOperator allows you to pass parameters into SQL statements dynamically.

Source File: airflow/providers/amazon/aws/example_dags/example_redshift.py

    task_get_with_filter = RedshiftSQLOperator(
        task_id='task_get_with_filter',
        sql="CREATE TABLE filtered_fruit AS SELECT * FROM fruit WHERE color = '{{ params.color }}';",
        params={'color': 'Red'},
    )

B) The complete RedshiftSQLOperator DAG

The overall DAG with everything combined is given below.

Source File: airflow/providers/amazon/aws/example_dags/example_redshift.py

from datetime import datetime

from airflow import DAG
from airflow.providers.amazon.aws.operators.redshift_sql import RedshiftSQLOperator

with DAG(
    dag_id="redshift",
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=['example'],
) as dag:
    setup__task_create_table = RedshiftSQLOperator(
        task_id='setup__create_table',
        sql="""
            CREATE TABLE IF NOT EXISTS fruit (
            fruit_id INTEGER,
            name VARCHAR NOT NULL,
            color VARCHAR NOT NULL
            );
        """,
    )
    task_insert_data = RedshiftSQLOperator(
        task_id='task_insert_data',
        sql=[
            "INSERT INTO fruit VALUES ( 1, 'Banana', 'Yellow');",
            "INSERT INTO fruit VALUES ( 2, 'Apple', 'Red');",
            "INSERT INTO fruit VALUES ( 3, 'Lemon', 'Yellow');",
            "INSERT INTO fruit VALUES ( 4, 'Grape', 'Purple');",
            "INSERT INTO fruit VALUES ( 5, 'Pear', 'Green');",
            "INSERT INTO fruit VALUES ( 6, 'Strawberry', 'Red');",
        ],
    )
    task_get_all_table_data = RedshiftSQLOperator(
        task_id='task_get_all_table_data', sql="CREATE TABLE more_fruit AS SELECT * FROM fruit;"
    )
    task_get_with_filter = RedshiftSQLOperator(
        task_id='task_get_with_filter',
        sql="CREATE TABLE filtered_fruit AS SELECT * FROM fruit WHERE color = '{{ params.color }}';",
        params={'color': 'Red'},
    )

    setup__task_create_table >> task_insert_data >> task_get_all_table_data >> task_get_with_filter

3) S3ToRedshiftOperator

The S3ToRedshiftOperator transfers data from an S3 bucket to a Redshift table.

A) Example

example_s3_to_redshift.py

This example DAG showcases the S3ToRedshiftOperator in action.

I) Purpose

This is a simple example DAG that uses the S3ToRedshiftOperator to copy data from an S3 bucket into a Redshift table.

II) Environment variables

This example relies on the following variables, which can be passed via OS environment variables.

Source file: airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py

from os import getenv

S3_BUCKET = getenv("S3_BUCKET", "test-bucket")
S3_KEY = getenv("S3_KEY", "key")
REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "test_table")

At a minimum, you need to set S3_BUCKET.

III) Copy S3 key into Redshift table

The code below copies the S3 key s3://{S3_BUCKET}/{S3_KEY}/{REDSHIFT_TABLE} into the Redshift table PUBLIC.{REDSHIFT_TABLE}.

Source file: airflow/providers/amazon/aws/example_dags/example_s3_to_redshift.py

    task_transfer_s3_to_redshift = S3ToRedshiftOperator(
        s3_bucket=S3_BUCKET,
        s3_key=S3_KEY,
        schema="PUBLIC",
        table=REDSHIFT_TABLE,
        copy_options=['csv'],
        task_id='transfer_s3_to_redshift',
    )
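For reference, a hedged sketch that assembles the snippets above into a complete, minimal DAG is given below. The DAG name, start date, and schedule are illustrative assumptions rather than values from the source file.

from datetime import datetime
from os import getenv

from airflow import DAG
from airflow.providers.amazon.aws.transfers.s3_to_redshift import S3ToRedshiftOperator

S3_BUCKET = getenv("S3_BUCKET", "test-bucket")
S3_KEY = getenv("S3_KEY", "key")
REDSHIFT_TABLE = getenv("REDSHIFT_TABLE", "test_table")

with DAG(
    dag_id="s3_to_redshift_demo",  # hypothetical DAG name used for illustration
    start_date=datetime(2021, 1, 1),
    schedule_interval=None,
    catchup=False,
    tags=['example'],
) as dag:
    # COPY the CSV data under the S3 key into the PUBLIC.{REDSHIFT_TABLE} table.
    task_transfer_s3_to_redshift = S3ToRedshiftOperator(
        task_id='transfer_s3_to_redshift',
        s3_bucket=S3_BUCKET,
        s3_key=S3_KEY,
        schema="PUBLIC",
        table=REDSHIFT_TABLE,
        copy_options=['csv'],
    )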

For further information about the S3ToRedshiftOperator, you can visit here.

Conclusion

In this article, you have learned about Airflow Redshift Operators. This article also provided information on Apache Airflow, Amazon Redshift, their key features, Amazon AWS Operators in Airflow, and the different Airflow Redshift Operators in detail.

For further information on Airflow ETL, Airflow Databricks Integration, and Airflow REST API, you can visit the following links. Hevo Data, a No-code Data Pipeline, provides you with a consistent and reliable solution to manage data transfer between a variety of sources and a wide variety of desired destinations with a few clicks.

Hevo Data, with its strong integration with 150+ data sources (including 40+ free sources), allows you to not only export data from your desired data sources and load it to the destination of your choice but also transform and enrich your data to make it analysis-ready.

Hevo also allows integrating data from non-native sources using Hevo’s in-built Webhooks Connector. You can then focus on your key business needs and perform insightful analysis using BI tools. 

Want to take Hevo for a spin? Sign up for a 14-day free trial and experience the feature-rich Hevo suite firsthand. Also check out our unbeatable pricing to choose the best plan for your organization.

Share your experience of understanding Apache Airflow Redshift Operators in the comment section below! We would love to hear your thoughts.

Talha
Software Developer, Hevo Data

Talha is a Software Developer with over eight years of experience in the field. He is currently driving advancements in data integration at Hevo Data, where he has been instrumental in shaping a cutting-edge data integration platform for the past four years. Prior to this, he spent 4 years at Flipkart, where he played a key role in projects related to their data integration capabilities. Talha loves to explain complex information related to data engineering to his peers through writing. He has written many blogs related to data integration, data management aspects, and key challenges data practitioners face.
