
MongoDB to BigQuery ETL – Steps to Stream Data

If you have a habit of observing technology trends in the data space, I'm sure you have heard the SQL vs NoSQL debate at least a couple of times. Around five years ago the debate was at its peak, and NoSQL solutions like MongoDB were getting a lot of traction. Ironically, the adoption of SQL has since increased more than ever before, especially for analytics use cases. Many NoSQL systems like Cassandra and Elasticsearch are now adding SQL interfaces to interact with the data.

MongoDB is a popular NoSQL database that requires data to be modeled in JSON format. If your application's data model is a natural fit for MongoDB's recommended data model, it can provide good performance, flexibility, and scalability for transactional workloads. But when it comes to analyzing data, MongoDB is not a great solution: it doesn't have proper joins, getting data from other systems into MongoDB is difficult, and there is no native support for SQL. Drafting complex analytics logic in MongoDB's aggregation framework is not as easy as in SQL.

Moving data from MongoDB to BigQuery, a data warehousing solution with proper SQL support, makes things a lot easier. Google BigQuery is the data warehouse offering from Google Cloud Platform and one of the most performant and cost-effective solutions around.

Convert MongoDB to BigQuery

[Diagram: steps to stream data from MongoDB to BigQuery]

As shown in the diagram, the vital steps to be followed to stream data from MongoDB to BigQuery are:

  1. Extract data from MongoDB using the mongoexport utility, as shown in the diagram above
  2. Optionally, clean and transform the data
  3. Upload the data to Google Cloud Storage (GCS)
  4. Load the data into a BigQuery table from GCS using the bq command-line tool, the console, or any cloud SDK

Let’s take a comprehensive look at all the required steps:

1. Extract Data From MongoDB

As mentioned above, mongoexport is the utility that can be used to create a JSON or CSV export of the data stored in a MongoDB collection.

General guidelines for using mongoexport:

  • mongoexport must be run directly from the system command line, not from the mongo shell
  • Ensure that the connecting user possesses, at a minimum, the read role on the target database
  • mongoexport defaults to primary read preference when connected to a mongod or a replica set
  • The default read preference can be overridden using the --readPreference option

Below is an example that exports data from the cont_collect collection to a CSV file at the location /opt/exports/csv/cnts.csv:

mongoexport --db users --collection cont_collect --type=csv \
  --fields contact_name,contact_address --out /opt/exports/csv/cnts.csv
  • When you export in CSV format, you must specify the fields in the documents to export. In the above example, we specify the contact_name and contact_address fields to export. The output would look like:
    contact_name, contact_address
    Elon Wick, 27 Manhattan
    John Musk, 911 Martian Town
  • Optionally, you can specify the fields in a file containing a line-separated list of fields to export, with one field per line. For example, you can specify the contact_name and contact_address fields in a file contact_fields.txt:

    contact_name
    contact_address
  • Then, using the --fieldFile option, specify the fields to export with the file:
    mongoexport --db users --collection cont_collect --type=csv --fieldFile contact_fields.txt \
    --out /opt/backups/contacts.csv
  • By default, field names are included as a header. The --noHeaderLine option can be used to omit the field names in a CSV export.
  • As shown in the above example, --fields can be used to specify fields. A specified field can also be a field inside a sub-document, referenced with dot notation, as in the sketch below.
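For instance, if each document had a hypothetical address sub-document with a city field, the export could reference it with dot notation (a sketch, assuming such a field exists in your collection):

mongoexport --db users --collection cont_collect --type=csv \
  --fields contact_name,address.city \
  --out /opt/exports/csv/cnts_city.csv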

Incremental Data Extract From MongoDB

Instead of extracting the entire collection, you can pass a query to filter the data. This option can be used to extract data incrementally. --query or -q is used to pass the filter.

As an example, let's consider the above-discussed contacts collection. Suppose the 'last_updated_time' field in each document stores the last updated/inserted Unix timestamp for that document.

mongoexport -d users -c contacts -q '{ last_updated_time: { $gte: 1548930176 } }' \
  --out exportdir/myRecords.json

The above command will extract all records from the collection having last_updated_time greater than or equal to the specified value, 1548930176. You should keep track of the last pulled last_updated_time separately.
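One simple way to do that bookkeeping is to store the last extracted timestamp in a small state file and feed it into the next export. Below is a minimal sketch, assuming a hypothetical state file last_pull.txt alongside the same contacts collection:

# Read the last pulled timestamp (default to 0 on the first run)
LAST_TS=$(cat last_pull.txt 2>/dev/null || echo 0)

# Export only documents updated since the last pull
mongoexport -d users -c contacts \
  -q "{ last_updated_time: { \$gte: $LAST_TS } }" \
  --out exportdir/myRecords.json

# Record the current Unix time as the new high-water mark
date +%s > last_pull.txt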

2. Transforming Data

Apart from transformations to accommodate business logic, there are some basic things to keep in mind while preparing data for BigQuery.

  • BigQuery expects UTF-8 encoded CSV data. If the data is encoded in ISO-8859-1 (Latin-1), explicitly specify the encoding while loading to BigQuery, or convert the file first (see the iconv sketch at the end of this section).
  • BigQuery doesn't enforce primary key and unique key constraints. The ETL process has to take care of that.
  • Column types are somewhat different between MongoDB and BigQuery. Most types have either an equivalent or a convertible type. Refer to the following list of common data type mappings.
MongoDB Data Type    BigQuery Data Type
DOUBLE               FLOAT64
STRING               STRING
BINARY DATA          BYTES
OBJECTID             STRING
BOOLEAN              BOOL
DATE                 DATE
NULL                 NULL
32-BIT INTEGER       INT64
TIMESTAMP            TIMESTAMP
64-BIT INTEGER       INT64
DECIMAL128           NUMERIC

Check out the BigQuery documentation to read more about BigQuery data types.

  • DATE values must be dash (-) separated and in the form YYYY-MM-DD (year-month-day). Fortunately, the mongoexport date format matches this, so there is usually no need for modification.
  • Make sure text columns are quoted if they can potentially contain delimiter characters.
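If an export does turn out to be Latin-1 encoded and you would rather convert it than declare the encoding at load time, iconv is one option. A minimal sketch, reusing the file path from the earlier mongoexport example:

iconv -f ISO-8859-1 -t UTF-8 /opt/exports/csv/cnts.csv > /opt/exports/csv/cnts_utf8.csv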

3. Upload to Google Cloud Storage (GCS)

Once data is extracted from MongoDB, the next step is to upload data to GCS. There are multiple ways to do this.

gsutil is the standard tool from GCP to handle objects and buckets in GCS. It has options to upload a file from your local machine to GCS.

To copy a file to GCS:

gsutil cp table_data.csv  gs://my-bucket/path/to/folder/

To copy an entire folder:

gsutil cp -r dir gs://my-bucket/path/to/parent/

If the files to be copied are already in S3, the same command can be used:

gsutil cp -R s3://bucketname/source/path gs://bucketname/destination/path

Storage Transfer Service from Google Cloud is a great alternative for bringing files into GCS from S3 or other online data sources, provided the destination (sink) is a Cloud Storage bucket. The service also supports a GCS bucket as a source.

This service is handy when it comes to data movement to GCS with the following options:

  • Both first time and recurring loads can be scheduled.
  • After successfully transferring data to GCS, the source data can be deleted.
  • Periodic synchronization between source and target, with the objects to transfer optionally narrowed down using filters.
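If you prefer to stay on the command line for recurring loads instead, gsutil rsync can keep an export directory in sync with a GCS prefix. A sketch, where the local directory and bucket path are placeholders:

# Mirror the local export directory to GCS; -r recurses into subdirectories,
# -d deletes destination objects that no longer exist in the source
gsutil rsync -r -d exportdir/ gs://my-bucket/mongodb-exports/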

Uploading From The Web Console

To upload data from the local machine, the GCP web console has inbuilt options. Follow the steps given below to make use of them:

  1. Log in to the GCP console, click on Storage, and go to Browser in the left bar.


  2. Select the GCS bucket, with proper access, where you want to upload the file that is to be loaded to BigQuery.


       Here, test-data-hevo is the bucket name used in this example. Click on the bucket name.

  3. You are now on the bucket details page. Click on Upload files, then select the file from your computer.
  4. The data upload starts and a progress bar is shown; wait till it completes. After completion, the file will be listed in the bucket.

4. Upload to the BigQuery Table from GCS

You can use the web console UI or the command-line tool called bq to load data into a BigQuery table. First, let's look at how to do it in the web console, step by step:

  1. Go to the BigQuery console from the left side panel.
  2. Create a Dataset if one is not present already.
  3. Now, click on the created dataset on the left side. The Create table option will then appear below the Query editor.
  4. The Create table button will open a wizard with options to specify the input source and other specifications for the table.

In the Create table wizard, there are options to specify the schema or auto-detect it. Many other options are also available.

As mentioned above, the command-line tool bq is a handy way to interact with BigQuery and to load data into a table from GCS. The bq load command can be used for this, with the source_format set to CSV.

The syntax of the bq load command is as follows:

bq --location=[LOCATION] load --source_format=[FORMAT_NAME] \
[DATASET_NAME].[TABLE_NAME] [SOURCE_PATH] [INLINE_SCHEMA]

[LOCATION] is the data location.
[FORMAT_NAME] is the format of the source data; here it is CSV.
[DATASET_NAME] is an existing dataset in BigQuery.
[TABLE_NAME] is the name of the table into which data is to be loaded.
[SOURCE_PATH] is the Cloud Storage URI of the data files.
[INLINE_SCHEMA] is a valid schema to describe the data. The schema can be a local JSON file or can be specified directly.
Note: The --autodetect flag can also be used instead of supplying a schema definition.
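For reference, a schema file is just a JSON array of column definitions. A minimal sketch of what schema_file.json could look like for the contacts data exported earlier (field names carried over from the mongoexport example):

cat > schema_file.json <<'EOF'
[
  {"name": "contact_name", "type": "STRING", "mode": "NULLABLE"},
  {"name": "contact_address", "type": "STRING", "mode": "NULLABLE"}
]
EOF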

There are a bunch of options specific to CSV data loads:

CSV option                       bq command flag            Description
Field delimiter                  -F or --field_delimiter    Field delimiter: comma, tab, pipe, other
Header rows                      --skip_leading_rows        Header rows to skip
Number of bad records allowed    --max_bad_records          Number of errors allowed
Newline characters               --allow_quoted_newlines    Allow quoted newlines
Custom null values               --null_marker              String that represents a null value in a CSV file
Trailing optional columns        --allow_jagged_rows        Allow jagged rows
Unknown values                   --ignore_unknown_values    Ignore unknown values
Quote                            --quote                    Value used to quote data sections in a CSV file
Encoding                         -E or --encoding           Character encoding of the data
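For example, a load that skips a header row, treats the literal string NULL as a null value, and allows quoted newlines could be sketched as follows (dataset, table, and bucket names are placeholders):

bq --location=US load --source_format=CSV \
  --skip_leading_rows=1 --null_marker="NULL" --allow_quoted_newlines \
  your_dataset_name.your_table_name gs://bucket_name/data_file.csv ./schema_file.json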

To see the full list of options, visit the BigQuery documentation on loading CSV data from Cloud Storage.

The example commands below for loading data using the bq command-line tool will help you familiarise yourself with the different options:

Specify data schema using a schema JSON file:

bq --location=US load --source_format=CSV \
  your_dataset_name.your_table_name gs://bucket_name/data_file.csv ./schema_file.json

Auto-detected schema from the file:

bq --location=US load --autodetect --source_format=CSV \
  your_dataset_name.your_table_name gs://bucket_name/path/to/file/data_file.csv

If data is written to an existing table, BigQuery provides three options: write if the target is empty, append to the target table, or overwrite the target table. It is also possible to add new fields to the table while uploading data. Let us see each with an example.

To overwrite the target table:

bq --location=US load --autodetect --replace --source_format=CSV \
  your_dataset_name.your_table_name gs://bucket_name/path/to/file/file_name.csv

To append to the target table:

bq --location=US load --noreplace --source_format=CSV \
  your_dataset_name.your_table_name gs://bucket_name/path/to/file/file_name.csv ./schema_file.json

If you would like to add a new field to the target table, a new schema JSON file with the extra field can be given as an option:

bq --location=asia-northeast1 load --noreplace --schema_update_option=ALLOW_FIELD_ADDITION --source_format=CSV \
  your_dataset_name.your_table_name gs://bucket_name/path/to/file/file_name.csv ./schema_file.json

Update target table in BigQuery

To upsert new data into the BigQuery table, first load the data into a staging table as a full load (please refer to the full data load section above for details). Let's call it interm_delta_table. Either of the two approaches below can then be followed to load data into the final table:

  1. Update the rows in the final table, then insert the new rows from the delta table which are missing from the final table.

UPDATE data_set_name.final_target_table_name t
SET value = s.value
FROM data_set_name.interm_delta_table s
WHERE t.id = s.id;

INSERT data_set_name.final_target_table_name (id, value)
SELECT id, value
FROM data_set_name.interm_delta_table
WHERE id NOT IN (SELECT id FROM data_set_name.final_target_table_name);

2. Delete the rows from the final table which are present in the delta table, and then insert all records from the delta table into the final table.

DELETE data_set_name.final_target_table_name f
WHERE f.id IN (SELECT id FROM data_set_name.interm_delta_table);

INSERT data_set_name.final_target_table_name (id, value)
SELECT id, value
FROM data_set_name.interm_delta_table;
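Alternatively, BigQuery's MERGE statement combines the update and insert into a single atomic operation. A sketch using the same tables and columns as above:

MERGE data_set_name.final_target_table_name t
USING data_set_name.interm_delta_table s
ON t.id = s.id
WHEN MATCHED THEN
  UPDATE SET value = s.value
WHEN NOT MATCHED THEN
  INSERT (id, value) VALUES (s.id, s.value);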

Conclusion

Most of the time, moving data from a NoSQL system like MongoDB to a proper data warehouse is essential for meaningful data analysis. Converting the schema to a normalized form and changing the data format without any loss of information is tricky. To keep track of recurring incremental data loads, separate systems might be required for storing ETL job-related metadata. You can refer to our blog to learn about BigQuery best practices for a high-performance data pipeline.

Easier Way to move data from MongoDB to BigQuery

There is a quicker option that delivers the same output and scales instantly. Hevo, a Cloud Data Integration Platform, can help you consolidate data from any data source into BigQuery in a matter of minutes. All of this can be achieved through a simple point-and-click interface without writing any ETL scripts. Here are the steps to replicate MongoDB to BigQuery using Hevo:

  • Connect to your MongoDB database.
  • Select the replication mode: (a) Full Dump and Load (b) Incremental load for append-only data (c) Incremental load for mutable data.
  • Configure the BigQuery destination where it should be copied.

Hevo offers a 7 Day Free Trial for you to explore a hassle-free data migration from MongoDB to BigQuery. 
