Stream data using the Storage Write API


This document describes how to use the BigQuery Storage Write API to stream data into BigQuery.

In streaming scenarios, data arrives continuously and should be available for reads with minimal latency. When using the BigQuery Storage Write API for streaming workloads, consider which guarantees you need. If your application can tolerate duplicate records, use the default stream, which provides at-least-once semantics. If you need exactly-once semantics, create one or more streams in committed type and use stream offsets to guarantee exactly-once writes.

In committed type, data written to the stream is available for query as soon as the server acknowledges the write request. The default stream also uses committed type, but does not provide exactly-once guarantees.

Use the default stream for at-least-once semantics

If your application can accept the possibility of duplicate records appearing in the destination table, then we recommend using the default stream for streaming scenarios.

The following code shows how to write data to the default stream:

Java

To learn how to install and use the client library for BigQuery, see BigQuery client libraries. For more information, see the BigQuery Java API reference documentation.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

Node.js

To learn how to install and use the client library for BigQuery, see BigQuery client libraries.

To authenticate to BigQuery, set up Application Default Credentials. For more information, see Set up authentication for client libraries.

Python

This example shows how to insert records with two fields using the default stream:

import json
import logging

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2

import sample_data_pb2

# The columns from the table's schema to look for in the input data.
TABLE_COLUMNS_TO_CHECK = [
    "name",
    "age",
]

# Create one serialized row of data from a dict of field values.
def create_row_data(data):
    row = sample_data_pb2.SampleData()
    for field in TABLE_COLUMNS_TO_CHECK:
        # Fields not present in the input dict are left unset.
        if field in data:
            setattr(row, field, data[field])
    return row.SerializeToString()

class BigQueryStorageWriteAppend(object):

    # The stream name is: projects/{project}/datasets/{dataset}/tables/{table}/_default
    @staticmethod
    def append_rows_proto2(
        project_id: str, dataset_id: str, table_id: str, data: list
    ):

        write_client = bigquery_storage_v1.BigQueryWriteClient()
        parent = write_client.table_path(project_id, dataset_id, table_id)
        stream_name = f'{parent}/_default'
        write_stream = types.WriteStream()

        # Create a template with fields needed for the first request.
        request_template = types.AppendRowsRequest()

        # The request must contain the stream name.
        request_template.write_stream = stream_name

        # Generating the protocol buffer representation of the message descriptor.
        proto_schema = types.ProtoSchema()
        proto_descriptor = descriptor_pb2.DescriptorProto()
        sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
        proto_schema.proto_descriptor = proto_descriptor
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.writer_schema = proto_schema
        request_template.proto_rows = proto_data

        # Construct an AppendRowsStream to send an arbitrary number of requests to a stream.
        append_rows_stream = writer.AppendRowsStream(write_client, request_template)

        # Append proto2 serialized bytes to the serialized_rows repeated field using create_row_data.
        proto_rows = types.ProtoRows()
        for row in data:
            proto_rows.serialized_rows.append(create_row_data(row))

        # Appends data to the given stream.
        request = types.AppendRowsRequest()
        proto_data = types.AppendRowsRequest.ProtoData()
        proto_data.rows = proto_rows
        request.proto_rows = proto_data

        append_rows_stream.send(request)

        print(f"Rows to table: '{parent}' have been written.")

if __name__ == "__main__":

    ###### Uncomment the below block to provide additional logging capabilities ######
    #logging.basicConfig(
    #    level=logging.DEBUG,
    #    format="%(asctime)s [%(levelname)s] %(message)s",
    #    handlers=[
    #        logging.StreamHandler()
    #    ]
    #)
    ###### Uncomment the above block to provide additional logging capabilities ######

    # entries.json contains one JSON object per line (newline-delimited JSON).
    with open('entries.json', 'r') as json_file:
        data = [json.loads(line) for line in json_file if line.strip()]
    # Change this to your specific BigQuery project, dataset, table details
    BigQueryStorageWriteAppend.append_rows_proto2("PROJECT_ID", "DATASET_ID", "TABLE_ID", data=data)

This code example depends on the compiled protocol module sample_data_pb2.py. To create the compiled module, execute the protoc --python_out=. sample_data.proto command, where protoc is the protocol buffer compiler. The sample_data.proto file defines the format of the messages used in the Python example. To install the protoc compiler, follow the instructions in Protocol Buffers - Google's data interchange format.

Here are the contents of the sample_data.proto file:

syntax = "proto2";

message SampleData {
  required string name = 1;
  required int64 age = 2;
}

This script consumes the entries.json file, which contains sample row data, one JSON object per line, to be inserted into the BigQuery table:

{"name": "Jim", "age": 35}
{"name": "Jane", "age": 27}

Use multiplexing

You enable multiplexing at the stream writer level for the default stream only. To enable multiplexing in Java, call the setEnableConnectionPool method when you construct a StreamWriter or JsonStreamWriter object.

After you enable the connection pool, the Java client library manages your connections in the background, scaling up connections when the existing connections are considered too busy. For automatic scaling to be more effective, consider lowering the maxInflightRequests limit.

// One possible way for constructing StreamWriter
StreamWriter.newBuilder(streamName)
    .setWriterSchema(protoSchema)
    .setEnableConnectionPool(true)
    .setMaxInflightRequests(100)
    .build();

// One possible way for constructing JsonStreamWriter
JsonStreamWriter.newBuilder(tableName, bigqueryClient)
    .setEnableConnectionPool(true)
    .setMaxInflightRequests(100)
    .build();

To enable multiplexing in Go, see Connection Sharing (Multiplexing).

Use committed type for exactly-once semantics

If you need exactly-once write semantics, create a write stream in committed type. In committed type, records are available for query as soon as the client receives acknowledgement from the back end.

Committed type provides exactly-once delivery within a stream through the use of record offsets. The application specifies the next append offset in each call to AppendRows, and the write operation is performed only if the offset value matches the next append offset. For more information, see Manage stream offsets to achieve exactly-once semantics.

If you don't provide an offset, then records are appended to the current end of the stream. In that case, if an append request returns an error, retrying it could result in the record appearing more than once in the stream.

To use committed type, perform the following steps:

Java

  1. Call CreateWriteStream to create one or more streams in committed type.
  2. For each stream, call AppendRows in a loop to write batches of records.
  3. Call FinalizeWriteStream for each stream to release the stream. After you call this method, you cannot write any more rows to the stream. This step is optional in committed type, but helps to prevent exceeding the limit on active streams. For more information, see Limit the rate of stream creation.

Node.js

  1. Call createWriteStreamFullResponse to create one or more streams in committed type.
  2. For each stream, call appendRows in a loop to write batches of records.
  3. Call finalize for each stream to release the stream. After you call this method, you cannot write any more rows to the stream. This step is optional in committed type, but helps to prevent exceeding the limit on active streams. For more information, see Limit the rate of stream creation.

You cannot delete a stream explicitly. Streams follow the system-defined time to live (TTL).

The following code shows how to use committed type:
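
Python

This is a minimal sketch rather than a full reference sample: it assumes the same compiled sample_data_pb2 module used in the default stream example above, and the function name append_rows_committed is chosen for illustration. The sketch creates a write stream in committed type, appends one batch of rows at offset 0, and then finalizes the stream.

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types
from google.cloud.bigquery_storage_v1 import writer
from google.protobuf import descriptor_pb2

import sample_data_pb2  # Compiled from sample_data.proto, as in the default stream example.


def append_rows_committed(project_id: str, dataset_id: str, table_id: str, rows: list):
    write_client = bigquery_storage_v1.BigQueryWriteClient()
    parent = write_client.table_path(project_id, dataset_id, table_id)

    # Create an application-owned stream in committed type.
    write_stream = types.WriteStream()
    write_stream.type_ = types.WriteStream.Type.COMMITTED
    write_stream = write_client.create_write_stream(
        parent=parent, write_stream=write_stream
    )

    # The first request must contain the stream name and the writer schema.
    request_template = types.AppendRowsRequest()
    request_template.write_stream = write_stream.name

    proto_schema = types.ProtoSchema()
    proto_descriptor = descriptor_pb2.DescriptorProto()
    sample_data_pb2.SampleData.DESCRIPTOR.CopyToProto(proto_descriptor)
    proto_schema.proto_descriptor = proto_descriptor
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.writer_schema = proto_schema
    request_template.proto_rows = proto_data

    append_rows_stream = writer.AppendRowsStream(write_client, request_template)

    # Serialize the rows into a single batch.
    proto_rows = types.ProtoRows()
    for item in rows:
        row = sample_data_pb2.SampleData()
        row.name = item["name"]
        row.age = item["age"]
        proto_rows.serialized_rows.append(row.SerializeToString())

    # Append the batch at offset 0. The first append to a stream must use offset 0;
    # subsequent appends pass the running total of rows written so far.
    request = types.AppendRowsRequest()
    request.offset = 0
    proto_data = types.AppendRowsRequest.ProtoData()
    proto_data.rows = proto_rows
    request.proto_rows = proto_data

    # send() returns a future; result() blocks until the server acknowledges the append.
    append_rows_stream.send(request).result()

    # Shut down the background connection and finalize the stream. Finalizing is
    # optional in committed type but releases the stream.
    append_rows_stream.close()
    write_client.finalize_write_stream(name=write_stream.name)

    print(f"Rows have been written to stream '{write_stream.name}'.")

Because each request carries an explicit offset, retrying a failed append cannot duplicate rows: the server rejects any append whose offset does not match the next expected offset in the stream.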

Use the Apache Arrow format to ingest data

The following code shows how to ingest data using the Apache Arrow format. For a more detailed, end-to-end example, see the PyArrow example on GitHub.

Python

This example shows how to ingest a serialized PyArrow table using the default stream.

import pyarrow

from google.cloud import bigquery_storage_v1
from google.cloud.bigquery_storage_v1 import types as gapic_types
from google.cloud.bigquery_storage_v1.writer import AppendRowsStream

def append_rows_with_pyarrow(
  pyarrow_table: pyarrow.Table,
  project_id: str,
  dataset_id: str,
  table_id: str,
):
  bqstorage_write_client = bigquery_storage_v1.BigQueryWriteClient()

  # Create request_template.
  request_template = gapic_types.AppendRowsRequest()
  request_template.write_stream = (
      f"projects/{project_id}/datasets/{dataset_id}/tables/{table_id}/_default"
  )
  arrow_data = gapic_types.AppendRowsRequest.ArrowData()
  arrow_data.writer_schema.serialized_schema = (
      pyarrow_table.schema.serialize().to_pybytes()
  )
  request_template.arrow_rows = arrow_data

  # Create AppendRowsStream.
  append_rows_stream = AppendRowsStream(
      bqstorage_write_client,
      request_template,
  )

  # Create request with table data.
  # Note: to_batches() can return multiple record batches; this example sends
  # only the first one.
  request = gapic_types.AppendRowsRequest()
  request.arrow_rows.rows.serialized_record_batch = (
      pyarrow_table.to_batches()[0].serialize().to_pybytes()
  )

  # Send request.
  future = append_rows_stream.send(request)

  # Wait for result.
  future.result()
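
For illustration, the function might be called as follows. This driver is hypothetical and not part of the sample above; the column names match the two-field schema used throughout this document, and the project, dataset, and table IDs are placeholders.

import pyarrow

# Build a small in-memory table. The column names and types must match the
# destination table's schema.
sample_table = pyarrow.Table.from_pydict(
    {
        "name": ["Jim", "Jane"],
        "age": [35, 27],
    }
)

# Change these to your BigQuery project, dataset, and table.
append_rows_with_pyarrow(sample_table, "PROJECT_ID", "DATASET_ID", "TABLE_ID")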