Understanding Airflow BigQuery Operators: 4 Critical Aspects

on BigQuery Datasets, Data Warehouse, ETL, Google BigQuery, Tutorials • September 20th, 2021


In an era where data is a priceless resource for any business, choosing the right Data Warehouse which can collect and store all the data from various operational sources becomes very crucial. A Data Warehouse is a central repository where data from multiple operational data sources is stored for easy access by your employees or customers. The data is extracted, transformed (i.e. cleaned and integrated), and then stored in the specified format. One of the best Cloud-based Data Warehouses for any business is Google BigQuery.

What makes Google BigQuery an amazing fit for any modern business is its ability to manage data efficiently and effectively. It provides Airflow BigQuery Operators that make all these tasks easy and efficient. These operators help you manage and validate data inside your datasets and in your databases as a whole.

From an on-premises Data Warehouse where you need to maintain all hardware and software to a Cloud-based Data Warehouse where there is no need for any physical hardware, the technology has made immense growth in this field. A Cloud-based Data Warehouse has not just reduced the cost of Data Warehousing solutions for businesses but has also provided tools that are fast, highly scalable, and available on a pay-per-use basis. Therefore, it becomes very important for you to look for a Data Warehousing solution that suits your business model the best.

This article introduces you to Google BigQuery and Airflow BigQuery Operators. It provides a comprehensive guide for a better understanding of Airflow BigQuery Operators, making it easy for you to understand how these operators are used and the perks of using them for managing your data.


Introduction to Google BigQuery


Google BigQuery is one of the most promising Data Warehousing solutions in the market from Google. It works on a Shared-Nothing Massively Parallel Processing (MPP) architecture where nodes work independently and do not share memory or storage. Its serverless architecture decouples nodes and helps you scale according to your business requirements. Furthermore, the architecture of Google BigQuery allows you to process your data in parallel across different nodes, which makes processing your data more efficient and effective.

Losing data is one of the most disastrous things that could happen to your company. According to a study by The Diffusion Group, 72% of businesses that suffer a major data loss go bankrupt within 2 years. Google BigQuery creates a backup of your data at the service level and allows you to recover your data at any point in time. It also supports Time Travel, which allows you to revert the changes made to your data within a restricted period of 7 days.
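
Behind the scenes, Time Travel is exposed through the `FOR SYSTEM_TIME AS OF` clause in BigQuery SQL. As a minimal sketch (the table name below is hypothetical), a query that reads a table as it existed an hour ago could be assembled like this:

```python
from datetime import datetime, timedelta, timezone

def time_travel_query(table: str, hours_ago: int) -> str:
    """Build a BigQuery query that reads `table` as of `hours_ago` hours ago.

    BigQuery only keeps time-travel history for 7 days, so older
    timestamps are rejected here before the query is ever sent.
    """
    if hours_ago > 7 * 24:
        raise ValueError("BigQuery time travel is limited to 7 days")
    ts = datetime.now(timezone.utc) - timedelta(hours=hours_ago)
    return (
        f"SELECT * FROM `{table}` "
        f"FOR SYSTEM_TIME AS OF TIMESTAMP '{ts:%Y-%m-%d %H:%M:%S} UTC'"
    )

# Hypothetical fully-qualified table name:
sql = time_travel_query("my-project.my_dataset.orders", hours_ago=1)
```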

One of the best features of Google BigQuery is that it separates compute and storage, which gives you the freedom to scale your resources as per your requirements. Moreover, it allows both Horizontal and Vertical Scaling to increase your business performance and data storage bandwidth. In Horizontal Scaling, you can increase the number of nodes within a single logical unit, while in Vertical Scaling, you can add more resources to the existing nodes, such as increasing the disk space assigned to a particular node. Google BigQuery also works with various operators, like the Airflow BigQuery Operators, that you can use to manage your data. To know more about Airflow BigQuery Operators, keep reading!

For more information on Google BigQuery, click here.

Introduction to Airflow BigQuery Operators


Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring your workflows. It has a multi-node architecture because it houses different components in different nodes. In the image below, you can see that the webserver and scheduler sit inside the master node, the database sits in a separate node, and the executor distributes tasks across the worker nodes.

Airflow Architecture image

Airflow also provides various operators, like the Airflow BigQuery Operators, that help in managing your data. Airflow BigQuery Operators, in particular, are among the most widely used operators as they help in managing data to analyze and extract meaningful insights. With Airflow BigQuery Operators, you can perform the following tasks:

  • Manage datasets
  • Manage tables
  • Execute BigQuery jobs
  • Validate data

The later sections of this article will walk you through these tasks one by one.

Simplify ETL Process using Hevo’s No-code Data Pipeline

Hevo Data, a No-code Data Pipeline, helps you load data from any data source such as Databases, SaaS applications, Cloud Storage, SDKs, and Streaming Services, and simplifies the ETL process. It supports 150+ data sources, and loading data is a simple 3-step process: select the data source, provide valid credentials, and choose the destination. Hevo loads the data onto the desired Data Warehouse such as Google BigQuery, enriches the data, and transforms it into an analysis-ready form without writing a single line of code.

Its completely automated pipeline delivers data in real-time from source to destination without any loss. Its fault-tolerant and scalable architecture ensures that the data is handled in a secure, consistent manner with zero data loss and supports different forms of data. The solutions provided are consistent and work with different Business Intelligence (BI) tools as well.

Get Started with Hevo for free

Check out why Hevo is the Best:

  • Secure: Hevo has a fault-tolerant architecture that ensures that the data is handled in a secure, consistent manner with zero data loss.
  • Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
  • Minimal Learning: Hevo, with its simple and interactive UI, is extremely simple for new customers to work on and perform operations.
  • Hevo Is Built To Scale: As the number of sources and the volume of your data grows, Hevo scales horizontally, handling millions of records per minute with very little latency.
  • Incremental Data Load: Hevo allows the transfer of data that has been modified in real-time. This ensures efficient utilization of bandwidth on both ends.
  • Live Support: The Hevo team is available round the clock to extend exceptional support to its customers through chat, email, and support calls.
  • Live Monitoring: Hevo allows you to monitor the data flow and check where your data is at a particular point in time.
Sign up here for a 14-day Free Trial!

Airflow BigQuery Operator: Manage Datasets


Managing datasets is one of the most prominent uses of Airflow BigQuery Operators. Airflow provides you the operators to easily manage your datasets and allows you to perform numerous smaller tasks like:


1) Create an Empty Dataset

You can create an empty dataset in a Google BigQuery database by using the “BigQueryCreateEmptyDatasetOperator” operator. Given below is the syntax of this operator:

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)

Note: The code requires a name for your dataset. You need to make sure that the name has not been used before in that particular database, as you cannot have 2 datasets with the exact same name in a single database.
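
As a side note, BigQuery dataset IDs may contain only letters, numbers, and underscores, up to 1024 characters. The small pre-flight check below is a hypothetical sketch (not part of the operator itself) that can catch an invalid `dataset_id` before the DAG ever runs:

```python
import re

# BigQuery dataset IDs: letters, digits, and underscores only,
# with a maximum length of 1024 characters.
_DATASET_ID_RE = re.compile(r"^[A-Za-z0-9_]{1,1024}$")

def is_valid_dataset_id(dataset_id: str) -> bool:
    """Return True if dataset_id satisfies BigQuery's naming rules."""
    return bool(_DATASET_ID_RE.match(dataset_id))

assert is_valid_dataset_id("sales_2021")
assert not is_valid_dataset_id("sales-2021")  # hyphens are not allowed
```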

2) Fetch Details of an Existing Dataset

You can also fetch the details of an existing dataset using the “BigQueryGetDatasetOperator” operator. Given below is the syntax of this operator:

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)

3) List Tables in a Dataset

This Airflow BigQuery Operator is used to fetch a list of tables from an existing dataset. You can use the “BigQueryGetDatasetTablesOperator” to retrieve the list. Given below is the syntax of this operator:

get_dataset_tables = BigQueryGetDatasetTablesOperator(task_id="get_dataset_tables", dataset_id=DATASET_NAME)

4) Update an Existing Dataset

You can use the “BigQueryUpdateDatasetOperator” operator to update an existing dataset in your Google BigQuery database. Note that this operator replaces the entire dataset resource rather than updating only the specified fields. Given below is the syntax of this operator:

update_dataset = BigQueryUpdateDatasetOperator(task_id="update_dataset", dataset_id=DATASET_NAME, dataset_resource={"description": "Updated dataset"},)

5) Delete an Existing Dataset

For deleting an existing dataset, you can use the “BigQueryDeleteDatasetOperator” Airflow BigQuery Operator. Given below is the syntax for this operator:

delete_dataset = BigQueryDeleteDatasetOperator(task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True)

Airflow BigQuery Operator: Manage Tables


Along with the datasets, you can also manage the tables with Airflow BigQuery Operators. It makes it easier for you to perform the following tasks to handle your tables:

1) Create an Internal Table

You can use the “BigQueryCreateEmptyTableOperator” operator to create an empty table in the specified dataset. It gives you the option to directly pass the schema of the table in the code or point the operator to a Google Cloud Storage object that contains the schema. This operator also allows you to create a view on top of an existing table.
You can use the code given below to create a table (with schema fields):

create_table = BigQueryCreateEmptyTableOperator(
    task_id="create_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)

You can use the code given below to create a view on top of an existing table:

create_view = BigQueryCreateEmptyTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    view={
        "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "useLegacySql": False,
    },
)

2) Create an External Table

Airflow BigQuery Operators also give you the freedom to create an external table with data stored in Google Cloud Storage. This can be done using the “BigQueryCreateExternalTableOperator” operator. Just like the internal table, it gives you the option to pass the schema fields directly or point the operator to an existing object in Google Cloud Storage. Use the code given below for the same:

create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    table_resource={
        "tableReference": {
            "projectId": PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": "external_table",
        },
        "schema": {
            "fields": [
                {"name": "name", "type": "STRING"},
                {"name": "post_abbr", "type": "STRING"},
            ]
        },
        "externalDataConfiguration": {
            "sourceFormat": "CSV",
            "compression": "NONE",
            "csvOptions": {"skipLeadingRows": 1},
        },
    },
    bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
    source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
)

3) Fetch Data from an Existing Table

There are two ways to fetch data from an existing table using the “BigQueryGetDataOperator” operator. You can either fetch the complete data at once or fetch only selected columns by passing the field names to the “selected_fields” parameter.

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET_NAME,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
    location=location,
)

Note: The above code will return the requested data as a Python list. Here, the number of elements will be equal to the number of rows retrieved.
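
Since the operator returns the rows as a plain Python list (typically passed downstream via XCom), a later task often needs to pair each row with the field names it requested. The helper below is a hypothetical illustration of that reshaping, assuming each row arrives as a list of cell values:

```python
def rows_to_dicts(rows, selected_fields: str):
    """Pair each returned row with the comma-separated field names.

    `rows` mimics what BigQueryGetDataOperator returns: one list of
    cell values per table row, in the order of selected_fields.
    """
    names = [f.strip() for f in selected_fields.split(",")]
    return [dict(zip(names, row)) for row in rows]

# Hypothetical sample matching selected_fields="value,name":
records = rows_to_dicts([[42, "alpha"], [7, "beta"]], "value,name")
# records[0] == {"value": 42, "name": "alpha"}
```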

4) Upsert a Table

Airflow BigQuery Operators give you the option to upsert a table using “BigQueryUpsertTableOperator”. Given below is the syntax of this operator:

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": "test_table_id"},
        "expirationTime": (int(time.time()) + 300) * 1000,
    },
)

The above code will update the table if it already exists in the dataset. If it does not, the operator will create a new table inside the specified dataset.
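
Note that “expirationTime” in the snippet above is a Unix timestamp expressed in milliseconds: `(int(time.time()) + 300) * 1000` means “five minutes from now”. A small sketch generalizing that calculation:

```python
import time

def expiration_ms(seconds_from_now: int) -> int:
    """Compute a table expirationTime for the BigQuery API.

    The API expects a Unix timestamp in *milliseconds*; the operator
    example above uses (int(time.time()) + 300) * 1000, i.e. five
    minutes from now. This helper generalizes that pattern.
    """
    return (int(time.time()) + seconds_from_now) * 1000

five_minutes = expiration_ms(300)
# five_minutes is the current epoch time plus 300 s, scaled to ms
```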

5) Update a Table Schema

You can update the schema of an existing table using the “BigQueryUpdateTableSchemaOperator” operator. It will only update the schema fields passed inside the code and will keep everything else as it is. Given below is the syntax of this operator:

update_table_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_table_schema",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields_updates=[
        {"name": "emp_name", "description": "Name of employee"},
        {"name": "salary", "description": "Monthly salary in USD"},
    ],
)

6) Delete an Existing Table

You can delete an existing table in the dataset using the “BigQueryDeleteTableOperator” operator. The operator will throw an error if the specified table does not exist in the dataset. Given below is the syntax of this operator:

delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)

Airflow BigQuery Operator: Execute BigQuery Jobs


The Google BigQuery Jobs API runs asynchronously to load, export, query, and copy data. Whenever you create a load, export, query, or copy job, Google BigQuery automatically schedules and runs it for you. In simpler terms, the Google BigQuery Jobs API helps you to easily access and control your data. Suppose you want to execute the query given below in a specific database of Google BigQuery:

INSERT_ROWS_QUERY = (
    f"INSERT {DATASET_NAME}.{TABLE_1} VALUES "
    f"(42, 'monthy python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)
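
The f-strings above simply interpolate names and values into plain SQL. With hypothetical values filled in, you can see exactly what statement the job would be handed:

```python
# Hypothetical values for illustration only:
DATASET_NAME = "test_dataset"
TABLE_1 = "test_table"
INSERT_DATE = "2021-09-20"

# Same f-string construction as in the article:
INSERT_ROWS_QUERY = (
    f"INSERT {DATASET_NAME}.{TABLE_1} VALUES "
    f"(42, 'monthy python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)

# Renders to:
# INSERT test_dataset.test_table VALUES (42, 'monthy python', '2021-09-20'), (42, 'fishy fish', '2021-09-20');
```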

In such a case, you can use the “BigQueryInsertJobOperator” Airflow BigQuery Operator. It will execute the above query with proper query job configuration. Given below is the syntax of this operator:


insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
        }
    },
    location=location,
)

“BigQueryInsertJobOperator” also allows you to pull the query from a separate file in the configuration using the Jinja “include” directive. Given below is the syntax:

select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            "query": "{% include 'example_bigquery_query.sql' %}",
            "useLegacySql": False,
        }
    },
    location=location,
)
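
The `{% include %}` syntax is standard Jinja templating, which Airflow applies to templated fields before the job runs. The sketch below simulates that rendering step with the jinja2 library directly, using an in-memory loader and a hypothetical file name:

```python
from jinja2 import DictLoader, Environment

# Simulate Airflow's template rendering: {% include %} splices the
# referenced template's contents into the rendered string.
env = Environment(loader=DictLoader({
    "example_bigquery_query.sql": "SELECT COUNT(*) FROM my_dataset.my_table",
    "config": "{% include 'example_bigquery_query.sql' %}",
}))
rendered = env.get_template("config").render()
# rendered now holds the SQL from example_bigquery_query.sql
```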

Airflow BigQuery Operator: Validate Data


With Airflow BigQuery Operators, you can also validate your data and check whether or not the executed SQL query has returned valid data. In this case, you can use the “BigQueryCheckOperator” operator. Given below is the syntax of this operator:

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
    use_legacy_sql=False,
    location=location,
)

It evaluates each value returned by the SQL query. The operator throws an error if any of the values evaluates to False.
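
Conceptually, the check inspects the first row of the query result and fails if any cell in it is falsy (0, an empty string, or NULL). A pure-Python sketch of that pass/fail rule (an illustration, not the operator's actual source):

```python
def check_row_passes(first_row) -> bool:
    """Mimic the pass/fail rule of a SQL check operator: the first
    result row passes only if every cell is truthy."""
    return all(bool(cell) for cell in first_row)

assert check_row_passes([12, "ok"])     # non-zero count -> passes
assert not check_row_passes([0])        # COUNT(*) == 0 -> fails
assert not check_row_passes([5, None])  # a NULL cell also fails
```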

You can also compare the data returned by the SQL query against a specific value using the “BigQueryValueCheckOperator” operator. You will have to use the “pass_value” parameter to specify the value against which you want to compare the SQL query result. This value can either be a string or numeric. The operator will throw an error if the values do not match. Given below is the syntax of this operator:

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
    location=location,
)
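
The operator also accepts an optional “tolerance” argument that turns the exact match into a relative band around “pass_value”. The sketch below illustrates those comparison semantics; treat it as an approximation of the behavior, not the operator's source code:

```python
def value_check(result, pass_value, tolerance=None) -> bool:
    """Sketch of a value check: exact match by default, or a relative
    tolerance band (pass_value * (1 +/- tolerance)) when given."""
    if tolerance is not None:
        lo = pass_value * (1 - tolerance)
        hi = pass_value * (1 + tolerance)
        return lo <= result <= hi
    return result == pass_value

assert value_check(4, 4)                     # exact match passes
assert value_check(4.2, 4, tolerance=0.1)    # within 10% of 4
assert not value_check(5, 4, tolerance=0.1)  # outside the band
```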

Advantages of using Airflow BigQuery Operators

Now that you have a good understanding of how to work with Airflow BigQuery Operators, let us walk through the advantages of using Airflow BigQuery Operators to manage your dataset and database as a whole. Following are some of the most important advantages of Airflow BigQuery Operators:

  • Easy To Use: Airflow BigQuery Operators are very easy to use and, at the same time, very easy to understand. You must have noticed that all the operators are named in plain English, which makes them very easy to interpret.
  • No Additional Installation Required: Google BigQuery supports Airflow and does not require you to install any additional software or 3rd-party tools to run your Airflow BigQuery Operators on your database. In simpler terms, you do not have any prerequisite tasks to complete before executing these operators.

Conclusion

This article introduced you to Google BigQuery and Airflow BigQuery Operators. It provided you with a comprehensive guide on how to use numerous Airflow BigQuery Operators and the perks of using Airflow for the task over any other platform. Furthermore, if you want to transfer your data from multiple data sources to Google BigQuery or any other Data Warehouse of your choice, you can explore Hevo Data.

Visit our Website to Explore Hevo

Hevo Data is a No-code Data Pipeline and has awesome 150+ pre-built integrations that you can choose from. Hevo can help you integrate data from numerous data sources and load it into a destination such as Google BigQuery, Amazon Redshift, etc., to analyze real-time data with a BI tool and create your Dashboards. It will make your life easier and make data migration hassle-free. It is user-friendly, reliable, and secure.

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. You can also have a look at the unbeatable pricing that will help you choose the right plan for your business needs.

Share your experience of working with Airflow BigQuery Operators in the comments section below!
