BigQuery is a serverless data warehouse that can store petabytes of data. It provides a comprehensive SQL layer and high-performance querying. BigQuery also comes with built-in machine learning support and is a good option for serving machine learning workloads.

BigQuery also provides a set of complementary services that extend its value beyond that of a plain data warehouse. BigQuery ML lets customers build and run machine learning models using SQL constructs. The Connected Sheets feature allows data in BigQuery to be analyzed using Google Sheets. This post is about how to use the Google BigQuery Storage API to read and write data.

What are Google BigQuery APIs?

The Google BigQuery API gives groups of users a programmatic way to create, manage, share, and query data. When datasets are loaded into BigQuery, the service stores the data, analyzes it, and returns the query results. The BigQuery APIs cover this whole process, from ingesting the data to returning the query results.

Types of Google BigQuery APIs

BigQuery APIs fall into five types, and each can be called through client libraries (Python, Go, and Java, among others). The types are as follows:
  1. Core API
  2. Data Transfer API
  3. Storage API
  4. Reservation API
  5. Connection API

The rest of this post discusses the Storage API in detail.


Understanding BigQuery Storage API

BigQuery Storage API includes Write and Read APIs. 

The Write API supports both streaming workloads and batch loads. It supports exactly-once semantics and transactions. In a nutshell, the BigQuery Write API provides three kinds of write operations: Committed mode, Pending mode, and Buffered mode. In Committed mode, the records are available for reading as soon as they are written.

In Pending mode, the records become available only after the transaction has been committed, and a pending stream allows only one chance to commit the entire transaction. Pending mode is generally used for bulk uploads of data to BigQuery. Buffered mode expands on Pending mode by using offsets: rows are held in a buffer until the writer flushes the stream up to a given offset, which lets the writer control exactly when specific rows become visible.
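The difference between the three modes is easiest to see as a question of when a written row becomes readable. The following is a minimal in-memory sketch of that idea only. It does not call BigQuery, and the class `WriteModeSketch` and its methods are hypothetical names invented for illustration. It assumes that a flush offset is inclusive.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of write-stream visibility rules; not the BigQuery client API. */
final class WriteModeSketch {
    enum Mode { COMMITTED, PENDING, BUFFERED }

    private final Mode mode;
    private final List<String> written = new ArrayList<>();
    private int visibleCount = 0;      // rows [0, visibleCount) are readable
    private boolean committed = false; // only meaningful for PENDING

    WriteModeSketch(Mode mode) {
        this.mode = mode;
    }

    void append(String row) {
        if (committed) {
            throw new IllegalStateException("stream already committed");
        }
        written.add(row);
        if (mode == Mode.COMMITTED) {
            visibleCount = written.size(); // readable as soon as written
        }
    }

    /** BUFFERED only: make rows up to and including the offset readable. */
    void flush(int offset) {
        if (mode != Mode.BUFFERED) {
            throw new IllegalStateException("flush applies to BUFFERED streams");
        }
        if (offset < 0 || offset >= written.size()) {
            throw new IndexOutOfBoundsException("offset " + offset);
        }
        visibleCount = Math.max(visibleCount, offset + 1);
    }

    /** PENDING only: a single commit publishes every row at once. */
    void commit() {
        if (mode != Mode.PENDING) {
            throw new IllegalStateException("commit applies to PENDING streams");
        }
        if (committed) {
            throw new IllegalStateException("a pending stream commits only once");
        }
        committed = true;
        visibleCount = written.size();
    }

    List<String> readable() {
        return new ArrayList<>(written.subList(0, visibleCount));
    }

    public static void main(String[] args) {
        WriteModeSketch buffered = new WriteModeSketch(Mode.BUFFERED);
        buffered.append("a");
        buffered.append("b");
        buffered.append("c");
        buffered.flush(1); // rows at offsets 0 and 1 become readable
        System.out.println(buffered.readable());
    }
}
```

In this model a committed stream never needs an extra call, a pending stream is all-or-nothing, and a buffered stream sits in between by exposing rows one flush at a time.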

The Read API allows users to retrieve rows as structured, serialized responses delivered over streams. BigQuery allows creating multiple streams for reading from the same table, so disjoint sets of rows can be read in parallel within the same session. While creating the session, the user can specify the required columns to make the reads more efficient. It is also possible to filter rows on the server side using a predicate on column values. All read sessions are based on a snapshot model. In other words, only data that was present in BigQuery at the time of session creation is available for reading.
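The ideas in this paragraph (a snapshot taken at session creation, column selection, a row filter, and disjoint streams) can be sketched with plain Java collections. This is a toy model rather than the Storage API itself. `ReadSessionSketch` is a hypothetical class, and the round-robin split across streams is an assumption made for illustration, since the real service decides how rows are distributed.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

/** Toy model of a read session; not the BigQuery client API. */
final class ReadSessionSketch {
    private final List<List<Map<String, Object>>> streams = new ArrayList<>();

    ReadSessionSketch(
            List<Map<String, Object>> table,
            List<String> selectedFields,
            Predicate<Map<String, Object>> rowRestriction,
            int streamCount) {
        if (streamCount < 1) {
            throw new IllegalArgumentException("need at least one stream");
        }
        for (int i = 0; i < streamCount; i++) {
            streams.add(new ArrayList<>());
        }
        int next = 0;
        // Copying rows here is the "snapshot": later changes to the
        // table are not seen by this session.
        for (Map<String, Object> row : table) {
            if (!rowRestriction.test(row)) {
                continue; // filtered out before anything is "sent"
            }
            Map<String, Object> projected = new LinkedHashMap<>();
            for (String field : selectedFields) {
                projected.put(field, row.get(field)); // keep requested columns only
            }
            // Each surviving row lands in exactly one stream (disjoint sets).
            streams.get(next++ % streamCount).add(projected);
        }
    }

    int streamCount() {
        return streams.size();
    }

    List<Map<String, Object>> readStream(int index) {
        return streams.get(index);
    }

    public static void main(String[] args) {
        List<Map<String, Object>> table = new ArrayList<>();
        table.add(Map.of("name", "Ann", "state", "WA", "number", 5));
        table.add(Map.of("name", "Bob", "state", "OR", "number", 7));
        table.add(Map.of("name", "Cy", "state", "WA", "number", 9));
        ReadSessionSketch session = new ReadSessionSketch(
                table, List.of("name", "number"),
                row -> "WA".equals(row.get("state")), 2);
        for (int i = 0; i < session.streamCount(); i++) {
            System.out.println("stream " + i + ": " + session.readStream(i));
        }
    }
}
```

Because every row belongs to exactly one stream, separate workers can each consume one stream without coordinating with each other.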

Simplify Google BigQuery ETL and Analysis with Hevo!

Hevo is a fully managed, no-code data pipeline platform that effortlessly integrates data from more than 150 sources into a data warehouse such as BigQuery. With its minimal learning curve, Hevo can be set up in just a few minutes, allowing users to load data without having to compromise performance. Its features include: 

  • Connectors: Hevo supports 150+ integrations to SaaS platforms, files, Databases, analytics, and BI tools. It supports various destinations, including Google BigQuery, Amazon Redshift, and Snowflake.
  • Transformations: A simple Python-based drag-and-drop data transformation technique that allows you to transform your data for analysis.
  • Schema Management: Hevo eliminates the tedious task of schema management. It automatically detects the schema of incoming data and maps it to the destination schema.
  • Real-Time Data Transfer: Hevo provides real-time data migration, so you can always have analysis-ready data.
  • 24/7 Live Support: The Hevo team is available 24/7 to provide exceptional support through chat, email, and support calls.

Try Hevo today to experience seamless data transformation and migration.

Sign up here for a 14-Day Free Trial!

Why use BigQuery Storage API?

With the BigQuery Storage API, structured data is sent over the network in a binary serialized format, and a result set can be split across multiple streams so that several consumers can read it in parallel.
If your workload requires scanning large amounts of managed data, use the BigQuery Storage API, as it provides high read throughput for consumers.

Prerequisites

  • Google Cloud account with the BigQuery Storage API enabled and ‘bigquery.tables.updateData’ permissions.
  • Basic knowledge of Java.

How to use the BigQuery Storage API?

You can refer to the steps discussed below to use the BigQuery Read and Write APIs:

Step 1: Setting up the Account and Client Library

BigQuery provides client libraries in popular languages such as Java, Node.js, and Python. For this post, we will use the Java client. It requires you to first create an account in the Google Cloud console and get the keys for API usage. Let us learn how this can be done.

  • Head to the Google Cloud Service account page and set up a Service Account to access BigQuery from external libraries. Click on the New Service account and provide a name for the account. The role must be owner or editor. The other details will be populated automatically.
  • Click on Create to download a JSON file containing the application credentials. Keep this file for use in the next steps.
  • The next step is to set up the environment variables for the Java application script to use while accessing BigQuery. 
export GOOGLE_APPLICATION_CREDENTIALS="/home/user/Downloads/my-key.json"

This environment variable should reflect the actual path to your application credentials file.

  • Now add the below snippet to your Java build configuration to ensure the BigQuery client library is ready to be used.
<dependencyManagement>
  <dependencies>
    <dependency>
      <groupId>com.google.cloud</groupId>
      <artifactId>libraries-bom</artifactId>
      <version>24.3.0</version>
      <type>pom</type>
      <scope>import</scope>
    </dependency>
  </dependencies>
</dependencyManagement>

<dependencies>
  <dependency>
    <groupId>com.google.cloud</groupId>
    <artifactId>google-cloud-bigquerystorage</artifactId>
  </dependency>
</dependencies>
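Before running the examples in the following steps, it can help to confirm that the GOOGLE_APPLICATION_CREDENTIALS variable set earlier points at a real, readable file. The check below is a small standalone sketch that uses only the Java standard library. `CredentialsCheck` and `problemWith` are hypothetical names, and the check does not validate the contents of the key file.

```java
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

/** Sanity check for the credentials path; it does not contact Google Cloud. */
final class CredentialsCheck {

    /** Returns a description of the problem, or null if the path looks usable. */
    static String problemWith(String credentialsPath) {
        if (credentialsPath == null || credentialsPath.trim().isEmpty()) {
            return "GOOGLE_APPLICATION_CREDENTIALS is not set";
        }
        Path path = Paths.get(credentialsPath);
        if (!Files.isRegularFile(path)) {
            return "No file found at " + path;
        }
        if (!Files.isReadable(path)) {
            return "File is not readable: " + path;
        }
        return null;
    }

    public static void main(String[] args) {
        String problem = problemWith(System.getenv("GOOGLE_APPLICATION_CREDENTIALS"));
        System.out.println(problem == null ? "Credentials file found." : problem);
    }
}
```

Running this once in the same shell as the export command catches the most common setup mistake, a path that does not match the downloaded file.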

Step 2: Reading from BigQuery using APIs

We will now implement a Java application to read data from the BigQuery public data table. This dataset is available by default to all BigQuery accounts.

  • For setting up the imports, use the following import statements in your Java application.
import com.google.api.gax.rpc.ServerStream;
import com.google.cloud.bigquery.storage.v1.ArrowRecordBatch;
import com.google.cloud.bigquery.storage.v1.ArrowSchema;
import com.google.cloud.bigquery.storage.v1.BigQueryReadClient;
import com.google.cloud.bigquery.storage.v1.CreateReadSessionRequest;
import com.google.cloud.bigquery.storage.v1.DataFormat;
import com.google.cloud.bigquery.storage.v1.ReadRowsRequest;
import com.google.cloud.bigquery.storage.v1.ReadRowsResponse;
import com.google.cloud.bigquery.storage.v1.ReadSession;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableModifiers;
import com.google.cloud.bigquery.storage.v1.ReadSession.TableReadOptions;
import com.google.common.base.Preconditions;
import com.google.protobuf.Timestamp;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.arrow.memory.BufferAllocator;
import org.apache.arrow.memory.RootAllocator;
import org.apache.arrow.vector.FieldVector;
import org.apache.arrow.vector.VectorLoader;
import org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.arrow.vector.ipc.ReadChannel;
import org.apache.arrow.vector.ipc.message.MessageSerializer;
import org.apache.arrow.vector.types.pojo.Field;
import org.apache.arrow.vector.types.pojo.Schema;
import org.apache.arrow.vector.util.ByteArrayReadableSeekableByteChannel;
  • Let us now create the main function and set up the BigQuery client.
 public class BigQueryReadTest {

	public static void main(String... args) throws Exception {
	// Set your Google Cloud Platform project ID.
	String projectId = "YOUR_PROJECT_ID";

	BufferAllocator allocator = new RootAllocator(Long.MAX_VALUE);


	try (BigQueryReadClient client = BigQueryReadClient.create()) {
  	String parent = String.format("projects/%s", projectId);

  	// This example uses baby name data from the public datasets.
  	String srcTable =
      	String.format(
          	"projects/%s/datasets/%s/tables/%s",
          	"bigquery-public-data", "usa_names", "usa_1910_current");

  	// We specify the columns to be projected by adding them to the selected fields,
  	// and set a simple filter to restrict which rows are transmitted.
  	TableReadOptions options =
      	TableReadOptions.newBuilder()
          	.addSelectedFields("name")
          	.addSelectedFields("number")
          	.addSelectedFields("state")
          	.setRowRestriction("state = \"WA\"")
          	.build();

  	// Start specifying the read session we want created.
  	ReadSession.Builder sessionBuilder =
      	ReadSession.newBuilder()
          	.setTable(srcTable)
          	// This API can also deliver data serialized in Apache Avro format.
          	// This example leverages Apache Arrow.
          	.setDataFormat(DataFormat.ARROW)
          	.setReadOptions(options);

  	// Begin building the session creation request.
  	CreateReadSessionRequest.Builder builder =
      	CreateReadSessionRequest.newBuilder()
          	.setParent(parent)
          	.setReadSession(sessionBuilder)
          	.setMaxStreamCount(1);

In the above section, we set up a BigQuery read client and specify read options that select the relevant columns and filter the rows. Ensure that you replace the projectId variable with your own project ID. We then describe the session and configure Arrow as the data format for reading. BigQuery can also deliver the data in Avro format. Arrow is a columnar in-memory format, while Avro is a row-oriented serialization format.

  • The next part is to create the session, deserialize the Arrow schema it returns, allocate one vector per column, read from the first available stream, and then deserialize each record batch into those vectors.
ReadSession session = client.createReadSession(builder.build());
  	// Setup a simple reader and start a read session.
  	ArrowSchema arrowSchema = session.getArrowSchema();
  	Schema schema =
      	MessageSerializer.deserializeSchema(
          	new ReadChannel(
              	new ByteArrayReadableSeekableByteChannel(
                  	arrowSchema.getSerializedSchema().toByteArray())));
  	Preconditions.checkNotNull(schema);
  	List<FieldVector> vectors = new ArrayList<>();
  	for (Field field : schema.getFields()) {
    	vectors.add(field.createVector(allocator));
  	}
  	VectorSchemaRoot root = new VectorSchemaRoot(vectors);
  	VectorLoader loader = new VectorLoader(root);
 	 

    	Preconditions.checkState(session.getStreamsCount() > 0);

    	// Use the first stream to perform reading.
    	String streamName = session.getStreams(0).getName();

    	ReadRowsRequest rowsRequest =
        	ReadRowsRequest.newBuilder().setReadStream(streamName).build();

    	ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(rowsRequest);
    	for (ReadRowsResponse response : stream) {
      	Preconditions.checkState(response.hasArrowRecordBatch());
      	org.apache.arrow.vector.ipc.message.ArrowRecordBatch deserializedBatch =
      	MessageSerializer.deserializeRecordBatch(
          	new ReadChannel(
              	new ByteArrayReadableSeekableByteChannel(
           			   response.getArrowRecordBatch().getSerializedRecordBatch().toByteArray())),
          	allocator);

      	loader.load(deserializedBatch);
      	deserializedBatch.close();
      	System.out.println(root.contentToTSVString());
  	  	root.clear();	 
   	 
  	}
	}
  }
}

The columnar nature of BigQuery means there is a bit of complexity in dealing with the schema to make the reads more efficient. BufferAllocator, VectorLoader, and VectorSchemaRoot are utility classes that are required to handle complexities around the ArrowSchema.
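To see why a loader step is needed at all, it helps to picture a record batch as a set of equally long columns that must be walked row by row before anything can be printed. The sketch below models that with plain Java lists. It is not Arrow code, `ColumnarBatchSketch` is a hypothetical name, and the tab-separated output is only loosely similar to what the Arrow root prints.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy column-oriented batch rendered as rows; not the Arrow API. */
final class ColumnarBatchSketch {

    /** Renders columns as a header line followed by one tab-separated line per row. */
    static String toTsv(Map<String, List<?>> columns) {
        if (columns.isEmpty()) {
            return "";
        }
        int rowCount = columns.values().iterator().next().size();
        for (List<?> column : columns.values()) {
            if (column.size() != rowCount) {
                throw new IllegalArgumentException("columns differ in length");
            }
        }
        StringBuilder out = new StringBuilder(String.join("\t", columns.keySet()));
        out.append('\n');
        for (int row = 0; row < rowCount; row++) {
            int position = 0;
            // A "row" only exists by picking index `row` from every column.
            for (List<?> column : columns.values()) {
                if (position++ > 0) {
                    out.append('\t');
                }
                out.append(column.get(row));
            }
            out.append('\n');
        }
        return out.toString();
    }

    public static void main(String[] args) {
        Map<String, List<?>> batch = new LinkedHashMap<>();
        batch.put("name", List.of("Ann", "Bob"));
        batch.put("number", List.of(5, 7));
        System.out.print(toTsv(batch));
    }
}
```

In the real reader, the vectors play the role of these lists and are refilled for every batch, which is why the example above clears the root after printing.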

That is all there is to reading data with the BigQuery Storage API. The above snippet prints the column values that are read.

Step 3: Writing to BigQuery using APIs

As discussed above, BigQuery provides three kinds of writes. In this post, we will use the committed mode for simplicity. We will try to add rows to a table with a single column called ‘column1’. This tutorial assumes you have already created a table with a single column. If you are not familiar with this, head to this guide for creating a table.

  • We will begin with setting up the imports for our Java class.
import com.google.api.core.ApiFuture;
import com.google.cloud.bigquery.storage.v1.AppendRowsResponse;
import com.google.cloud.bigquery.storage.v1.BigQueryWriteClient;
import com.google.cloud.bigquery.storage.v1.CreateWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.FinalizeWriteStreamRequest;
import com.google.cloud.bigquery.storage.v1.JsonStreamWriter;
import com.google.cloud.bigquery.storage.v1.TableName;
import com.google.cloud.bigquery.storage.v1.WriteStream;
import com.google.protobuf.Descriptors.DescriptorValidationException;
import java.io.IOException;
import java.util.concurrent.ExecutionException;
import org.json.JSONArray;
import org.json.JSONObject;
  • Now, we need to create a main function and build the logic for appending rows to a stream. Use the below code snippet to do it.
public class BigQueryWrite {

public static void main(String[] args) throws IOException, IllegalArgumentException, DescriptorValidationException, InterruptedException {
    
    String projectId = "YOUR_PROJECT_ID";
    String dataset = "YOUR_DATASET_NAME";
    String table = "YOUR_TABLE_NAME";
	try (BigQueryWriteClient bqClient = BigQueryWriteClient.create()) {
	WriteStream stream = WriteStream.newBuilder().setType(WriteStream.Type.COMMITTED).build();
	TableName tableName = TableName.of(projectId, dataset, table);
	CreateWriteStreamRequest createWriteStreamRequest =
    	CreateWriteStreamRequest.newBuilder()
        	.setParent(tableName.toString())
        	.setWriteStream(stream)
        	.build();
	WriteStream writeStream = bqClient.createWriteStream(createWriteStreamRequest);


	try (JsonStreamWriter writer =
    	JsonStreamWriter.newBuilder(writeStream.getName(), writeStream.getTableSchema())
        	.build()) {
  	for (int i = 0; i < 2; i++) {
    	JSONArray jsonArr = new JSONArray();
    	for (int j = 0; j < 10; j++) {
      	JSONObject record = new JSONObject();
      	record.put("column1", String.format("record %03d-%03d", i, j));
      	jsonArr.put(record);
    	}

    	ApiFuture<AppendRowsResponse> future = writer.append(jsonArr, /*offset=*/ i * 10);
    	AppendRowsResponse response = future.get();
  	}
  	// Finalize the stream after use.
  	FinalizeWriteStreamRequest finalizeWriteStreamRequest =
      	FinalizeWriteStreamRequest.newBuilder().setName(writeStream.getName()).build();
  	bqClient.finalizeWriteStream(finalizeWriteStreamRequest);
	}
	System.out.println("Appended records successfully.");
  } catch (ExecutionException e) {
	System.out.println("Failed to append records.\n" + e.toString());
  }
}

}

In the above function, we identify the BigQuery table using the projectId, dataset, and table variables. We then build a BigQueryWriteClient object and use it to create a WriteStream. The next step is to create two batches of records to be inserted. We use two batches here only to make the point that it is not a good idea to create a WriteStream for a single batch insertion.

The idea is to reuse the stream as much as possible, since creating it is an expensive operation. While appending each JSON array, we also pass an offset to avoid writing duplicate rows. The offset is the number of rows already written to the stream before the current batch. After this, we send a FinalizeWriteStreamRequest to complete the write.
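The role of the offset can be illustrated without touching BigQuery. In the toy model below, a stream accepts a batch only when the supplied offset equals the number of rows it already holds, so a retried batch is rejected instead of being stored twice. `OffsetStreamSketch` is a hypothetical class, and the result names are modelled on our understanding of the errors the service reports, so treat them as an assumption.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Toy model of offset-checked appends; not the BigQuery client API. */
final class OffsetStreamSketch {
    enum Result { OK, ALREADY_EXISTS, OUT_OF_RANGE }

    private final List<String> rows = new ArrayList<>();

    /** Stores the batch only if the offset matches the current end of the stream. */
    Result append(List<String> batch, long offset) {
        if (offset < rows.size()) {
            return Result.ALREADY_EXISTS; // this position was written before (a retry)
        }
        if (offset > rows.size()) {
            return Result.OUT_OF_RANGE;   // a gap: earlier rows are missing
        }
        rows.addAll(batch);
        return Result.OK;
    }

    int rowCount() {
        return rows.size();
    }

    public static void main(String[] args) {
        OffsetStreamSketch stream = new OffsetStreamSketch();
        List<String> batch = Collections.nCopies(10, "row");
        System.out.println(stream.append(batch, 0));  // first attempt
        System.out.println(stream.append(batch, 0));  // retry of the same batch
        System.out.println(stream.append(batch, 10)); // next batch
        System.out.println(stream.rowCount());
    }
}
```

This is why the write example passes i * 10 as the offset: each batch holds ten rows, so the second batch starts where the first one ended.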

That concludes our effort in writing to BigQuery using the BigQuery Storage API. Let us now recap and understand the challenges in doing this.

  1. As evident from the read and write mechanisms above, this is not for the faint-hearted. Because of BigQuery’s columnar nature, the read and write code needs to deal with schemas and the complexity of deserializing the data.
  2. What we accomplished here is a simple read and write operation. In a production system, there will be many more complications, such as avoiding duplicates and joining multiple sources. Add these to the already complex deserializing mechanism, and developers are in for a tough job.

Conclusion

This post attempts to simplify the usage of the BigQuery Storage API. We have now learned to read rows from BigQuery and to execute write operations in committed mode. Handling these operations required developers to wrap their heads around the schema complexities and write a fair bit of code. If you are looking for a no-code solution to move data to and from BigQuery, you should check out Hevo.

Hevo provides you with a consistent and reliable solution to manage data transfer between a variety of sources and a wide variety of destinations with a few clicks. Sign up for Hevo’s 14-day free trial and experience seamless data migration.

FAQs

1. Is the Google BigQuery API free?

The Google BigQuery API is not entirely free. While there are free-tier usage limits, charges apply for queries, data storage, and data processing beyond those limits.

2. What is the Google Cloud Storage API?

The Google Cloud Storage API allows developers to interact programmatically with Google Cloud Storage. It provides functionalities for creating, reading, updating, and deleting storage buckets and objects.

Talha
Software Developer, Hevo Data

Talha is a Software Developer with over eight years of experience in the field. He is currently driving advancements in data integration at Hevo Data, where he has been instrumental in shaping a cutting-edge data integration platform for the past four years. Prior to this, he spent 4 years at Flipkart, where he played a key role in projects related to their data integration capabilities. Talha loves to explain complex information related to data engineering to his peers through writing. He has written many blogs related to data integration, data management aspects, and key challenges data practitioners face.