BigQuery is a serverless Data Warehouse that can store petabytes of data. It provides a comprehensive SQL layer and high-performance querying. BigQuery comes with built-in Machine Learning support and is a good option for serving Machine Learning workloads. It competes with Amazon Redshift, Snowflake, Azure Synapse Analytics (formerly Azure SQL Data Warehouse), and others.

BigQuery also provides a set of complementary services that extend its value beyond being just a Data Warehouse. BigQuery ML lets customers build Machine Learning models using SQL constructs. The Connected Sheets feature allows the data in BigQuery to be analyzed using Google Sheets. This post explains how to use the Google BigQuery Storage API to read and write data.

Prerequisites

  • Google Cloud account with the BigQuery Storage API enabled and the ‘bigquery.tables.updateData’ permission.
  • Basic knowledge of Java.

What are Google BigQuery APIs?

The Google BigQuery APIs allow groups of users to create, manage, share, and query data programmatically. Whenever complex datasets are introduced into BigQuery, the system ingests your data, analyzes it, and returns the query results. The BigQuery APIs cover the whole process, from data ingestion to the delivery of query results.

Types of Google BigQuery APIs

BigQuery APIs are categorized into five types, with client libraries available in languages such as Python, Go, and Java. The five types, shown in the diagram below, are:

Types of Google BigQuery API
  1. Core API
  2. Data Transfer API
  3. Storage API
  4. Reservation API
  5. Connection API

In the sections that follow, we will discuss the Storage API in detail.

Understanding BigQuery Storage API

The BigQuery Storage API includes a Write API and a Read API.

The Write API supports both Streaming Workloads and batch-based loads, and it provides exactly-once semantics and transactions. In a nutshell, the BigQuery Write API offers three kinds of write operations: Committed mode, Pending mode, and Buffered mode. In Committed mode, records are available for reading as soon as they are written.

In Pending mode, records become visible only after the stream has been committed, and the entire stream can be committed only once. Buffered mode expands on Pending mode by letting writers flush rows up to a specific offset, giving them fine-grained control over exactly when specific rows become visible. Pending mode is generally used for bulk uploads of data to BigQuery.
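The hands-on section later in this post uses Committed mode; to make the Pending-mode lifecycle described here concrete, the following is a rough, hedged sketch (the class name and the placeholder project, dataset, and table identifiers are mine). The stream is created as PENDING, rows are appended, the stream is finalized, and a single batch-commit call makes everything visible at once.

import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsRequest;
import com.google.cloud.bigquery.storage.v1.BatchCommitWriteStreamsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;

public class PendingModeSketch {
  public static void main(String[] args) throws Exception {
    // Placeholder identifiers - replace with your own project, dataset, and table.
    TableName table = TableName.of("YOUR_PROJECT_ID", "YOUR_DATASET_NAME", "YOUR_TABLE_NAME");

    try (BigQueryWriteClient client = BigQueryWriteClient.create()) {
      // A PENDING stream buffers rows invisibly until the stream is committed.
      WriteStream stream =
          client.createWriteStream(
              CreateWriteStreamRequest.newBuilder()
                  .setParent(table.toString())
                  .setWriteStream(
                      WriteStream.newBuilder().setType(WriteStream.Type.PENDING).build())
                  .build());

      // ... append rows to stream.getName() with a JsonStreamWriter here ...

      // Finalize the stream so no further appends are accepted.
      client.finalizeWriteStream(
          FinalizeWriteStreamRequest.newBuilder().setName(stream.getName()).build());

      // Commit atomically; only after this call do the buffered rows become readable.
      BatchCommitWriteStreamsResponse commit =
          client.batchCommitWriteStreams(
              BatchCommitWriteStreamsRequest.newBuilder()
                  .setParent(table.toString())
                  .addWriteStreams(stream.getName())
                  .build());
      System.out.println("Commit time: " + commit.getCommitTime());
    }
  }
}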

The Read API allows users to retrieve rows as structured responses with pagination. BigQuery allows creating multiple streams for reading from the same table, which means it is possible to read disjoint sets of data in the same session. While creating the session, the user can specify the required columns of the table to make reads more efficient. It is also possible to filter rows on the server side based on column values. All storage sessions work on a snapshot model. In other words, only data that was present in BigQuery at the time of session creation is available for reading.
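To make the snapshot model and multi-stream behavior concrete, here is a minimal, hedged sketch (placeholder project and table names, and a class name of my own) that pins a read session to the table state as of ten minutes earlier and asks for several parallel streams:

import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.protobuf.Timestamp;

public class SnapshotReadSketch {
  public static void main(String[] args) throws Exception {
    String parent = "projects/YOUR_PROJECT_ID";
    String table = "projects/YOUR_PROJECT_ID/datasets/YOUR_DATASET/tables/YOUR_TABLE";

    try (BigQueryReadClient client = BigQueryReadClient.create()) {
      // Pin the session to the table state as of ten minutes ago.
      long snapshotSeconds = System.currentTimeMillis() / 1000 - 600;
      TableModifiers modifiers =
          TableModifiers.newBuilder()
              .setSnapshotTime(Timestamp.newBuilder().setSeconds(snapshotSeconds).build())
              .build();

      ReadSession session =
          client.createReadSession(
              CreateReadSessionRequest.newBuilder()
                  .setParent(parent)
                  .setReadSession(
                      ReadSession.newBuilder()
                          .setTable(table)
                          .setDataFormat(DataFormat.ARROW)
                          .setTableModifiers(modifiers))
                  // Ask for up to four streams; disjoint row sets can then be read in parallel.
                  .setMaxStreamCount(4)
                  .build());
      System.out.println("Streams allocated: " + session.getStreamsCount());
    }
  }
}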

Simplify Google BigQuery ETL and Analysis with Hevo’s No-code Data Pipeline

A fully managed No-code Data Pipeline platform like Hevo Data helps you integrate and load data from 150+ different sources (including 40+ free sources) to a Data Warehouse such as Google BigQuery or a Destination of your choice in real-time in an effortless manner. Hevo, with its minimal learning curve, can be set up in just a few minutes, allowing users to load data without having to compromise performance. Its strong integration with umpteen sources allows users to bring in data of different kinds in a smooth fashion without having to code a single line.

Get Started with Hevo for Free

Check out some of the cool features of Hevo:

  • Completely Automated: The Hevo platform can be set up in just a few minutes and requires minimal maintenance.
  • Transformations: Hevo provides preload transformations through Python code. It also allows you to run transformation code for each event in the Data Pipelines you set up. You need to edit the event object’s properties received in the transform method as a parameter to carry out the transformation. Hevo also offers drag and drop transformations like Date and Control Functions, JSON, and Event Manipulation to name a few. These can be configured and tested before putting them to use.
  • Connectors: Hevo supports 150+ integrations to SaaS platforms, files, Databases, analytics, and BI tools. It supports various destinations including Google BigQuery, Amazon Redshift, and Snowflake Data Warehouses; Amazon S3 Data Lakes; and MySQL, SQL Server, TokuDB, DynamoDB, and PostgreSQL Databases to name a few.
  • Real-Time Data Transfer: Hevo provides real-time data migration, so you can have analysis-ready data always.
  • 100% Complete & Accurate Data Transfer: Hevo’s robust infrastructure ensures reliable data transfer with zero data loss.
  • Scalable Infrastructure: Hevo has in-built integrations for 150+ sources (including 40+ free sources) that can help you scale your data infrastructure as required.
  • 24/7 Live Support: The Hevo team is available round the clock to extend exceptional support to you through chat, email, and support calls.
  • Schema Management: Hevo takes away the tedious task of schema management & automatically detects the schema of incoming data and maps it to the destination schema.
  • Live Monitoring: Hevo allows you to monitor the data flow so you can check where your data is at a particular point in time.
Sign up here for a 14-Day Free Trial!

Why use BigQuery Storage API?

Using the BigQuery Storage API, structured data is sent over the network in binary serialized format. This provides additional parallelism between multiple consumers of the result set.
If your workload involves scanning large amounts of managed data, use the BigQuery Storage API, as it provides high read throughput for consumers.
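As a rough illustration of that parallelism (session creation is omitted, and the class and method names are mine), each stream returned by a read session can be drained on its own thread:

import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class ParallelReadSketch {

  // Drains every stream of an already-created read session, one thread per stream.
  static void readAllStreams(BigQueryReadClient client, ReadSession session)
      throws InterruptedException {
    ExecutorService pool = Executors.newFixedThreadPool(session.getStreamsCount());
    for (ReadStream readStream : session.getStreamsList()) {
      pool.submit(
          () -> {
            ReadRowsRequest request =
                ReadRowsRequest.newBuilder().setReadStream(readStream.getName()).build();
            ServerStream<ReadRowsResponse> rows = client.readRowsCallable().call(request);
            long total = 0;
            for (ReadRowsResponse response : rows) {
              total += response.getRowCount();
            }
            System.out.println(readStream.getName() + " delivered " + total + " rows");
          });
    }
    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.MINUTES);
  }
}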

How to use the BigQuery Storage API?

You can refer to the steps discussed below to use the BigQuery Read and Write APIs:

Step 1: Setting up the Account and Client Library

BigQuery provides Client Libraries in all the popular languages like Java, Node.js, Python, etc. For this post, we will use the Java client. This requires first creating a Service Account in the Google Cloud Console and obtaining a key for API usage. Let us learn how this can be done.

  • Head to the Google Cloud Service Accounts page and set up a Service Account to access BigQuery from external libraries. Click on New Service Account and provide a name for the account. Set the role to Owner or Editor; the other details will be populated automatically.
BigQuery Storage API - Creating Service Account Key
  • Click on Create to download a JSON file containing the application credentials. Keep this file handy for the next steps.
  • The next step is to set up the environment variables for the Java application script to use while accessing BigQuery. 
export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/my-key.json"

This environment variable should reflect the actual path to your application credentials file. Alternatively, the key file can be loaded in code and passed to the client explicitly, as sketched at the end of this step.

  • Now add the below snippet to your Maven pom.xml to ensure the BigQuery client library is available to your Java application.
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>24.3.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquerystorage</artifactId>
  </dependency>
</dependencies>
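If you would rather not rely on the GOOGLE_APPLICATION_CREDENTIALS environment variable mentioned above, the key file can also be loaded explicitly and handed to the client. The following is a minimal sketch under that assumption; the key path is the hypothetical one used earlier, and the class name is mine.

import com.google.api.gax.core.FixedCredentialsProvider;
import com.google.auth.oauth2.GoogleCredentials;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.BigQueryReadSettings;
import java.io.FileInputStream;

public class ExplicitCredentialsSketch {
  public static void main(String[] args) throws Exception {
    // Hypothetical path; point this at the JSON key you downloaded above.
    String keyPath = "/home/user/Downloads/my-key.json";

    GoogleCredentials credentials;
    try (FileInputStream keyStream = new FileInputStream(keyPath)) {
      credentials = GoogleCredentials.fromStream(keyStream);
    }

    // Pass the credentials to the read client instead of using the environment variable.
    BigQueryReadSettings settings =
        BigQueryReadSettings.newBuilder()
            .setCredentialsProvider(FixedCredentialsProvider.create(credentials))
            .build();

    try (BigQueryReadClient client = BigQueryReadClient.create(settings)) {
      System.out.println("Client authenticated without the environment variable.");
    }
  }
}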

Step 2: Reading from BigQuery using APIs

We will now implement a Java application to read data from a BigQuery public dataset (the USA names dataset), which is available by default to all BigQuery accounts.

  • For setting up the imports, use the following import statements in your Java application.
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
  • Let us now create the main function and set up the BigQuery client.
public class BigQueryReadTest {

  public static void main(String... args) throws Exception {
    // Sets your Google Cloud Platform project ID.
    String projectId = "YOUR_PROJECT_ID";

    BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);

    try (BigQueryReadClient client = BigQueryReadClient.create()) {
      String parent = String.format("projects/%s", projectId);

      // This example uses baby name data from the public datasets.
      String srcTable =
          String.format(
              "projects/%s/datasets/%s/tables/%s",
              "bigquery-public-data", "usa_names", "usa_1910_current");

      // We specify the columns to be projected by adding them to the selected fields,
      // and set a simple filter to restrict which rows are transmitted.
      TableReadOptions options =
          TableReadOptions.newBuilder()
              .addSelectedFields("name")
              .addSelectedFields("number")
              .addSelectedFields("state")
              .setRowRestriction("state = \"WA\"")
              .build();

      // Start specifying the read session we want created.
      ReadSession.Builder sessionBuilder =
          ReadSession.newBuilder()
              .setTable(srcTable)
              // This API can also deliver data serialized in Apache Avro format.
              // This example leverages Apache Arrow.
              .setDataFormat(DataFormat.ARROW)
              .setReadOptions(options);

      // Begin building the session creation request.
      CreateReadSessionRequest.Builder builder =
          CreateReadSessionRequest.newBuilder()
              .setParent(parent)
              .setReadSession(sessionBuilder)
              .setMaxStreamCount(1);

In the above section, we set up a BigQuery read client and specify read options that reflect the relevant columns and rows to be read. Ensure that you replace the projectId variable with your own project ID. We then start building a session and configure Arrow as the serialization format for reading. BigQuery also supports Avro for reading; Arrow and Avro are two serialization formats commonly used in columnar processing workloads.
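For completeness, here is a hedged sketch of what the Avro variant could look like: the session is created with .setDataFormat(DataFormat.AVRO) instead, and each response is then decoded with the Apache Avro library (org.apache.avro, an additional dependency not included in the Maven snippet above; the helper class below is mine).

import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import java.io.IOException;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericDatumReader;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryDecoder;
import org.apache.avro.io.DecoderFactory;

public class AvroRowPrinter {
  private final GenericDatumReader<GenericRecord> reader;

  public AvroRowPrinter(ReadSession session) {
    // The session carries the Avro schema as a JSON string.
    Schema schema = new Schema.Parser().parse(session.getAvroSchema().getSchema());
    this.reader = new GenericDatumReader<>(schema);
  }

  public void print(ReadRowsResponse response) throws IOException {
    byte[] rows = response.getAvroRows().getSerializedBinaryRows().toByteArray();
    BinaryDecoder decoder = DecoderFactory.get().binaryDecoder(rows, null);
    // Each response holds a block of concatenated Avro records.
    while (!decoder.isEnd()) {
      GenericRecord record = reader.read(null, decoder);
      System.out.println(record);
    }
  }
}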

  • The next part is to create the session, deserialize the Arrow schema returned with it, build a vector for each field in the schema, fetch the available streams, read from the first stream, and then deserialize each record batch that BigQuery returns.
      ReadSession session = client.createReadSession(builder.build());

      // Set up a simple reader using the Arrow schema returned with the session.
      ArrowSchema arrowSchema = session.getArrowSchema();
      Schema schema =
          MessageSerializer.deserializeSchema(
              new ReadChannel(
                  new ByteArrayReadableSeekableByteChannel(
                      arrowSchema.getSerializedSchema().toByteArray())));
      Preconditions.checkNotNull(schema);
      List<FieldVector> vectors = new ArrayList<>();
      for (Field field : schema.getFields()) {
        vectors.add(field.createVector(allocator));
      }
      VectorSchemaRoot root = new VectorSchemaRoot(vectors);
      VectorLoader loader = new VectorLoader(root);

      Preconditions.checkState(session.getStreamsCount() > 0);

      // Use the first stream to perform reading.
      String streamName = session.getStreams(0).getName();

      ReadRowsRequest rowsRequest =
          ReadRowsRequest.newBuilder().setReadStream(streamName).build();

      ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(rowsRequest);
      for (ReadRowsResponse response : stream) {
        Preconditions.checkState(response.hasArrowRecordBatch());
        org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
            MessageSerializer.deserializeRecordBatch(
                new ReadChannel(
                    new ByteArrayReadableSeekableByteChannel(
                        response.getArrowRecordBatch().getSerializedRecordBatch().toByteArray())),
                allocator);

        // Load the batch into the schema root, print it, and release the buffers.
        loader.load(deserializedBatch);
        deserializedBatch.close();
        System.out.println(root.contentToTSVString());
        root.clear();
      }
    }
  }
}

The columnar nature of BigQuery means there is a bit of complexity in dealing with the schema to make reads efficient. BufferAllocator, VectorLoader, and VectorSchemaRoot are Apache Arrow utility classes that handle the complexity of deserializing the Arrow schema and record batches.
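If you need typed values rather than a TSV dump, the loaded VectorSchemaRoot can be queried column by column. A small sketch follows, assuming the usual Arrow mapping of BigQuery STRING and INTEGER columns to VarCharVector and BigIntVector; the helper class is mine, and the column names match the selected fields above.

import org.apache.arrow.vector.BigIntVector;
import org.apache.arrow.vector.VarCharVector;
import org.apache.arrow.vector.VectorSchemaRoot;

public class ArrowRowAccess {
  // Prints name and count pairs from a batch that has been loaded into the root.
  static void printNames(VectorSchemaRoot root) {
    // Vectors are looked up by the column names selected in TableReadOptions.
    VarCharVector names = (VarCharVector) root.getVector("name");
    BigIntVector numbers = (BigIntVector) root.getVector("number");
    for (int row = 0; row < root.getRowCount(); row++) {
      String name = new String(names.get(row));
      long number = numbers.get(row);
      System.out.println(name + " -> " + number);
    }
  }
}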

That is all there is to reading data with the BigQuery Storage API. The above code will print the column values that are read.

Step 3: Writing to BigQuery using APIs

As discussed above, BigQuery provides three kinds of write modes. In this post, we will use Committed mode for simplicity. We will add rows to a table with a single STRING column called ‘column1’. This tutorial assumes you have already created such a table; if you are not familiar with this, head to this guide for creating a table.
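If you prefer to create that single-column table in code rather than through the console, the following hedged sketch does it with the google-cloud-bigquery client library (a separate artifact, also covered by the libraries-bom above); the dataset and table names are placeholders.

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Schema;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.TableInfo;

public class CreateSingleColumnTable {
  public static void main(String[] args) {
    BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();

    // Placeholder dataset and table names; the dataset must already exist.
    TableId tableId = TableId.of("YOUR_DATASET_NAME", "YOUR_TABLE_NAME");
    Schema schema = Schema.of(Field.of("column1", StandardSQLTypeName.STRING));

    bigquery.create(TableInfo.of(tableId, StandardTableDefinition.of(schema)));
    System.out.println("Created table with a single STRING column named column1");
  }
}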

  • We will begin with setting up the imports for our Java class.
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;
  • Now, we need to create a main function and build the logic for appending rows to a stream. Use the below code snippet to do it.
public class BigQueryWrite {

  public static void main(String[] args)
      throws IOException, IllegalArgumentException, DescriptorValidationException,
          InterruptedException {

    String projectId = "YOUR_PROJECT_ID";
    String dataset = "YOUR_DATASET_NAME";
    String table = "YOUR_TABLE_NAME";

    try (BigQueryWriteClient bqClient = BigQueryWriteClient.create()) {
      // Create a write stream in Committed mode: rows are readable as soon as they are appended.
      WriteStream stream =
          WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
      TableName tableName = TableName.of(projectId, dataset, table);
      CreateWriteStreamRequest createWriteStreamRequest =
          CreateWriteStreamRequest.newBuilder()
              .setParent(tableName.toString())
              .setWriteStream(stream)
              .build();
      WriteStream writeStream = bqClient.createWriteStream(createWriteStreamRequest);

      try (JsonStreamWriter writer =
          JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
              .build()) {
        for (int i = 0; i < 2; i++) {
          // Build a batch of ten JSON records.
          JSONArray jsonArr = new JSONArray();
          for (int j = 0; j < 10; j++) {
            JSONObject record = new JSONObject();
            record.put("column1", String.format("record %03d-%03d", i, j));
            jsonArr.put(record);
          }

          // Append the batch at an explicit offset and wait for the server's acknowledgement.
          ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, /*offset=*/ i * 10);
          AppendRowsResponse response = future.get();
        }
        // Finalize the stream after use.
        FinalizeWriteStreamRequest finalizeWriteStreamRequest =
            FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build();
        bqClient.finalizeWriteStream(finalizeWriteStreamRequest);
      }
      System.out.println("Appended records successfully.");
    } catch (ExecutionException e) {
      System.out.println("Failed to append records.\n" + e.toString());
    }
  }
}

In the above function, we initialize the BigQuery table details using the projectId, dataset, and table variables. We then build a BigQueryWriteClient object and use it to create a WriteStream. The next step is to create two batches of records to be inserted. We use two batches here to reflect the point that a WriteStream should not be created for a single batch insertion.

The idea is to reuse a stream as much as possible, since creating one is an expensive operation. When appending each JSON array, we also pass an offset parameter to avoid duplicate rows; the offset is set to the total number of rows written before that batch. After this, we use FinalizeWriteStreamRequest to complete the write.
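To make the offset bookkeeping explicit, here is a small hedged sketch (the helper class and method are mine) that appends a list of batches, passing the running row count as the offset for each append and advancing by the offset the server reports back:

import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import java.util.List;
import org.json.JSONArray;

public class OffsetBookkeepingSketch {
  // Appends each batch at an explicit offset so that a retried batch cannot be
  // applied twice; returns the offset at which the next append should start.
  static long appendWithOffsets(JsonStreamWriter writer, List<JSONArray> batches)
      throws Exception {
    long nextOffset = 0;
    for (JSONArray batch : batches) {
      ApiFuture<AppendRowsResponse> future = writer.append(batch, nextOffset);
      AppendRowsResponse response = future.get();
      // The server reports the offset it applied; advance by the size of the batch.
      long applied = response.getAppendResult().getOffset().getValue();
      nextOffset = applied + batch.length();
    }
    return nextOffset;
  }
}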

That concludes our effort in writing to BigQuery using the BigQuery Storage API. Let us now recap and understand the challenges involved.

  1. As evident from the Read and Write mechanisms above, this is not for the faint-hearted. Because of BigQuery’s columnar nature, the read and write code needs to deal with schemas and the complexity of serialization and deserialization.
  2. What we accomplished here is a simple read and write operation. In a production system, there will be many more complications, such as avoiding duplicates and joining multiple sources. Add these to the already complex serialization handling, and developers are in for a tough job.

Conclusion

This post attempts to simplify the usage of the BigQuery Storage API for the user. We have now learned to read rows from BigQuery and to execute write operations in Committed mode. Handling these operations required developers to work through schema complexities and write a fair amount of code. If you are looking for a no-code solution to move data to and from BigQuery, you should check out Hevo.

Hevo Data, a No-code Data Pipeline, provides you with a consistent and reliable solution to manage data transfer between a variety of sources and a wide variety of desired destinations with a few clicks.

Visit our Website to Explore Hevo

Hevo Data with its strong integration with 150+ data sources (including 40+ Free Sources) allows you to not only export data from your desired data sources & load it to the destination of your choice but also transform & enrich your data to make it analysis-ready. Hevo also allows integrating data from non-native sources using Hevo’s in-built Webhooks Connector. You can then focus on your key business needs and perform insightful analysis using BI tools. 

Want to give Hevo a try?

Sign Up for a 14-day free trial and experience the feature-rich Hevo suite first hand. You may also have a look at our pricing, which will assist you in selecting the best plan for your requirements.

Share your experience of learning about the BigQuery Storage API in the comment section below!

Talha
Software Developer, Hevo Data

Talha is a Software Developer with over eight years of experience in the field. He is currently driving advancements in data integration at Hevo Data, where he has been instrumental in shaping a cutting-edge data integration platform for the past four years. Prior to this, he spent 4 years at Flipkart, where he played a key role in projects related to their data integration capabilities. Talha loves to explain complex information related to data engineering to his peers through writing. He has written many blogs related to data integration, data management aspects, and key challenges data practitioners face.
