Building GCP Data Pipeline Made Easy

Sharon Rithika • Last Modified: March 22nd, 2023


Data Analytics is a fast-developing field driven forward by cloud-based advances. Because organizations can now crunch data far faster, they also have access to far more of it. Google Cloud provides a comprehensive collection of services for processing data quickly and efficiently.

Once you’ve structured your data into a system (or several systems, as the case may be), you’ll need a mechanism to transfer data between them. This can involve a number of stages, including extracting data, moving it to the cloud, reformatting it, and merging it with data from other sources. A Data Pipeline is made up of all of these processes.

This article covers topics related to Google Cloud Platform and steps to build GCP Data Pipeline.


What is Google Cloud Platform?

GCP, like Amazon Web Services (AWS) and Microsoft Azure, is a public cloud provider. Customers can use computing resources located in Google’s data centers across the world for free or on a pay-per-use basis. GCP made its debut in April 2008 with a preview release of App Engine, a developer tool that allowed clients to run their web applications on Google infrastructure.

Google Cloud Platform is now one of the world’s leading public cloud providers. Nintendo, eBay, UPS, The Home Depot, Etsy, PayPal, 20th Century Fox, and Twitter are among its customers. GCP provides a range of services, including compute, cost management, data management, web and video delivery, and AI and Machine Learning tools.

Key Features of GCP

Here are a few features that make GCP stand out from the rest:

  • Security: The Google Security methodology is based on more than 15 years of expertise in keeping Google users safe while using their services. Google Cloud Platform enables your programs and data to run on the same secure network that Google has designed for itself.
  • Pricing: GCP, like most cloud providers, offers a pay-as-you-go model, meaning your bill depends on how long your Compute Engine instances run. Google goes a step further and bills per second, with a one-minute minimum charge: an instance that runs for 45 seconds is billed for a full minute, while one that runs for 90 seconds is billed for exactly 90 seconds. This helps you save even more money when your company isn’t using Compute Engine, especially if you’re running short-term workloads or a dynamic web application.
  • Big Data: Google also stands apart thanks to its BigQuery-based Data Analytics. You can process data in the cloud with its Big Data services to get answers to your most complicated questions. In addition, you can construct schemas, load data, run queries, and export data.
  • Services provided by GCP: The breadth of GCP’s services and its distributed application model are major reasons to choose it as your public cloud provider, whether you want to build new cloud applications or replace parts of your existing data center applications with cloud-based apps. Transitioning to the cloud and a services model is a process, not a destination. GCP gives you the tools to build apps and combine assets (often called app modernization) with an emphasis on services and applications, and to stage your workloads more efficiently by building a hybrid model that leverages Google’s experience as a technology pioneer.

What is a Data Pipeline?

A Data Pipeline is a set of stages for processing data. Data is ingested at the start of the pipeline if it has not yet been loaded into the data platform. Then comes a sequence of steps, each of which produces an output that becomes the input for the following step, and this continues until the pipeline is finished. Independent steps may run in parallel in some cases.

A source, one or more processing steps, and a destination are the three main components of a Data Pipeline. The destination of a Data Pipeline is often referred to as a Sink. For example, Data Pipelines allow data to move from an application to a Data Warehouse, from a Data Lake to an analytics database, or between operational systems such as payment processing. Data Pipelines can also have the same source and sink, in which case the pipeline exists solely to modify the data. Whenever data is processed between points A and B (or points B, C, and D), there is a Data Pipeline between those points.
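As a mental model, a pipeline is just a chain of steps whose output feeds the next step. Below is a minimal sketch in Python; the source, processing step, and sink here are all illustrative placeholders, not part of any GCP service.

# Minimal illustration of the source -> processing steps -> sink idea.
# All three components are illustrative placeholders.

def source():
    """Ingest: yield raw records into the pipeline."""
    yield from [{"item": "widget", "qty": 2}, {"item": "gadget", "qty": 5}]

def transform(records):
    """Processing step: the output of one step is the input of the next."""
    for record in records:
        yield {**record, "qty_doubled": record["qty"] * 2}

def sink(records):
    """Destination (sink): print instead of writing to a warehouse."""
    for record in records:
        print(record)

# Wire the stages together: source -> transform -> sink.
sink(transform(source()))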

Need for Data Pipeline 

Data Pipelines are used for a variety of reasons. If you’re in retail and selling a product, you can use a Data Pipeline to integrate two databases: one store’s purchase information and another’s inventory information. Joining the two lets you see when your stock is running low, as the sketch below illustrates. Another example is an energy corporation whose market data needs to be processed; a Data Pipeline can combine price and usage data to streamline results.
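As a rough sketch of that retail example, assuming two hypothetical tables, store_purchases and store_inventory, that the pipeline has loaded into a warehouse, a query like the following could flag low stock (the table names, columns, and threshold are illustrative):

-- Hypothetical tables: store_purchases(product_id, quantity_sold)
--                      store_inventory(product_id, quantity_on_hand)
SELECT
  i.product_id,
  i.quantity_on_hand - COALESCE(SUM(p.quantity_sold), 0) AS projected_stock
FROM store_inventory AS i
LEFT JOIN store_purchases AS p
  ON p.product_id = i.product_id
GROUP BY i.product_id, i.quantity_on_hand
HAVING i.quantity_on_hand - COALESCE(SUM(p.quantity_sold), 0) < 10;  -- illustrative low-stock threshold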

Explore These Methods to Build GCP Data Pipeline 

BigQuery is a Data Warehouse known for ingesting data almost instantaneously and supporting near real-time analysis, and the broader Google Cloud Platform provides the surrounding services needed to move data into it reliably. A well-built GCP Data Pipeline based on these services can solve some of the biggest data problems for businesses. This article describes two methods to achieve this:

Method 1: Building a GCP Data Pipeline by Eliminating the Need for Code Using Hevo

Hevo Data, a fully-managed Data Pipeline solution, can help you automate, simplify & enrich your Data Pipeline process in a few clicks. With Hevo’s out-of-the-box connectors and blazing-fast Data Pipelines, you can extract & aggregate data from 150+ sources (including 50+ free sources) straight into your Data Warehouse, Database, or any other destination. To further streamline and prepare your data for analysis, you can process and enrich raw granular data using Hevo’s robust & built-in Transformation Layer without writing a single line of code!


Method 2: Building a GCP Data Pipeline Manually 

This method is time-consuming and somewhat tedious to implement, as users have to write custom code. It is suitable for users with a technical background.

Both methods are explained below.

 Methods to Build GCP Data Pipeline

There are two methods to build a GCP Data Pipeline:

Method 1: Using Hevo


Hevo Data is a bi-directional Data Pipeline platform designed for modern ETL, ELT, and Reverse ETL needs. It moves data quickly and reliably from hundreds of independent sources to centralized storage and back out to the applications that need it.

Hevo Data simplifies your ETL process by providing pre-built connectors to 150+ data sources, removing the need for your team to develop and maintain them. As a result, you can concentrate on your product instead of designing and managing connections.

Method 2: Building a GCP Data Pipeline Manually

Google Cloud Platform is a collection of cloud computing services that combines compute, data storage, data analytics, and machine learning capabilities to help businesses establish Data Pipelines, secure data workloads, and perform analytics. In this section, you’ll walk through a simple demonstration of how to establish a GCP Data Pipeline. For this use case, pretend you work for a hypothetical company called LRC, Inc. that sells various items.

Requirements for GCP Data Pipeline 

Suppose your sales department will provide a CSV file with recent sales information. The file will come from the sales management system, and the data will be used to track sales and feed aggregated data to an upstream system. Each record will include a customer name, a product ID, an item count, and an item cost. The data should accumulate over time, with new data appended to it.

As part of the procedure, a new file should be created that contains all order data aggregated by customer and product. The new file will be saved in a shared location where any team can access it at any time. When the file is delivered, an automated email should be sent to a specific address (otherteam@lrc.com), which will open a ticket alerting the other team that a file is ready.

Design of GCP Data Pipeline 

Designing your workflow is quite straightforward. There are several stages:

  • Watch for an incoming file.
  • Load the data from the file into a database.
  • Create an aggregation from the data.
  • Write a new file.
  • Send an email.

Because your hypothetical company uses GCP, you’ll employ GCP services for this GCP Data Pipeline. Even limiting yourself to GCP, there are plenty of ways to fulfill these needs. As part of your technical requirements, you want to keep things as serverless as feasible and write as little code as possible. The files will be stored on Google Cloud Storage (GCS), your database will be Google BigQuery (Google’s distributed SQL Data Warehouse), and orchestration will be handled by managed Airflow on Google Cloud Composer.

You’ll also use a Google Cloud Function (GCF) to watch for incoming files. You could also use GCF to send the email at the end, but you’ll do that in your Composer orchestration to keep things as simple as possible.

Implementation of GCP Data Pipeline 

Most commands and services in GCP have a clean, polished user interface that follows Google’s simple design. However, for most GCP Data Pipeline development jobs, you should use the command line so that you end up with a repeatable, automated deployment procedure. The user interface is ideal for exploring and analyzing data.

The rule of thumb is to script everything you’ll need to automate. Your GCP Data Pipeline would be built in a development environment first, then tested, and finally deployed to a production environment. As a result, you’ll largely use the command-line interface (CLI), as the deployment sketch below illustrates.
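To make the scripted approach concrete, a deployment script for this pipeline might look like the sketch below. The Composer environment name, function name, runtime, and SQL file name are illustrative placeholders; the commands themselves are the standard CLI equivalents of the steps carried out in the rest of this article.

#!/bin/bash
# Illustrative deployment sketch; names, region, and runtime are placeholders.

# Create the incoming and outgoing buckets
gsutil mb -l us-east1 gs://lrc-dev-incoming-sales-data/
gsutil mb -l us-east1 gs://lrc-dev-outgoing-sales-aggregate/

# Create the sales table in BigQuery (DDL kept in a separate, hypothetical SQL file)
bq query --nouse_legacy_sql "$(cat create_current_sales.sql)"

# Upload the DAG to the Cloud Composer environment
gcloud composer environments storage dags import \
    --environment=lrc-dev-composer --location=us-east1 \
    --source=trigger_response_dag.py

# Deploy the Cloud Function that fires when a file lands in the incoming bucket
gcloud functions deploy lrc-dev-trigger-dag \
    --runtime=python39 --entry-point=gcs_trigger \
    --trigger-resource=lrc-dev-incoming-sales-data \
    --trigger-event=google.storage.object.finalize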

GCP Data Pipeline: Google Cloud Storage (GCS)

In this GCP Data Pipeline, the initial step is to create a few buckets. You’ll use LRC, Inc.’s bucket naming convention: the name of the organization, followed by the environment, and then a descriptive name.

GCS bucket names are globally unique, so you’ll need to pick your own bucket names and remember them for later.

You can call your incoming bucket lrc-dev-incoming-sales-data. Later on, this will be referred to as “the incoming bucket.”

gsutil mb -l us-east1 gs://lrc-dev-incoming-sales-data/

You can call your outgoing bucket lrc-dev-outgoing-sales-aggregate. This will be referred to as “the outgoing bucket” later.

gsutil mb -l us-east1 gs://lrc-dev-outgoing-sales-aggregate/

You can list them to be sure they were created:

gsutil ls gs://lrc-dev-outgoing-sales-aggregate/
gsutil ls gs://lrc-dev-incoming-sales-data/

GCP Data Pipeline: BigQuery using the bq CLI Command

The second step in the GCP Data Pipeline is constructing a table in BigQuery to house your data, and you’ll do so with the bq CLI command. Google BigQuery lets you create a table in a variety of ways: if you’re comfortable with JSON, you can specify the columns and data types with a JSON schema document (a sketch of that approach follows the DDL command below); if you come from a database background, you’ll probably want to construct the table using SQL DDL. DDL is used here.

You could potentially put off creating the table until the Composer phase that loads the data: if the table does not exist, the load task will create it. However, you’d want table creation to happen as part of the deployment, so you’ll use bq for this table. To demonstrate how the other approach works, you’ll let Composer create the final aggregate table. The bq command is as follows:

bq query --nouse_legacy_sql "
CREATE TABLE sales_data.current_sales (
     customer_name string,
     product_id string,
     item_count int64,
     item_cost float64
)"
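
For reference, here is a rough sketch of the JSON route mentioned above: the same four columns go into a schema file (the name schema.json is illustrative), which is handed to bq mk instead of running DDL.

schema.json:

[
  {"name": "customer_name", "type": "STRING", "mode": "REQUIRED"},
  {"name": "product_id", "type": "STRING", "mode": "REQUIRED"},
  {"name": "item_count", "type": "INTEGER", "mode": "REQUIRED"},
  {"name": "item_cost", "type": "FLOAT", "mode": "REQUIRED"}
]

bq mk --table sales_data.current_sales ./schema.json
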
GCP Data Pipeline: Using GCF

The third step in the GCP Data Pipeline is the GCF portion: a Python function that watches for a file arrival. For this, you’ll follow a Google example. Go to the Cloud Composer documentation’s page on triggering DAGs with Cloud Functions. If you follow the documentation to the letter, you’ll have a Cloud Function that calls a DAG whenever a file is finalized in a GCS bucket. Implement it exactly as provided (with one minor change described below) and you’ll have the first stage of your simple GCP Data Pipeline.

The only difference is that for the trigger bucket, you’ll need to use the incoming bucket you created earlier; because bucket names are globally unique, you cannot reuse the name used here or the one in the documentation. Once you’ve completed this step, you’ll have a DAG file called trigger_response_dag.py with the DAG ID composer_sample_trigger_response_dag. You’ll use that DAG in the following steps.
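To make the shape of that function concrete, here is a minimal sketch of a background Cloud Function fired by a GCS object-finalize event. The entry-point name gcs_trigger and the trigger_dag_run() helper are illustrative placeholders; Google’s sample implements the helper as an IAP-authenticated call to the Airflow API, and only the bucket and name fields come from the real GCS event payload.

# Minimal sketch of the GCS-triggered Cloud Function (Python 3 background function).
# The helper trigger_dag_run() is a hypothetical stand-in for the IAP-authenticated
# Airflow API call shown in Google's Cloud Composer sample.

DAG_ID = "composer_sample_trigger_response_dag"

def trigger_dag_run(dag_id, conf):
    """Hypothetical stand-in; replace with the helper from the Cloud Composer documentation."""
    raise NotImplementedError("Use the helper from Google's Cloud Composer sample")

def gcs_trigger(data, context):
    """Entry point: fires on google.storage.object.finalize events for the incoming bucket."""
    # The event payload carries the bucket and object name of the uploaded file;
    # these become the dag_run.conf values the DAG later reads via Jinja templates.
    conf = {"bucket": data["bucket"], "name": data["name"]}
    trigger_dag_run(DAG_ID, conf)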

GCP Data Pipeline: Create DAG for Composer

The final phase of the GCP Data Pipeline is creating the Composer DAG. You still have tasks to complete: loading the CSV file from GCS into BigQuery, creating an aggregation, exporting the aggregated data from BigQuery to GCS, and sending an email. Start from the DAG created in the GCF step and edit it: remove its one task (print_gcs_info) and add your own steps in its place. A few imports need to be added to the existing ones; they give you access to the required operators.

from datetime import date
from airflow.contrib.operators import gcs_to_bq
from airflow.contrib.operators.bigquery_to_gcs import BigQueryToCloudStorageOperator
from airflow.operators.email_operator import EmailOperator
from airflow.contrib.operators.bigquery_operator import BigQueryOperator

Add these variables below the imports and just above the default_args declaration:

today = date.today().strftime("%Y%m%d")
outgoing_bucket = "lrc-dev-outgoing-sales-aggregate"
sales_table = "sales_data.current_sales"
aggregate_table = "sales_data.total_sales"

These variables will be used in the following steps of setting up the GCP Data Pipeline; they should be self-explanatory, and you’ll soon see how they’re put to use. Inside the with airflow.DAG() block (the DAG’s body), replace the bash task with the following steps. First, read the file from GCS into BigQuery using the GoogleCloudStorageToBigQueryOperator:

# Load Data from GCS to BQ
GCStoBQ = gcs_to_bq.GoogleCloudStorageToBigQueryOperator(
     task_id='GCStoBQ',
     bucket="{{ dag_run.conf['bucket'] }}",
     source_objects=["{{ dag_run.conf['name'] }}"],
     destination_project_dataset_table=sales_table,
     schema_fields=[{"name": "customer_name", "type": "STRING", "mode": "REQUIRED"},
                  {"name": "product_id", "type": "STRING", "mode": "REQUIRED"},
                  {"name": "item_count", "type": "INTEGER", "mode": "REQUIRED"},
                  {"name": "item_cost", "type": "FLOAT", "mode": "REQUIRED"}
                  ],
     source_format='CSV',
     # GZIP or NONE
     compression='NONE',
     create_disposition='CREATE_IF_NEEDED',
     skip_leading_rows=1,
     # WRITE_DISPOSITION_UNSPECIFIED, WRITE_EMPTY, WRITE_TRUNCATE, WRITE_APPEND
     write_disposition='WRITE_APPEND',
     field_delimiter=',',
     max_bad_records=0,
     quote_character=None,
     ignore_unknown_values=False,
     allow_quoted_newlines=False,
     allow_jagged_rows=False,
     encoding='UTF-8',
     max_id_key=None,
     autodetect=False
     )

Your table is defined by the schema_fields argument, and the CSV file should follow that format. Notice the bucket="{{ dag_run.conf['bucket'] }}" and source_objects=["{{ dag_run.conf['name'] }}"] parameters: the GCF function passes the bucket and the name of the file uploaded to the incoming bucket to the DAG as the dag_run.conf argument. These two operator parameters are Jinja-templated, and the template syntax shown is what extracts the bucket and file names. Because you know what the incoming bucket is, you could hard-code the bucket name.

However, because you may call this DAG from other buckets in the future, and you will most likely have distinct bucket names for dev, test, and prod, it is better practice to take the bucket name from the function. The remaining variables listed above could be made into Airflow Variables, which is recommended (a short sketch appears after the aggregation step below) but outside the scope of this article. Once the load task completes, the data will be in BigQuery. The next task, a BigQueryOperator, aggregates the data into a new table for export; if that table already exists, it is truncated.

# Create aggregate table
Transform = BigQueryOperator(
     task_id='Transform',
     sql=("SELECT customer_name, product_id, sum(item_count) prd_total_count,"
             " sum(item_cost*item_count) prd_total_cost"
             f" FROM {sales_table}"
             " GROUP BY customer_name, product_id;"),
     destination_dataset_table=aggregate_table,
     use_legacy_sql=False,
     write_disposition='WRITE_TRUNCATE',
     create_disposition='CREATE_IF_NEEDED',
     dag=dag
)

To produce the aggregates, the query sums the item count and sums item_count * item_cost, grouped by customer name and product ID.
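As a side note, here is the Airflow Variables sketch promised earlier: the hard-coded names could be pulled from Variables managed in the Airflow UI or CLI instead of living in the DAG file. The Variable keys are illustrative and would need to be created first.

from airflow.models import Variable

# Values live in the Airflow metadata database (Admin -> Variables) instead of in the DAG file;
# default_var is returned when a Variable has not been set yet.
outgoing_bucket = Variable.get("outgoing_bucket", default_var="lrc-dev-outgoing-sales-aggregate")
sales_table = Variable.get("sales_table", default_var="sales_data.current_sales")
aggregate_table = Variable.get("aggregate_table", default_var="sales_data.total_sales")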

The third task is to export your new table to GCS so that the other team can use it. The file will be named sales_data_agg with today’s date appended, using the today variable declared earlier.

BQtoGCS = BigQueryToCloudStorageOperator(
     task_id='BQtoGCS',
     source_project_dataset_table=aggregate_table,
     destination_cloud_storage_uris=[f"gs://{outgoing_bucket}/sales_data_agg_{today}*.csv"],
     export_format='CSV'
   )

The final step in the GCP Data Pipeline is to send an email notifying the other team that the file is available for pickup. The email goes to otherteam@lrc.com.

SendEmail = EmailOperator(
     task_id='SendEmail',
     to=["otherteam@lrc.com"],
     subject=f"Data Aggregate available for date {today}",
     html_content="<html><body><p><h2>Run completed successfully.</h2></p>"
             f"<p>File Located at: gs://{outgoing_bucket}/sales_data_agg_{today}*.csv</p>"
             "</body></html>",
     dag=dag,
)

Add the following execution line to the bottom of the DAG:

GCStoBQ >> Transform >> BQtoGCS >> SendEmail

You now have a fully functional GCP Data Pipeline. In the Airflow console, the finished DAG appears as the four tasks chained in sequence: GCStoBQ, Transform, BQtoGCS, and SendEmail.

An example of a CSV to upload to the incoming bucket is shown below.

customer_name,product_id,item_count,item_cost
ABC,PRD123,5,1.25
DEF,PRD456,25,0.07
GHI,PRD765,13,9.99

If you upload the file multiple times, you can watch the aggregate values grow over time, as illustrated below.
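For instance, if the sample file above is uploaded twice, each row is appended to sales_data.current_sales twice (WRITE_APPEND), and the recomputed aggregate table shows doubled totals. Querying it would return roughly the following (the exact bq output formatting will differ):

bq query --nouse_legacy_sql "SELECT * FROM sales_data.total_sales ORDER BY customer_name"

customer_name  product_id  prd_total_count  prd_total_cost
ABC            PRD123      10               12.50
DEF            PRD456      50               3.50
GHI            PRD765      26               259.74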

Conclusion

That’s all it takes to build a basic but fully functional GCP Data Pipeline. You ingested data, transformed it, extracted new data, and notified the user. Note that there is no data validation or error handling here, which you’d want to add in a production system. Logging, however, is nearly automatic on GCP.

However, as a developer, extracting complex data from diverse data sources like Databases, CRMs, Project Management Tools, Streaming Services, and Marketing Platforms into your Data Warehouse can seem quite challenging. If you are from a non-technical background or are new to the data warehouse and analytics game, Hevo Data can help!


Hevo Data automates your data transfer process, allowing you to focus on other aspects of your business like Analytics and Customer Management. The platform lets you transfer data from 150+ sources to Cloud-based Data Warehouses like Snowflake, Google BigQuery, and Amazon Redshift. It provides a hassle-free experience and makes your work life much easier.

Want to take Hevo for a spin? Sign Up for a 14-day free trial and experience the feature-rich Hevo suite firsthand.

You can also have a look at our unbeatable pricing that will help you choose the right plan for your business needs!
