You have BigQuery as the Single Source of Truth (SSOT) for your organization, with data flowing in from multiple sources. We understand that, as data engineers, you're looking to perform CDC replication into BigQuery to avoid data loss and maintain data freshness across a multitude of use cases. Doing so advances your organization's data modernization initiatives and business agility.
Well, if you’re looking to understand database replication to BigQuery using CDC, then you’re in the right spot! This article is here to help you answer all the questions about performing BigQuery CDC data replication in a matter of minutes. So, let’s dive in!
What is the Generic Architecture of CDC Replication to BigQuery?
A generic architecture for performing CDC data replication from numerous sources to BigQuery has two key pieces. The initial data ingest (initial data dump) refers to the data that is ingested into BigQuery from a data source at the beginning. The CDC processing layer is responsible for capturing changes in the source dataset and reflecting them in BigQuery.
For each source data table, a main table and a delta table are generated in BigQuery.
The main table has all of the columns from the source table, plus one column for the most recent change ID value. Consider the latest change ID value as the version ID of the entity identified by the record's primary key; it can be used to find the most recent version of each record.
The delta table has all of the columns from the source table, plus an operation type column (either update, insert, or delete) and the change ID value.
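To make the two table shapes concrete, here is a minimal DDL sketch using the session example from the tutorial later in this article. The tutorial's scripts create these tables for you; the exact column types shown here are assumptions.

-- Sketch of the main/delta table pair described above (types are assumptions).
CREATE TABLE cdc_tutorial.session_main (
  id INT64 NOT NULL,        -- primary key from the source table
  username STRING,          -- source column (nullable)
  change_id INT64 NOT NULL  -- latest change ID, i.e., the record's version
);

CREATE TABLE cdc_tutorial.session_delta (
  id INT64 NOT NULL,
  username STRING,
  change_id INT64 NOT NULL,   -- monotonically increasing change ID
  change_type STRING NOT NULL -- 'I' (insert), 'U' (update), or 'D' (delete)
);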
What is the Process Involved in Replicating Data into BigQuery Using CDC?
Following is the overall process to replicate data into BigQuery using CDC:
- First, the initial data dump from the source is extracted.
- Then, the extracted data can be optionally transformed and loaded into its corresponding main table. If the table lacks a column that can be used as a change ID, such as a last-updated timestamp, the change ID is set to the lowest possible value for the data type of that column. This allows subsequent processing to identify the main table records that have been modified since the first data dump.
- Now, the CDC capture process captures the rows that have changed after the initial data dump.
- If needed, the CDC processing layer can perform any additional data transformation.
- Finally, the data is inserted into the corresponding delta table in BigQuery, using micro-batch loads or streaming inserts. A minimal sketch of this two-phase flow follows.
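The sketch below uses bq load for both phases, anticipating the table names from the tutorial later in this article; the CSV file names are hypothetical.

# Sketch: the initial dump replaces the main table's contents, then each
# captured change batch is appended to the delta table (file names hypothetical).
bq load --replace cdc_tutorial.session_main initial_dump.csv
bq load cdc_tutorial.session_delta change_batch_0001.csv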
What are the Approaches to Performing BigQuery CDC Replication?
Two approaches can be used by data practitioners for performing BigQuery CDC replication. These are:
- Scheduled CDC connects to source databases through JDBC/ODBC connections and captures INSERT/UPDATE/DELETE entries from database tables on a schedule.
- Real-time CDC is accomplished by capturing audit events of database commits from transaction logs (also known as audit logs or binlogs) and then merging those events into the target system, i.e., BigQuery.
We have explained Scheduled CDC in detail using the following two examples:
- Performing BigQuery CDC Replication using bq Command Line Tool
- Performing BigQuery CDC Replication using Stored Procedures
In-depth guides are also available for performing real-time CDC data replication into BigQuery.
Performing BigQuery CDC Replication using bq Command Line Tool
Let’s dive into the steps for performing BigQuery CDC replication using bq commands and SQL statements.
Creating a Project in BigQuery
- Go to the Google Cloud console and either select or create a project, and enable billing for it.
- If you create a new project, the BigQuery API is enabled automatically.
- If you select an existing project, you need to enable the BigQuery API yourself.
- Click Activate Cloud Shell at the top of the Google Cloud console window. The Cloud Shell pane will then appear.
- To update your BigQuery configuration file, you need to open the ~/.bigqueryrc file in a text editor and add or update the following lines anywhere in the file:
[query]
--use_legacy_sql=false
[mk]
--use_legacy_sql=false
- Now, you need to clone the GitHub repository that contains the scripts for setting up the BigQuery environment. The command for this is as follows:
git clone https://github.com/GoogleCloudPlatform/bq-mirroring-cdc.git
- Now you need to create the dataset and the main and delta tables:
cd bq-mirroring-cdc/tutorial
chmod +x *.sh
./create-tables.sh
Setting up BigQuery data
To demonstrate BigQuery CDC replication, we’ll use a pair of main and delta tables that are populated with sample data.
The example data has a simple, straightforward data model: a web session with a mandatory system-generated session ID and an optional user name. The user name is null when the session begins and is filled in once the user logs in.
- To load data into the main table from the BigQuery environment scripts, run the following command (the general form is bq load <destination_table> <source_file>; here the source is a CSV file):
bq load cdc_tutorial.session_main init.csv
- You can run a query to get the main table contents (the general form is bq query "<query_text>"):
bq query "select * from cdc_tutorial.session_main limit 1000"
The output looks like the following:
+-----+----------+-----------+
| id  | username | change_id |
+-----+----------+-----------+
| 100 | NULL     |         1 |
| 101 | Priya    |         2 |
| 102 | Mani     |         3 |
+-----+----------+-----------+
- Next, load the first batch of CDC changes into the delta table from the BigQuery environment scripts by running:
bq load cdc_tutorial.session_delta first-batch.csv
- To get the delta table contents, you can run:
bq query "select * from cdc_tutorial.session_delta limit 1000"
The output looks like the following:
+-----+----------+-----------+-------------+
| id  | username | change_id | change_type |
+-----+----------+-----------+-------------+
| 100 | NULL     |         4 | U           |
| 101 | Priya    |         5 | D           |
| 103 | Mani     |         6 | I           |
| 104 | NULL     |         7 | I           |
+-----+----------+-----------+-------------+
Query the data
There are multiple approaches you can use to determine the overall state of the sessions. The approaches are:
- Immediate consistency approach: Queries reflect the present state of the replicated data. This approach requires a query that joins the main table and the delta table and selects the most recent row for each primary key (a sketch follows this list).
- Cost-optimized approach: This approach executes faster, less expensive queries at the expense of some delay in data availability; the delta data is merged into the main table periodically.
- Hybrid approach: You can use either the immediate consistency approach or the cost-optimized approach, depending on your requirements and budget, i.e., higher cost with almost immediate consistency, or lower cost with potentially stale data.
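Here is a minimal sketch of both query styles, assuming the session tables used above; the tutorial's exact view and merge definitions may differ.

-- Immediate consistency (sketch): union main + delta, keep the latest
-- version of each record, and drop records whose latest change is a delete.
select id, username
from (
  select *,
    row_number() over (partition by id order by change_id desc) as rn
  from (
    select id, username, change_id, 'I' as change_type
    from cdc_tutorial.session_main
    union all
    select id, username, change_id, change_type
    from cdc_tutorial.session_delta
  ) as combined
) as ranked
where rn = 1 and change_type != 'D';

-- Cost-optimized (sketch): periodically fold the delta into the main table.
merge cdc_tutorial.session_main m
using (
  select * except(rn)
  from (
    select *,
      row_number() over (partition by id order by change_id desc) as rn
    from cdc_tutorial.session_delta
  ) t
  where rn = 1
) d
on m.id = d.id
when matched and d.change_type = 'D' then delete
when matched then update set username = d.username, change_id = d.change_id
when not matched and d.change_type != 'D' then
  insert (id, username, change_id) values (d.id, d.username, d.change_id);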
Performing BigQuery CDC Replication using Stored Procedures
Let’s consider an example of performing BigQuery CDC replication using stored procedures. The steps involved are:
- In this case, our data source is Google Sheets. The source table on which we want to perform BigQuery CDC replication is an organization table with the columns org_nm (the primary key), ceo, cfo, cmo, and cto.
- The above table will be exposed as an external table in BigQuery.
- Now, go to BigQuery and create a dataset in your BigQuery project. In it, create an external table pointing to the table in Google Sheets using its URL, filling in the required details while creating the dataset and the external table.
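Alternatively, the external table can be defined with DDL; here is a minimal sketch, assuming a hypothetical sheet URL and a header row:

CREATE OR REPLACE EXTERNAL TABLE cdcds.cdc_raw
OPTIONS (
  format = 'GOOGLE_SHEETS',
  -- hypothetical URL; substitute your own sheet's URL
  uris = ['https://docs.google.com/spreadsheets/d/<your-sheet-id>'],
  skip_leading_rows = 1  -- skip the header row
);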
- Now, check whether you can query the data in BigQuery:
SELECT * FROM `cdcds.cdc_raw` LIMIT 1000;
- Next, create the final BigQuery CDC table, into which you will merge the CDC records identified from the source (i.e., Google Sheets), using the following DDL statement:
create or replace table cdcds.cdc_final
as
select *,
current_timestamp as created_dt,
current_timestamp as last_updated_dt,
'Initial Load' as last_updated_by
from cdcds.cdc_raw;
- Now, verify that the final table was created correctly and that the initial data load from the source cdc_raw table into the cdc_final table succeeded:
SELECT * FROM `cdcds.cdc_final` LIMIT 1000;
- Next, create a stored procedure in BigQuery that performs the CDC: copy the code below into the BigQuery console and click the "Run" button.
CREATE OR REPLACE procedure `cdcds.sp_cdc_organization`()
BEGIN
create temp table cdc_table
as
select *,
(CASE
WHEN mod_rec.org_nm is null THEN 'Insert'
WHEN new_mod_rec.org_nm_new is null THEN 'Delete'
WHEN mod_rec.org_nm = new_mod_rec.org_nm_new THEN 'Update'
else 'Undefined'
END) as Operation_flag
from
(
-- Get records from target that were modified in source
select org_nm, ceo, cfo, cmo, cto
from
(
select org_nm, ceo, cfo, cmo, cto
from cdcds.cdc_final
except distinct
select org_nm, ceo, cfo, cmo, cto
from cdcds.cdc_raw
)
) as mod_rec
FULL JOIN
(
-- Get records from source that are new/modified in source
select org_nm as org_nm_new, ceo as ceo_new, cfo as cfo_new, cmo as cmo_new, cto as cto_new
from
(
select org_nm, ceo, cfo, cmo, cto
from cdcds.cdc_raw
except distinct
select org_nm, ceo, cfo, cmo, cto
from cdcds.cdc_final
)
) as new_mod_rec
ON new_mod_rec.org_nm_new = mod_rec.org_nm;
#Insert records
insert into cdcds.cdc_final
select org_nm_new as org_nm, ceo_new as ceo, cfo_new as cfo,
cmo_new as cmo, cto_new as cto,
current_timestamp as created_dt,
current_timestamp as last_updated_dt,
'Stored Proc' as last_updated_by
from cdc_table where operation_flag='Insert';
#Delete records
delete from cdcds.cdc_final
where org_nm
in (select org_nm from cdc_table where operation_flag='Delete');
#Update records
update cdcds.cdc_final cdc_final
set ceo = ceo_new,
cfo = cfo_new,
cmo = cmo_new,
cto = cto_new,
last_updated_dt = current_timestamp,
last_updated_by = 'Stored Proc'
from cdc_table tmp
where tmp.operation_flag = 'Update' and
cdc_final.org_nm = tmp.org_nm;
#Update audit_table
insert into cdcds.audit_tbl
select 'cdc_final' as tbl_nm,
'cdc_raw' as src_tbl_nm,
sum(if(operation_flag='Insert',1,0)) as rec_inserted,
sum(if(operation_flag='Delete',1,0)) as rec_deleted,
sum(if(operation_flag='Update',1,0)) as rec_updated,
current_timestamp as run_dt
from cdc_table;
#Delete temp table
drop table cdc_table;
END;
- Now, create an audit table for recording and controlling the CDC operations that happen on the final CDC table, using the following DDL statement in the BigQuery console:
CREATE OR REPLACE table cdcds.audit_tbl (
tbl_nm string,
src_tbl_nm string,
rec_inserted integer,
rec_deleted integer,
rec_updated integer,
run_dt timestamp
)
- You can verify the audit table's schema in the BigQuery console after running the DDL.
- Now, modify the source cdc_raw table: insert one record and change a non-primary-key column value directly in the Google Sheet.
- Now, run the stored procedure.
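In the console, this is a single statement (the procedure name comes from the DDL above):

CALL `cdcds.sp_cdc_organization`();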
- Now, verify that the changes were merged into the cdc_final table, and check the number of records inserted and updated in the audit_tbl table.
- Next, modify the source cdc_raw table by deleting a record. You can also modify a primary key column value.
- Now, run the stored procedure again.
- Finally, verify that the results were merged into the cdc_final table, and check the counts recorded in the audit_tbl table. In this case, two records must be deleted and one inserted: one record was actually deleted from the source, and one primary key value was modified, which causes the old primary key record to be deleted and a new record to be inserted under the new key.
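A simple way to check each run, assuming the tables defined above, is to read back the audit table:

-- Sketch: inspect the CDC runs recorded by the stored procedure
SELECT * FROM cdcds.audit_tbl ORDER BY run_dt DESC;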
Summing It Up
CDC is an advanced and efficient method of data replication, as it reduces the time and resource costs of data warehousing solutions, i.e., BigQuery in this case. This article walked through the generic CDC architecture for BigQuery and showed how you can perform BigQuery CDC replication efficiently.
You can follow the process provided in this article. But what if you have data stored in multiple databases that you need to integrate, clean, standardize, and replicate to BigQuery via CDC? You would then have to ask your engineering team to invest a major portion of its bandwidth in developing and maintaining custom CDC data connectors. On top of their primary engineering goals, they would need to stay on their toes to watch for any data leakage and fix it on priority. This eventually becomes a resource- and time-expensive task.
No worries! There are already cloud-based solutions that address this problem. They can completely automate the data integration process without requiring you to write any code. For instance, you can hop onto a smooth ride with a No-Code ETL tool like Hevo Data and enjoy 150+ plug-and-play integrations.
Visit our Website to Explore Hevo
No need to go to your BigQuery data warehouse for post-load transformations. You can simply run complex SQL transformations from the comfort of Hevo’s interface and get your data in the final analysis-ready form.