This article introduces Google BigQuery and the Airflow BigQuery Operators, and walks through how to use them with code examples. It explains what each operator does and the benefits of using these operators to manage your data.
Introduction to Google BigQuery
Google BigQuery is one of the most popular data warehousing solutions on the market, offered by Google as a fully managed, serverless service. It uses a Massively Parallel Processing (MPP) architecture that splits each query across many nodes, which process their share of the data in parallel. This makes processing large volumes of data fast and efficient. Because the service is serverless and separates storage from compute, it scales with your business requirements without you having to provision or manage nodes.
Hevo is a fully managed, no-code data pipeline platform that effortlessly integrates data from more than 150 sources into a data warehouse such as BigQuery. With its minimal learning curve, Hevo can be set up in just a few minutes. Its features include:
- Connectors: Hevo supports 150+ integrations to SaaS platforms, files, Databases, analytics, and BI tools. It supports various destinations, including Google BigQuery, Amazon Redshift, and Snowflake.
- Transformations: A simple Python-based drag-and-drop data transformation technique that allows you to transform your data for analysis.
- Real-Time Data Transfer: Hevo provides real-time data migration, so you can always have analysis-ready data.
- 24/7 Live Support: The Hevo team is available 24/7 to provide exceptional support through chat, email, and support calls.
Try Hevo today to experience seamless data transformation and migration.
Introduction to Airflow BigQuery Operators
Airflow is an open-source platform for programmatically authoring, scheduling, and monitoring workflows. It has a multi-node architecture, with its components placed on different nodes. In the image below, you can see that the webserver and scheduler run on the master node, the metadata database runs on a separate node, and the worker nodes are managed by the executor.
Airflow provides many operators for managing data, and the Airflow BigQuery Operators (part of the Google provider package) are among the most widely used, because they help you manage the data you analyze to extract meaningful insights. With these operators, you can perform the following tasks:
- Manage Datasets
- Manage Tables
- Execute BigQuery Jobs
- Validate Data
The later sections of this article walk you through these tasks one by one.
Manage Datasets
Managing datasets is one of the most prominent uses of Airflow BigQuery Operators. Dedicated operators let you perform the following dataset tasks:
1) Create an Empty Dataset
You can create an empty dataset in a Google BigQuery project by using the “BigQueryCreateEmptyDatasetOperator” operator. Given below is the syntax of this operator:
create_dataset = BigQueryCreateEmptyDatasetOperator(task_id="create-dataset", dataset_id=DATASET_NAME)
Note: The operator needs a name for your dataset (dataset_id). Make sure that the name is not already in use in that project, because a single project cannot contain two datasets with the same name.
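Because a dataset name can be used only once per project, a quick pre-flight check can catch naming problems before the task runs. The can_create_dataset helper below is hypothetical and not part of Airflow: it applies BigQuery's documented naming rules for dataset IDs (letters, digits, and underscores, up to 1,024 characters) and compares the name against a list of existing dataset IDs that you would have to fetch separately. Dataset IDs are case-sensitive, so the comparison is an exact match.

```python
import re

# BigQuery dataset IDs: letters, digits and underscores, 1 to 1,024 characters.
_DATASET_ID_PATTERN = re.compile(r"[A-Za-z0-9_]{1,1024}")


def can_create_dataset(dataset_id: str, existing_ids: list[str]) -> bool:
    """Hypothetical check to run before BigQueryCreateEmptyDatasetOperator.

    Returns True only if the name is syntactically valid and not already taken.
    """
    if not _DATASET_ID_PATTERN.fullmatch(dataset_id):
        return False
    # Exact (case-sensitive) membership test against the names already in use.
    return dataset_id not in existing_ids


print(can_create_dataset("sales_2024", ["marketing"]))  # True
print(can_create_dataset("sales-2024", []))             # False: hyphens are not allowed
```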
2) Fetch Details of an Existing Dataset
You can also fetch the details of an existing dataset using the “BigQueryGetDatasetOperator” operator. Given below is the syntax of this operator:
get_dataset = BigQueryGetDatasetOperator(task_id="get-dataset", dataset_id=DATASET_NAME)
3) List Tables in a Dataset
This Airflow BigQuery Operator is used to fetch a list of tables from an existing dataset. You can use the “BigQueryGetDatasetTablesOperator” to retrieve the list. Given below is the syntax of this operator:
get_dataset_tables = BigQueryGetDatasetTablesOperator(task_id="get_dataset_tables", dataset_id=DATASET_NAME)
4) Update an Existing Dataset
You can use the “BigQueryUpdateDatasetOperator” operator to update an existing dataset in your Google BigQuery database. This operator replaces the entire dataset resource rather than replacing the specific fields. Given below is the syntax of this operator:
update_dataset = BigQueryUpdateDatasetOperator(task_id="update_dataset", dataset_id=DATASET_NAME, dataset_resource={"description": "Updated dataset"},)
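The distinction in the paragraph above, replacing a whole resource versus changing only the supplied fields, can be modelled with plain dictionaries. The two functions below are hypothetical illustrations, not Airflow or BigQuery API calls. Under full-replacement semantics, any field you leave out of dataset_resource (such as labels in this example) would not survive the update.

```python
def update_semantics(current: dict, submitted: dict) -> dict:
    """Full replacement: the submitted resource becomes the new resource.

    `current` is intentionally ignored; nothing from it survives.
    """
    return dict(submitted)


def patch_semantics(current: dict, submitted: dict) -> dict:
    """Partial update: only keys present in `submitted` change."""
    merged = dict(current)
    merged.update(submitted)
    return merged


current = {"description": "Raw events", "labels": {"team": "data"}}
submitted = {"description": "Updated dataset"}
print(update_semantics(current, submitted))  # {'description': 'Updated dataset'}
print(patch_semantics(current, submitted))   # {'description': 'Updated dataset', 'labels': {'team': 'data'}}
```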
5) Delete an Existing Dataset
For deleting an existing dataset, you can use the “BigQueryDeleteDatasetOperator” Airflow BigQuery Operator. Given below is the syntax for this operator:
delete_dataset = BigQueryDeleteDatasetOperator(task_id="delete_dataset", dataset_id=DATASET_NAME, delete_contents=True)
Manage Tables
Along with the datasets, you can also manage the tables with Airflow BigQuery Operators. It makes it easier for you to perform the following tasks to handle your tables:
1) Create an Internal Table
You can use the “BigQueryCreateEmptyTableOperator” operator to create an empty table in the specified dataset. You can either pass the table schema directly in the code (schema_fields) or point the operator to a Google Cloud Storage object that contains the schema (gcs_schema_object). The same operator can also create a view on top of an existing table when you pass a view definition instead of a schema.
You can use the code given below to create a table with explicit schema fields:
create_table = BigQueryCreateEmptyTableOperator(
task_id="create_table",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields=[
{"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
{"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
],
)
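If you define many tables, a small helper can build the schema_fields list and catch typos in column modes early. The build_schema_fields function below is a hypothetical sketch, not part of Airflow; it accepts (name, type, mode) tuples, upper-cases the type and mode, and rejects anything other than BigQuery's three column modes (NULLABLE, REQUIRED, REPEATED).

```python
# The three column modes BigQuery supports.
VALID_MODES = {"NULLABLE", "REQUIRED", "REPEATED"}


def build_schema_fields(columns: list[tuple[str, str, str]]) -> list[dict]:
    """Turn (name, type, mode) tuples into `schema_fields` dictionaries.

    Hypothetical helper: normalises type and mode to upper case and raises
    ValueError for unknown modes before the DAG is ever scheduled.
    """
    fields = []
    for name, field_type, mode in columns:
        mode = mode.upper()
        if mode not in VALID_MODES:
            raise ValueError(f"Unsupported mode {mode!r} for column {name!r}")
        fields.append({"name": name, "type": field_type.upper(), "mode": mode})
    return fields


# Produces the same list that is passed to schema_fields above.
schema_fields = build_schema_fields([
    ("emp_name", "string", "required"),
    ("salary", "integer", "nullable"),
])
```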
You can use the code given below to create a view on top of the table created above, by passing a view definition instead of schema fields:
create_view = BigQueryCreateEmptyTableOperator(
task_id="create_view",
dataset_id=DATASET_NAME,
table_id="test_view",
view={
"query": f"SELECT * FROM `{PROJECT_ID}.{DATASET_NAME}.test_table`", "useLegacySql": False,
},
)
2) Create an External Table
Airflow BigQuery Operators also let you create an external table backed by data in Google Cloud Storage, using the “BigQueryCreateExternalTableOperator” operator. As with the internal table, you can either pass the schema fields directly or point the operator to an existing object in Google Cloud Storage. Use the code given below to create one:
create_external_table = BigQueryCreateExternalTableOperator(
task_id="create_external_table",
table_resource={
"tableReference": {
"projectId": PROJECT_ID,
"datasetId": DATASET_NAME,
"tableId": "external_table",
},
"schema": {
"fields": [
{"name": "name", "type": "STRING"},
{"name": "post_abbr", "type": "STRING"},
]
},
"externalDataConfiguration": {
"sourceFormat": "CSV",
"compression": "NONE",
"csvOptions": {"skipLeadingRows": 1},
},
},
bucket=DATA_SAMPLE_GCS_BUCKET_NAME,
source_objects=[DATA_SAMPLE_GCS_OBJECT_NAME],
)
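When you define several external tables, a hypothetical helper like the one below can generate the table_resource dictionary. It reproduces the structure shown above and also fills sourceUris, the field of BigQuery's ExternalDataConfiguration that lists the gs:// URIs of the source files. The project, dataset, bucket, and object names in the usage line are placeholders, and depending on your Google provider version you may still pass bucket and source_objects to the operator as in the example above.

```python
def gcs_uri(bucket: str, object_name: str) -> str:
    """Build a gs:// URI for an object in Google Cloud Storage."""
    return f"gs://{bucket}/{object_name}"


def build_external_csv_resource(
    project_id: str,
    dataset_id: str,
    table_id: str,
    fields: list[dict],
    source_uris: list[str],
    skip_leading_rows: int = 1,
) -> dict:
    """Return a table_resource dict for a CSV-backed external table (hypothetical helper)."""
    return {
        "tableReference": {
            "projectId": project_id,
            "datasetId": dataset_id,
            "tableId": table_id,
        },
        "schema": {"fields": list(fields)},
        "externalDataConfiguration": {
            "sourceFormat": "CSV",
            "compression": "NONE",
            "csvOptions": {"skipLeadingRows": skip_leading_rows},
            # sourceUris lists the Cloud Storage files that back the table.
            "sourceUris": list(source_uris),
        },
    }


resource = build_external_csv_resource(
    "my-project",
    "my_dataset",
    "external_table",
    [{"name": "name", "type": "STRING"}, {"name": "post_abbr", "type": "STRING"}],
    [gcs_uri("my-bucket", "us_states.csv")],
)
```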
3) Fetch Data from an Existing Table
There are two ways to fetch data from an existing table using the “BigQueryGetDataOperator” operator: you can fetch all columns, or only selected columns by passing a comma-separated list of column names to the “selected_fields” parameter. The max_results parameter caps the number of rows returned.
get_data = BigQueryGetDataOperator(
task_id="get_data",
dataset_id=DATASET_NAME,
table_id=TABLE_1,
max_results=10,
selected_fields="value,name",
location=location,
)
Note: The above code will return the requested data as a Python list. Here, the number of elements will be equal to the number of rows retrieved.
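Because an operator's return value is pushed to XCom by default, downstream tasks can read these rows. The hypothetical helper below shows one way to turn them into dictionaries keyed by the column names from selected_fields. The sample rows are invented for illustration, and the helper assumes each row lists its values in the same order as selected_fields.

```python
def rows_to_dicts(rows: list[list], selected_fields: str) -> list[dict]:
    """Pair each row's values with the column names listed in selected_fields.

    Assumes every row lists its values in the same order as selected_fields.
    """
    names = [name.strip() for name in selected_fields.split(",")]
    return [dict(zip(names, row)) for row in rows]


# Made-up rows standing in for what get_data might push to XCom.
sample_rows = [[42, "monthy python"], [42, "fishy fish"]]
print(rows_to_dicts(sample_rows, "value,name"))
# [{'value': 42, 'name': 'monthy python'}, {'value': 42, 'name': 'fishy fish'}]
```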
4) Upsert a Table
Airflow BigQuery Operators also give you the option to upsert a table using the “BigQueryUpsertTableOperator” operator. Given below is the syntax of this operator:
import time  # used below to compute expirationTime
upsert_table = BigQueryUpsertTableOperator(
task_id="upsert_table",
dataset_id=DATASET_NAME,
table_resource={
"tableReference": {"tableId": "test_table_id"},
"expirationTime": (int(time.time()) + 300) * 1000,
},
)
If the table already exists in the dataset, the operator updates it with the supplied table resource; if it does not, the operator creates a new table in the specified dataset.
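The expirationTime field in the table resource above is a timestamp in milliseconds since the Unix epoch, and BigQuery deletes the table once that time passes. So (int(time.time()) + 300) * 1000 means five minutes from now. The hypothetical helper below isolates that arithmetic and takes the current time as an argument, so it can be checked with a fixed value.

```python
import time


def expiration_time_ms(now_seconds: float, ttl_seconds: int = 300) -> int:
    """Same arithmetic as (int(time.time()) + 300) * 1000, with the clock passed in.

    BigQuery's expirationTime is expressed in milliseconds since the Unix epoch.
    """
    return (int(now_seconds) + ttl_seconds) * 1000


print(expiration_time_ms(1_700_000_000.75))  # 1700000300000

# In a DAG you would pass the real clock instead of a fixed value:
expires_at = expiration_time_ms(time.time())
```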
5) Update a Table Schema
You can update the schema of an existing table using the “BigQueryUpdateTableSchemaOperator” operator. It will only update the schema fields passed inside the code and will keep everything else as it is. Given below is the syntax of this operator:
update_table_schema = BigQueryUpdateTableSchemaOperator(
task_id="update_table_schema",
dataset_id=DATASET_NAME,
table_id="test_table",
schema_fields_updates=[
{"name": "emp_name", "description": "Name of employee"},
{"name": "salary", "description": "Monthly salary in USD"},
],
)
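The merge behaviour described above can be approximated with plain dictionaries: fields named in schema_fields_updates gain the new attributes, and every other field and attribute stays as it was. The apply_schema_updates helper below is a hypothetical, simplified model (it matches top-level fields by name and ignores nested RECORD fields), not the operator's implementation.

```python
def apply_schema_updates(schema: list[dict], updates: list[dict]) -> list[dict]:
    """Merge per-field updates into a schema, matching fields by name.

    Fields not mentioned in `updates` are copied unchanged. Nested RECORD
    fields are not handled in this simplified sketch.
    """
    updates_by_name = {update["name"]: update for update in updates}
    merged = []
    for field in schema:
        change = updates_by_name.get(field["name"], {})
        # New attributes win; existing attributes are kept.
        merged.append({**field, **change})
    return merged


current_schema = [
    {"name": "emp_name", "type": "STRING", "mode": "REQUIRED"},
    {"name": "salary", "type": "INTEGER", "mode": "NULLABLE"},
]
new_schema = apply_schema_updates(
    current_schema, [{"name": "salary", "description": "Monthly salary in USD"}]
)
```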
6) Delete an Existing Table
You can delete an existing table from a dataset using the “BigQueryDeleteTableOperator” operator. By default, the operator throws an error if the specified table does not exist in the dataset; set ignore_if_missing=True to let the task succeed anyway. Given below is the syntax of this operator:
delete_table = BigQueryDeleteTableOperator(
task_id="delete_table",
deletion_dataset_table=f"{PROJECT_ID}.{DATASET_NAME}.test_table",
)
Execute BigQuery Jobs
The Google BigQuery Jobs API runs load, export, query, and copy operations asynchronously. Whenever you create a load, export, query, or copy job, Google BigQuery automatically schedules and runs the job for you, so you can submit work and check its status later. Suppose you want to execute the query given below against a specific dataset in Google BigQuery:
INSERT_ROWS_QUERY = (
f"INSERT {DATASET_NAME}.{TABLE_1} VALUES "
f"(42, 'monthy python', '{INSERT_DATE}'), "
f"(42, 'fishy fish', '{INSERT_DATE}');"
)
In such a case, you can use the “BigQueryInsertJobOperator” operator, which submits the query as a BigQuery job with the given job configuration. Given below is the syntax of this operator:
insert_query_job = BigQueryInsertJobOperator(
task_id="insert_query_job",
configuration={
"query": {
"query": INSERT_ROWS_QUERY,
"useLegacySql": False,
}
},
location=location,
)
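Formatting values into SQL with f-strings, as INSERT_ROWS_QUERY does, is fine for a demo, but BigQuery query parameters avoid quoting and injection problems. The hypothetical helper below builds a configuration dictionary for BigQueryInsertJobOperator using the Jobs API's named-parameter fields (parameterMode and queryParameters). It assumes the table's third column is a DATE; table names cannot be passed as query parameters, so the table reference is still interpolated.

```python
def build_parameterized_insert(table_ref: str, insert_date: str) -> dict:
    """Return a job configuration that passes the date as a named parameter.

    table_ref is a fully qualified "project.dataset.table" string; insert_date
    is an ISO date such as "2024-01-15" (assumed DATE column).
    """
    return {
        "query": {
            "query": (
                f"INSERT `{table_ref}` VALUES "
                "(42, 'monthy python', @insert_date), "
                "(42, 'fishy fish', @insert_date)"
            ),
            "useLegacySql": False,  # named parameters require Standard SQL
            "parameterMode": "NAMED",
            "queryParameters": [
                {
                    "name": "insert_date",
                    "parameterType": {"type": "DATE"},
                    "parameterValue": {"value": insert_date},
                }
            ],
        }
    }


config = build_parameterized_insert("my-project.my_dataset.my_table", "2024-01-15")
```

You could then pass the returned dictionary as the configuration argument of BigQueryInsertJobOperator in place of the hand-formatted query.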
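Because the “configuration” argument of “BigQueryInsertJobOperator” is templated, you can also load the SQL from a file with the Jinja “include” tag. The file (here, example_bigquery_query.sql) must be on the DAG's template search path, such as the DAG folder or a directory listed in template_searchpath. Given below is the syntax: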
select_query_job = BigQueryInsertJobOperator(
task_id="select_query_job",
configuration={
"query": {
"query": "{% include 'example_bigquery_query.sql' %}",
"useLegacySql": False,
}
},
location=location,
)
Validate Data
With Airflow BigQuery Operators, you can also validate your data and check whether the SQL query you executed returned valid data. For this, you can use the “BigQueryCheckOperator” operator. Given below is the syntax of this operator:
check_count = BigQueryCheckOperator(
task_id="check_count",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
use_legacy_sql=False,
location=location,
)
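The operator expects the query to return a single row and evaluates each value in that row using Python's bool casting. It throws an error if any of those values evaluates to False (for example, 0, None, or an empty string), so the check above fails when the table is empty.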
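The pass/fail rule can be modelled in a few lines of plain Python. The hypothetical check_first_row function below takes the first result row as a tuple and raises on empty results or on any falsy value; it only illustrates the rule and is not the operator's implementation.

```python
def check_first_row(row) -> None:
    """Raise ValueError if the row is missing or any value is falsy.

    Simplified model of the rule described above, not the operator's code.
    """
    if not row:
        raise ValueError("The query returned no rows")
    falsy = [value for value in row if not value]
    if falsy:
        raise ValueError(f"Check failed; falsy values in first row: {falsy}")


check_first_row((4,))  # a non-empty table: passes silently
# check_first_row((0,)) would raise, because COUNT(*) = 0 is falsy
```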
You can also compare the result of the SQL query with a specific value using the “BigQueryValueCheckOperator” operator. Use the “pass_value” parameter to specify the value you want to compare the query result against; it can be either a string or a number. The operator throws an error if the values do not match. Given below is the syntax of this operator:
check_value = BigQueryValueCheckOperator(
task_id="check_value",
sql=f"SELECT COUNT(*) FROM {DATASET_NAME}.{TABLE_1}",
pass_value=4,
use_legacy_sql=False,
location=location,
)
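BigQueryValueCheckOperator also accepts an optional tolerance argument for numeric checks. The sketch below is a simplified, hypothetical model of the comparison rather than the operator's actual code: it assumes the query returns one value, compares strings exactly, and treats a numeric result as passing when it lies within the given fraction of pass_value.

```python
def value_check_passes(record, pass_value, tolerance=None) -> bool:
    """Simplified model of a value check on a single query result.

    Strings are compared exactly; numbers either exactly or, when a tolerance
    is given, within pass_value * (1 - tolerance) .. pass_value * (1 + tolerance).
    """
    if isinstance(pass_value, str):
        return str(record) == pass_value
    if tolerance is None:
        return float(record) == float(pass_value)
    low = pass_value * (1 - tolerance)
    high = pass_value * (1 + tolerance)
    return low <= float(record) <= high


print(value_check_passes(4, 4))                   # True
print(value_check_passes(5, 4))                   # False
print(value_check_passes(4.3, 4, tolerance=0.1))  # True
```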
Common Use Cases for Airflow BigQuery Operators
These use cases illustrate how Airflow BigQuery Operators simplify data orchestration and management tasks:
- Automated ETL Workflows: Airflow BigQuery Operators can streamline the Extract, Transform, Load (ETL) process, enabling automated data ingestion and transformation in BigQuery.
- Data Warehousing: Use Airflow to schedule and manage data imports, ensuring your BigQuery warehouse stays updated with minimal manual effort.
- Complex Query Execution: Automate the execution of complex SQL queries on BigQuery, allowing for consistent and error-free processing of large datasets.
- Analytics Pipeline Management: Build and manage analytics pipelines that pull data from multiple sources, process it, and store results in BigQuery for visualization or reporting.
- Data Archiving: Set up workflows to archive older data from BigQuery to cloud storage, optimizing costs and performance.
Advantages of using Airflow BigQuery Operators
Now that you have a good understanding of how to work with Airflow BigQuery Operators, let us walk through the advantages of using Operators to manage your dataset and database as a whole. Following are some of the most important advantages of Operators:
- Easy To Use: The operators are easy to use and easy to understand. Each one is named in plain English after the action it performs (for example, BigQueryCreateEmptyDatasetOperator), which makes DAG code easy to read.
- Minimal Setup: Beyond the apache-airflow-providers-google package (preinstalled in managed environments such as Cloud Composer) and a Google Cloud connection with suitable credentials, you do not need to install any additional software or third-party tools to run these operators against BigQuery.
Conclusion
This article provided an in-depth overview of Google BigQuery and Airflow BigQuery Operators, covering their functionality and the benefits of using Airflow for streamlined data workflows. With its robust scheduling and orchestration features, Airflow simplifies managing complex data pipelines.
However, when it comes to transferring data from diverse sources to Google BigQuery or any other Data Warehouse, tools like Hevo Data offer unparalleled ease of use, automation, and scalability. Explore Hevo to ensure a seamless data integration experience without the hassle of manual intervention. Sign up for a 14-day free trial and experience the feature-rich Hevo suite firsthand.
FAQs
1. What is an operator in Airflow?
An operator in Airflow defines a single task in a workflow, such as executing a Python script, running a SQL query, or transferring data. Operators are building blocks for Airflow DAGs.
2. What is the difference between operator and hook in Airflow?
An operator performs a specific task, while a hook provides the interface to interact with external systems (e.g., databases or APIs). Operators often use hooks internally for these interactions.
3. What is the difference between BigQuery and Airflow?
BigQuery is a data warehouse for storing and analyzing data, while Airflow is a workflow orchestration tool for scheduling and automating tasks, such as running queries in BigQuery.
Karan is a skilled Market Research Analyst at Hevo Data, specializing in data-driven initiatives and strategic planning. He excels in improving KPIs like website traffic and lead generation using tools such as Metabase and Semrush. With a background in computer software engineering, Karan delivers high customer value through insightful articles on data integration and optimization.