This article introduces you to Google BigQuery and Airflow BigQuery Operators, and walks you through how to use them. It explains what each operator does and the perks of using these operators for managing your data.

Google BigQuery is Google's Data Warehousing solution and one of the most promising on the market. It works on the Shared-Nothing Massively Parallel Processing (MPP) architecture, where nodes work independently and do not share memory or storage. Its serverless architecture decouples nodes and lets you scale according to your business requirements. Furthermore, the architecture of Google BigQuery allows you to process your data in parallel across different nodes, which makes processing more efficient.

Introduction to Airflow BigQuery Operators

Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring your workflows. It has a multi-node architecture because it houses different components in different nodes. In the image below, you can see the webserver and scheduler inside the master node, the database in another node, and the worker nodes in the executor.

Airflow Architecture image

Airflow also provides various operators, such as the Airflow BigQuery Operators, that help in managing your data. The Airflow BigQuery Operators in particular are widely used, as they help you manage your data so that you can analyze it and extract meaningful insights. With Airflow BigQuery Operators, you can perform the following tasks:

  • Manage Datasets
  • Manage Tables
  • Execute BigQuery Jobs
  • Validate Data

The later sections of this article will walk you through these tasks one by one.

Manage Datasets

Airflow BigQuery Operator: Manage Datasets image

Managing datasets is one of the most prominent uses of Airflow BigQuery Operators. They let you easily manage your datasets and perform the smaller tasks described below:

1) Create an Empty Dataset

You can create an empty dataset in a Google BigQuery database by using the “BigQueryCreateEmptyDatasetOperator” operator. Given below is the syntax of this operator:

create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)

Note: The code demands a name for your dataset. You need to make sure that the name is not already used in that particular database. You cannot have 2 datasets with the exact same name in a single database.
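
The uniqueness rule in the note can be checked before the task runs. The snippet below is a minimal plain-Python sketch, not part of Airflow: the helper `check_dataset_name` and the set of existing names are hypothetical, and the naming pattern (letters, digits, and underscores, up to 1,024 characters) is an assumption based on the BigQuery dataset naming rules.

```python
import re

# Assumed naming rule: letters, digits, and underscores, at most 1,024 characters.
DATASET_NAME_PATTERN = re.compile(r"[A-Za-z0-9_]{1,1024}")


def check_dataset_name(name, existing_names):
    """Return a list of problems with a proposed dataset name (empty if none)."""
    problems = []
    if not DATASET_NAME_PATTERN.fullmatch(name):
        problems.append("name may only contain letters, digits, and underscores")
    if name in existing_names:
        problems.append("a dataset with this name already exists")
    return problems


# Hypothetical names already present, for illustration only.
existing = {"marketing", "sales_2023"}

print(check_dataset_name("sales_2024", existing))  # []
print(check_dataset_name("marketing", existing))   # ['a dataset with this name already exists']
```

In a real pipeline, the set of existing names would come from BigQuery itself rather than from a hard-coded set.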

2) Fetch Details of an Existing Dataset

You can also fetch the details of an existing dataset using the “BigQueryGetDatasetOperator” operator. Given below is the syntax of this operator:

get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)

3) List Tables in a Dataset

This Airflow BigQuery Operator is used to fetch a list of tables from an existing dataset. You can use the “BigQueryGetDatasetTablesOperator” to retrieve the list. Given below is the syntax of this operator:

get_dataset_tables = BigQueryGetDatasetTablesOperator(task_id="get_dataset_tables", dataset_id=DATASET_NAME)

4) Update an Existing Dataset

You can use the “BigQueryUpdateDatasetOperator” operator to update an existing dataset in your Google BigQuery database. This operator replaces the entire dataset resource rather than replacing the specific fields. Given below is the syntax of this operator:

update_dataset = BigQueryUpdateDatasetOperator(task_id="update_dataset", dataset_id=DATASET_NAME, dataset_resource={"description": "Updated dataset"},)

5) Delete an Existing Dataset

For deleting an existing dataset, you can use the “BigQueryDeleteDatasetOperator” Airflow BigQuery Operator. Given below is the syntax for this operator:

delete_dataset = BigQueryDeleteDatasetOperator(task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True)

Manage Tables

Airflow BigQuery Operator: Manage Tables image

Along with the datasets, you can also manage your tables with Airflow BigQuery Operators. They make it easier for you to perform the following tasks:

1) Create an Internal Table

You can use the “BigQueryCreateEmptyTableOperator” operator to create an empty table in the specified dataset. It gives you the option to pass the schema of the table directly in the code or to point the operator to a Google Cloud Storage object that holds the schema. You can also use the same operator to create a view on top of an existing table.
You can use the code given below to create a table (with schema fields):

create_table = BigQueryCreateEmptyTableOperator(
    task_id="create_table",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields=[
        {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
        {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
    ],
)
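
The value passed to “schema_fields” is a list of dictionaries, and the same structure can be stored as JSON. The snippet below is a minimal sketch, assuming the JSON text is already available as a string. The helper `load_schema_fields` is hypothetical and only checks the two keys used in the example above. It fills in “NULLABLE” when no mode is given, which is the BigQuery default.

```python
import json

REQUIRED_KEYS = {"name", "type"}


def load_schema_fields(schema_json):
    """Parse a JSON array of field definitions into a schema_fields list."""
    fields = json.loads(schema_json)
    if not isinstance(fields, list):
        raise ValueError("schema must be a JSON array of field definitions")
    for field in fields:
        missing = REQUIRED_KEYS - field.keys()
        if missing:
            raise ValueError(f"field {field!r} is missing {sorted(missing)}")
        # Make the default column mode explicit when it is omitted.
        field.setdefault("mode", "NULLABLE")
    return fields


# Illustrative schema text matching the table created above.
SCHEMA_JSON = """
[
  {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
  {"name": "salary", "type": "INTEGER"}
]
"""

schema_fields = load_schema_fields(SCHEMA_JSON)
print(schema_fields[1])  # {'name': 'salary', 'type': 'INTEGER', 'mode': 'NULLABLE'}
```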

You can use the code given below to create a view on top of an existing table:

create_view = BigQueryCreateEmptyTableOperator(
    task_id="create_view",
    dataset_id=DATASET_NAME,
    table_id="test_view",
    view={
        "query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`",
        "useLegacySql": False,
    },
)

2) Create an External Table

Airflow BigQuery Operators also give you the freedom to create an external table on top of data stored in Google Cloud Storage. This can be done using the “BigQueryCreateExternalTableOperator” operator. Just like the internal table, it gives you the option to pass the schema fields or point the operator to an existing object name in Google Cloud Storage. Use the code given below for the same:

create_external_table = BigQueryCreateExternalTableOperator(
    task_id="create_external_table",
    table_resource={
        "tableReference": {
            "projectId": PROJECT_ID,
            "datasetId": DATASET_NAME,
            "tableId": "external_table",
        },
        "schema": {
            "fields": [
                {"name": "name", "type": "STRING"},
                {"name": "post_abbr", "type": "STRING"},
            ]
        },
        "externalDataConfiguration": {
            "sourceFormat": "CSV",
            "compression": "NONE",
            "csvOptions": {"skipLeadingRows": 1},
        },
    },
    bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
    source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
)
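
The bucket and object names above identify files in Google Cloud Storage, which BigQuery addresses as `gs://` URIs (for example, in the `sourceUris` field of an external data configuration). The snippet below is a small sketch of how the two parts combine. The helper `gcs_uris` and the sample bucket and object names are hypothetical.

```python
def gcs_uris(bucket, source_objects):
    """Build gs:// URIs from a bucket name and a list of object names."""
    return [f"gs://{bucket}/{name.lstrip('/')}" for name in source_objects]


# Hypothetical values standing in for DATA_SAMPLE_GCS_BUCKET_NAME and
# DATA_SAMPLE_GCS_OBJECT_NAME.
uris = gcs_uris("my-sample-bucket", ["bigquery/us-states/us-states.csv"])
print(uris)  # ['gs://my-sample-bucket/bigquery/us-states/us-states.csv']
```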

3) Fetch Data from an Existing Table

There are two ways to fetch data from an existing table using the “BigQueryGetDataOperator” operator. You can either fetch all the columns at once or fetch only selected columns by passing their names to the “selected_fields” parameter.

get_data = BigQueryGetDataOperator(
    task_id="get_data",
    dataset_id=DATASET_NAME,
    table_id=TABLE_1,
    max_results=10,
    selected_fields="value,name",
    location=location,
)

Note: The above code will return the requested data as a Python list. Here, the number of elements will be equal to the number of rows retrieved.
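
To make the shape of that result concrete, the snippet below imitates it in plain Python. It is only an illustration of the list-of-rows structure, not the operator's implementation. The function `select_rows` and the sample table are hypothetical.

```python
def select_rows(rows, selected_fields=None, max_results=None):
    """Return rows as a list of lists, keeping only the requested columns."""
    if selected_fields:
        columns = [name.strip() for name in selected_fields.split(",")]
    else:
        columns = None  # keep every column in its original order
    result = []
    for row in rows[:max_results]:
        keys = columns if columns is not None else list(row)
        result.append([row[key] for key in keys])
    return result


# Hypothetical table contents, for illustration only.
TABLE = [
    {"value": 42, "name": "monthy python", "ds": "2024-01-01"},
    {"value": 42, "name": "fishy fish", "ds": "2024-01-01"},
    {"value": 7, "name": "third row", "ds": "2024-01-02"},
]

data = select_rows(TABLE, selected_fields="value,name", max_results=2)
print(data)       # [[42, 'monthy python'], [42, 'fishy fish']]
print(len(data))  # 2, one element per retrieved row
```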

4) Upsert a Table

Airflow BigQuery Operators give you the option to upsert a table using “BigQueryUpsertTableOperator”. Given below is the syntax of this operator:

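import time  # needed for the expirationTime calculation below

upsert_table = BigQueryUpsertTableOperator(
    task_id="upsert_table",
    dataset_id=DATASET_NAME,
    table_resource={
        "tableReference": {"tableId": "test_table_id"},
        # Expire the table 300 seconds from now (value in milliseconds).
        "expirationTime": (int(time.time()) + 300) * 1000,
    },
)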

The above code will update the table if it already exists in the dataset. If it does not, the operator will create a new table inside the specified dataset.
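
The “expirationTime” value in the table resource is expressed in milliseconds since the epoch, which is why the expression multiplies by 1000. The snippet below is a small sketch of that arithmetic. The helper names `expiration_ms` and `ms_to_datetime` are hypothetical, and the optional `now` argument exists only to make the calculation reproducible.

```python
import time
from datetime import datetime, timezone


def expiration_ms(seconds_from_now, now=None):
    """Return an expiration timestamp in milliseconds since the epoch."""
    current = time.time() if now is None else now
    return (int(current) + seconds_from_now) * 1000


def ms_to_datetime(milliseconds):
    """Convert epoch milliseconds back to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(milliseconds / 1000, tz=timezone.utc)


# Same expression as in the operator example: five minutes from now.
expires_at = expiration_ms(300)
print(expires_at, ms_to_datetime(expires_at).isoformat())
```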

5) Update a Table Schema

You can update the schema of an existing table using the “BigQueryUpdateTableSchemaOperator” operator. It will only update the schema fields passed inside the code and will keep everything else as it is. Given below is the syntax of this operator:

update_table_schema = BigQueryUpdateTableSchemaOperator(
    task_id="update_table_schema",
    dataset_id=DATASET_NAME,
    table_id="test_table",
    schema_fields_updates=[
        {"name": "emp_name", "description": "Name of employee"},
        {"name": "salary", "description": "Monthly salary in USD"},
    ],
)
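
The effect of “schema_fields_updates” can be pictured as a merge by field name: the attributes you pass are changed, and everything else is kept. The snippet below is a simplified plain-Python illustration of that idea, not the operator's code. The function `apply_schema_updates` is hypothetical, and raising an error for an unknown field name is a choice made for this sketch only.

```python
from copy import deepcopy


def apply_schema_updates(schema_fields, updates):
    """Merge per-field updates into a schema without touching other attributes."""
    merged = deepcopy(schema_fields)  # leave the caller's schema unchanged
    by_name = {field["name"]: field for field in merged}
    for update in updates:
        name = update["name"]
        if name not in by_name:
            # Assumption for this sketch: unknown names are treated as errors.
            raise KeyError(f"no field named {name!r} in the schema")
        by_name[name].update(update)
    return merged


current_schema = [
    {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
]
updates = [
    {"name": "emp_name", "description": "Name of employee"},
    {"name": "salary", "description": "Monthly salary in USD"},
]

new_schema = apply_schema_updates(current_schema, updates)
print(new_schema[0])
# {'name': 'emp_name', 'type': 'STRING', 'mode': 'REQUIRED', 'description': 'Name of employee'}
```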

6) Delete an Existing Table

You can delete an existing table in the dataset using the “BigQueryDeleteTableOperator” operator. The operator will throw an error if the specified table does not exist in the dataset. Given below is the syntax of this operator:

delete_table = BigQueryDeleteTableOperator(
    task_id="delete_table",
    deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)

Execute BigQuery Jobs

Airflow BigQuery Operator: Execute BigQuery Jobs image

The Google BigQuery Jobs API runs jobs asynchronously to load, export, query, and copy data. Whenever you create a load, export, query, or copy job, Google BigQuery automatically schedules and runs the job for you. In simpler terms, the Google BigQuery Jobs API helps you easily access and control your data. Suppose you want to execute the query given below in a specific database of Google BigQuery.

INSERT_ROWS_QUERY = (
    f"INSERT {DATASET_NAME}.{TABLE_1} VALUES "
    f"(42, 'monthy python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)

In such a case, you can use the “BigQueryInsertJobOperator” Airflow BigQuery Operator. It will execute the above query with proper query job configuration. Given below is the syntax of this operator:


insert_query_job = BigQueryInsertJobOperator(
    task_id="insert_query_job",
    configuration={
        "query": {
            "query": INSERT_ROWS_QUERY,
            "useLegacySql": False,
        }
    },
    location=location,
)
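
It can help to see exactly what the operator receives. The snippet below renders the query string and builds the same configuration dictionary in plain Python. The values assigned to `DATASET_NAME`, `TABLE_1`, and `INSERT_DATE` are assumptions for illustration, and `query_job_configuration` is a hypothetical helper.

```python
# Assumed values, for illustration only.
DATASET_NAME = "test_dataset"
TABLE_1 = "table1"
INSERT_DATE = "2024-01-01"

INSERT_ROWS_QUERY = (
    f"INSERT {DATASET_NAME}.{TABLE_1} VALUES "
    f"(42, 'monthy python', '{INSERT_DATE}'), "
    f"(42, 'fishy fish', '{INSERT_DATE}');"
)


def query_job_configuration(sql, use_legacy_sql=False):
    """Build the dictionary passed to the operator's configuration argument."""
    return {"query": {"query": sql, "useLegacySql": use_legacy_sql}}


configuration = query_job_configuration(INSERT_ROWS_QUERY)
print(configuration["query"]["query"])
# INSERT test_dataset.table1 VALUES (42, 'monthy python', '2024-01-01'), (42, 'fishy fish', '2024-01-01');
```

Building the dictionary in one place keeps the query text and the “useLegacySql” flag together when several tasks share the same settings.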

“BigQueryInsertJobOperator” also allows you to load the query from a separate file by using the Jinja template keyword “include” inside the configuration. Given below is the syntax:

select_query_job = BigQueryInsertJobOperator(
    task_id="select_query_job",
    configuration={
        "query": {
            "query": "{% include 'example_bigquery_query.sql' %}",
            "useLegacySql": False,
        }
    },
    location=location,
)

Validate Data

Airflow BigQuery Operator: Validate Data image

With Airflow BigQuery Operators, you can also validate your data and check whether or not the executed SQL query has returned valid data. In this case, you can use the “BigQueryCheckOperator” operator. Given below is the syntax of this operator:

check_count = BigQueryCheckOperator(
    task_id="check_count",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
    use_legacy_sql=False,
    location=location,
)

It evaluates each value in the first row returned by the executed SQL query using Python's boolean casting. The operator throws an error if any of the values evaluates to False.
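
The check can be pictured in plain Python as follows. This is a minimal sketch of the idea rather than the operator's implementation. The function `first_row_passes` is hypothetical, and treating an empty result as a failure is an assumption of this sketch.

```python
def first_row_passes(rows):
    """Return True only if the first row exists and all of its values are truthy."""
    if not rows:
        return False  # assumption: an empty result counts as a failed check
    return all(bool(value) for value in rows[0])


print(first_row_passes([(4,)]))        # True: COUNT(*) returned 4
print(first_row_passes([(0,)]))        # False: COUNT(*) returned 0
print(first_row_passes([(3, None)]))   # False: one value is None
```

Because 0, None, and empty strings all evaluate to False, a query such as `SELECT COUNT(*)` fails the check when the table is empty.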

You can also compare the data returned by the SQL query to a specific value using the “BigQueryValueCheckOperator” operator. You will have to use the “pass_value” parameter to specify the value against which you want to compare the result of the SQL query. This value can be either a string or a number. The operator will throw an error if the values do not match. Given below is the syntax of this operator:

check_value = BigQueryValueCheckOperator(
    task_id="check_value",
    sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
    pass_value=4,
    use_legacy_sql=False,
    location=location,
)
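
The comparison against “pass_value” can be sketched in plain Python for a single returned value. This is a simplified illustration, not the operator's code. The function `value_matches` is hypothetical, and the optional `tolerance` argument (a fraction of the expected value) is included as an assumption about how a relaxed numeric comparison could work.

```python
def value_matches(result, pass_value, tolerance=None):
    """Compare a single query result with the expected value."""
    if isinstance(pass_value, str):
        # Strings are compared exactly.
        return str(result) == pass_value
    if tolerance is None:
        return float(result) == float(pass_value)
    # Accept results within a fraction of the expected value.
    return abs(float(result) - float(pass_value)) <= abs(float(pass_value)) * tolerance


print(value_matches(4, 4))                      # True
print(value_matches(5, 4))                      # False
print(value_matches(105, 100, tolerance=0.1))   # True
```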

Advantages of using Airflow BigQuery Operators

Now that you have a good understanding of how to work with Airflow BigQuery Operators, let us walk through the advantages of using Operators to manage your dataset and database as a whole. Following are some of the most important advantages of Operators:

  • Easy To Use: They are very easy to use and at the same time are also very easy to understand. You must have noticed that all the operators are named in plain English. This makes all these operators very easy to interpret.
  • No Additional Installation is Required: Google BigQuery supports Airflow, so beyond Airflow and its Google provider package you do not need to install any additional software or 3rd-party tools to run the Operators on your database. In simpler terms, you do not have any prerequisite tasks to complete before executing these operators.

Conclusion

This article introduced you to Google BigQuery and Airflow BigQuery Operators. It provided you a comprehensive guide on how to use numerous Operators and the perks of using Airflow for the task over any other platform. Furthermore, if you want to transfer your data from multiple data sources to Google BigQuery or any other Data Warehouse of your choice, then you can explore Hevo Data.

Share your experience of working with Airflow BigQuery Operators in the comments section below!

Karan Singh Pokhariya
Research Analyst, Hevo Data

Karan is a skilled Market Research Analyst at Hevo Data, specializing in data-driven initiatives and strategic planning. He excels in improving KPIs like website traffic and lead generation using tools such as Metabase and Semrush. With a background in computer software engineering, Karan delivers high customer value through insightful articles on data integration and optimization.