apache_beam.io.gcp.bigquery module — Apache Beam 2.65.0 documentation

BigQuery sources and sinks.

This module implements reading from and writing to BigQuery tables. It relies on several classes exposed by the BigQuery API: TableSchema, TableFieldSchema, TableRow, and TableCell. The default mode is to return table rows read from a BigQuery source as dictionaries. Similarly, a Write transform to a BigQuerySink accepts PCollections of dictionaries. This is done for more convenient programming. If desired, the native TableRow objects can be used throughout to represent rows (use an instance of TableRowJsonCoder as a coder argument when creating the sources or sinks respectively).

Also, for programming convenience, instances of TableReference and TableSchema have a string representation that can be used for the corresponding arguments:

The syntax supported is described here: https://cloud.google.com/bigquery/bq-command-line-tool-quickstart
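For example, a table can be referenced with a 'project:dataset.table' string and a schema with a comma-separated list of 'name:TYPE' pairs. A minimal sketch (the project, dataset and table names below are hypothetical):

import apache_beam as beam

with beam.Pipeline() as p:
  _ = (
      p
      | beam.Create([{'month': 1, 'mean_temp': 3.1}])
      | beam.io.WriteToBigQuery(
          table='my_project:my_dataset.my_table',   # string table reference
          schema='month:INTEGER, mean_temp:FLOAT',  # string schema
          create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
          write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))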

BigQuery sources can be used as main inputs or side inputs. A main input (common case) is expected to be massive and will be split into manageable chunks and processed in parallel. Side inputs are expected to be small and will be read completely every time a ParDo DoFn gets executed. In the example below the lambda function implementing the DoFn for the Map transform will get on each call one row of the main table and all rows of the side table. The runner may use some caching techniques to share the side inputs between calls in order to avoid excessive reading:

main_table = pipeline | 'VeryBig' >> beam.io.ReadFromBigQuery(...)
side_table = pipeline | 'NotBig' >> beam.io.ReadFromBigQuery(...)
results = (
    main_table
    | 'ProcessData' >> beam.Map(
        lambda element, side_input: ..., AsList(side_table)))

There is no difference in how main and side inputs are read. What makes the side_table a ‘side input’ is the AsList wrapper used when passing the table as a parameter to the Map transform. AsList signals to the execution framework that its input should be made available whole.

The main and side inputs are implemented differently. Reading a BigQuery table as main input entails exporting the table to a set of GCS files (in AVRO or in JSON format) and then processing those files.

Users may provide a query to read from rather than reading all of a BigQuery table. If specified, the result of executing the query will be used as the data of the input transform:

query_results = pipeline | beam.io.gcp.bigquery.ReadFromBigQuery(
    query='SELECT year, mean_temp FROM samples.weather_stations')

When creating a BigQuery input transform, users should provide either a query or a table. Pipeline construction will fail with a validation error if neither or both are specified.
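For instance, a table read (rather than a query read) might look like the following sketch; the table name is hypothetical:

table_rows = pipeline | 'ReadTable' >> beam.io.ReadFromBigQuery(
    table='my_project:samples.weather_stations')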

When reading via ReadFromBigQuery using EXPORT, BYTES values are returned as raw, decoded bytes. This is because ReadFromBigQuery uses Avro exports by default. When reading from BigQuery using apache_beam.io.BigQuerySource, BYTES values are returned as base64-encoded bytes. To get base64-encoded bytes using ReadFromBigQuery, you can set the use_json_exports flag to export data as JSON and receive base64-encoded bytes.
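A sketch of that flag in use, assuming a hypothetical table that contains a BYTES column:

rows = pipeline | beam.io.ReadFromBigQuery(
    table='my_project:my_dataset.table_with_bytes',
    use_json_exports=True)  # BYTES values arrive base64-encoded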

ReadAllFromBigQuery

Beam 2.27.0 introduces a new transform called ReadAllFromBigQuery, which allows you to define table and query reads from BigQuery at pipeline runtime:

read_requests = p | beam.Create([
    ReadFromBigQueryRequest(query='SELECT * FROM mydataset.mytable'),
    ReadFromBigQueryRequest(table='myproject.mydataset.mytable')])
results = read_requests | ReadAllFromBigQuery()

A good application for this transform is in streaming pipelines to refresh a side input coming from BigQuery. This would work like so:

side_input = (
    p
    | 'PeriodicImpulse' >> PeriodicImpulse(
        first_timestamp, last_timestamp, interval, True)
    | 'MapToReadRequest' >> beam.Map(
        lambda x: ReadFromBigQueryRequest(table='dataset.table'))
    | beam.io.ReadAllFromBigQuery())
main_input = (
    p
    | 'MpImpulse' >> beam.Create(sample_main_input_elements)
    | 'MapMpToTimestamped' >> beam.Map(lambda src: TimestampedValue(src, src))
    | 'WindowMpInto' >> beam.WindowInto(
        window.FixedWindows(main_input_windowing_interval)))
result = (
    main_input
    | 'ApplyCrossJoin' >> beam.FlatMap(
        cross_join, rights=beam.pvalue.AsIter(side_input)))

Note: This transform is supported on Portable and Dataflow v2 runners.

Note: This transform does not currently clean up temporary datasets created for its execution. (BEAM-11359)

Writing Data to BigQuery

The WriteToBigQuery transform is the recommended way of writing data to BigQuery. It supports a large set of parameters to customize how you’d like to write to BigQuery.

Table References

This transform allows you to provide static project, dataset and table parameters which point to a specific BigQuery table to be created. The table parameter can also be a dynamic parameter (i.e. a callable), which receives an element to be written to BigQuery, and returns the table to which that element should be written.

You may also provide a tuple of PCollectionView elements to be passed as side inputs to your callable. For example, suppose that one wishes to send events of different types to different tables, and the table names are computed at pipeline runtime; one may do something like the following:

with Pipeline() as p:
  elements = (p | 'Create elements' >> beam.Create([
      {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
      {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
  ]))

  table_names = (p | 'Create table_names' >> beam.Create([
      ('error', 'my_project:dataset1.error_table_for_today'),
      ('user_log', 'my_project:dataset1.query_table_for_today'),
  ]))

  table_names_dict = beam.pvalue.AsDict(table_names)

  elements | beam.io.gcp.bigquery.WriteToBigQuery(
      table=lambda row, table_dict: table_dict[row['type']],
      table_side_inputs=(table_names_dict,))

In the example above, the table_dict argument passed to the table callable is the side input coming from table_names_dict, which is passed as part of the table_side_inputs argument.

Schemas

This transform also allows you to provide a static or dynamic schema parameter (i.e. a callable).

If providing a callable, this should take in a table reference (as returned by the table parameter), and return the corresponding schema for that table. This allows one to provide different schemas for different tables:

def compute_table_name(row):
  ...

errors_schema = {'fields': [
    {'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'message', 'type': 'STRING', 'mode': 'NULLABLE'}]}
queries_schema = {'fields': [
    {'name': 'type', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'query', 'type': 'STRING', 'mode': 'NULLABLE'}]}

with Pipeline() as p:
  elements = (p | beam.Create([
      {'type': 'error', 'timestamp': '12:34:56', 'message': 'bad'},
      {'type': 'user_log', 'timestamp': '12:34:59', 'query': 'flu symptom'},
  ]))

  elements | beam.io.gcp.bigquery.WriteToBigQuery(
      table=compute_table_name,
      schema=lambda table: (errors_schema
                            if 'errors' in table
                            else queries_schema))

It may be the case that schemas are computed at pipeline runtime. In cases like these, one can also provide a schema_side_inputs parameter, which is a tuple of PCollectionViews to be passed to the schema callable (much like the table_side_inputs parameter).
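A minimal sketch, reusing the names from the example above and assuming the schema callable is invoked with the destination table followed by the values of schema_side_inputs (mirroring the table_side_inputs example); the table-to-schema mapping is hypothetical:

schema_map = beam.pvalue.AsDict(
    p | 'Create schemas' >> beam.Create([
        ('my_project:dataset1.error_table_for_today', errors_schema),
        ('my_project:dataset1.query_table_for_today', queries_schema),
    ]))

elements | beam.io.gcp.bigquery.WriteToBigQuery(
    table=compute_table_name,
    schema=lambda table, schema_dict: schema_dict[table],
    schema_side_inputs=(schema_map,))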

Additional Parameters for BigQuery Tables

This sink is able to create tables in BigQuery if they don’t already exist. It also relies on creating temporary tables when performing file loads.

The WriteToBigQuery transform creates tables using the BigQuery API by inserting a load job (see the API reference [1]), or by inserting a new table (see the API reference for that [2][3]).

When creating a new BigQuery table, there are a number of extra parameters that one may need to specify. For example, clustering, partitioning, data encoding, etc. It is possible to provide these additional parameters by passing a Python dictionary as additional_bq_parameters to the transform. As an example, to create a table that has specific partitioning, and clustering properties, one would do the following:

additional_bq_parameters = {
    'timePartitioning': {'type': 'DAY'},
    'clustering': {'fields': ['country']}}
with Pipeline() as p:
  elements = (p | beam.Create([
      {'country': 'mexico', 'timestamp': '12:34:56', 'query': 'acapulco'},
      {'country': 'canada', 'timestamp': '12:34:59', 'query': 'influenza'},
  ]))

  elements | beam.io.gcp.bigquery.WriteToBigQuery(
      table='project_name1:dataset_2.query_events_table',
      additional_bq_parameters=additional_bq_parameters)

Much like the schema case, the additional_bq_parameters parameter can also take a callable that receives a table reference.
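For example, a callable form might look like the following sketch, which only partitions and clusters the hypothetical query_events_table destination:

def params_for_table(table):
  # Return extra table parameters per destination; empty dict for the rest.
  if 'query_events_table' in table:
    return {'timePartitioning': {'type': 'DAY'},
            'clustering': {'fields': ['country']}}
  return {}

elements | beam.io.gcp.bigquery.WriteToBigQuery(
    table='project_name1:dataset_2.query_events_table',
    additional_bq_parameters=params_for_table)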

[1] https://cloud.google.com/bigquery/docs/reference/rest/v2/Job#jobconfigurationload
[2] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables/insert
[3] https://cloud.google.com/bigquery/docs/reference/rest/v2/tables#resource

Chaining of operations after WriteToBigQuery

WriteToBigQuery returns an object with several PCollections that consist of metadata about the write operations. These are useful for inspecting the write operation and following up on its results:

schema = {'fields': [
    {'name': 'column', 'type': 'STRING', 'mode': 'NULLABLE'}]}

error_schema = {'fields': [
    {'name': 'destination', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'row', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'error_message', 'type': 'STRING', 'mode': 'NULLABLE'}]}

with Pipeline() as p:
  result = (p
    | 'Create Columns' >> beam.Create([
        {'column': 'value'},
        {'bad_column': 'bad_value'}
      ])
    | 'Write Data' >> WriteToBigQuery(
        method=WriteToBigQuery.Method.STREAMING_INSERTS,
        table=my_table,
        schema=schema,
        insert_retry_strategy=RetryStrategy.RETRY_NEVER
      ))

  _ = (result.failed_rows_with_errors
    | 'Get Errors' >> beam.Map(lambda e: {
          "destination": e[0],
          "row": json.dumps(e[1]),
          "error_message": e[2][0]['message']
        })
    | 'Write Errors' >> WriteToBigQuery(
        method=WriteToBigQuery.Method.STREAMING_INSERTS,
        table=error_log_table,
        schema=error_schema,
      ))

Often, the simplest use case is to chain an operation after writing data to BigQuery. To do this, one can chain the operation after one of the output PCollections. A generic way to apply this operation, independent of the write method, looks like the following:

def chain_after(result):
  try:
    # This works for FILE_LOADS, where we run load and possibly copy jobs.
    return (result.destination_load_jobid_pairs,
            result.destination_copy_jobid_pairs) | beam.Flatten()
  except AttributeError:
    # Works for STREAMING_INSERTS, where we return the rows BigQuery rejected.
    return result.failed_rows

result = (pcoll | WriteToBigQuery(...))

_ = (chain_after(result)
     | beam.Reshuffle()  # Force a 'commit' of the intermediate data
     | MyOperationAfterWriteToBQ())

Attributes can be accessed using dot notation or bracket notation:

result.failed_rows <--> result['FailedRows']
result.failed_rows_with_errors <--> result['FailedRowsWithErrors']
result.destination_load_jobid_pairs <--> result['destination_load_jobid_pairs']
result.destination_file_pairs <--> result['destination_file_pairs']
result.destination_copy_jobid_pairs <--> result['destination_copy_jobid_pairs']

Writing with Storage Write API using Cross Language

This sink is able to write with BigQuery’s Storage Write API. To do so, specify the method WriteToBigQuery.Method.STORAGE_WRITE_API. This will use the StorageWriteToBigQuery() transform to discover and use the Java implementation. Using this transform directly will require the use of beam.Row() elements.

Similar to streaming inserts, it returns two dead-letter queue PCollections: one containing just the failed rows and the other containing failed rows and errors. They can be accessed with failed_rows and failed_rows_with_errors, respectively. See the examples above for how to do this.
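A minimal sketch of a Storage Write API write followed by inspection of the dead-letter output; the table name and schema below are hypothetical:

quotes_schema = {'fields': [
    {'name': 'symbol', 'type': 'STRING', 'mode': 'NULLABLE'},
    {'name': 'price', 'type': 'FLOAT', 'mode': 'NULLABLE'}]}

with Pipeline() as p:
  result = (p
    | beam.Create([{'symbol': 'GOOG', 'price': 1.0}])
    | 'Write Quotes' >> WriteToBigQuery(
        method=WriteToBigQuery.Method.STORAGE_WRITE_API,
        table='my_project:my_dataset.quotes',
        schema=quotes_schema))

  _ = (result.failed_rows_with_errors
    | 'Log Failures' >> beam.Map(print))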

*** Short introduction to BigQuery concepts ***

Tables have rows (TableRow) and each row has cells (TableCell). A table has a schema (TableSchema), which in turn describes the schema of each cell (TableFieldSchema). The terms field and cell are used interchangeably.

TableSchema: Describes the schema (types and order) for values in each row. Has one attribute, 'fields', which is a list of TableFieldSchema objects.

TableFieldSchema: Describes the schema (type, name) for one field. Has several attributes, including 'name' and 'type'. Common values for the type attribute are: 'STRING', 'INTEGER', 'FLOAT', 'BOOLEAN', 'NUMERIC', 'GEOGRAPHY'. All possible values are described at: https://cloud.google.com/bigquery/docs/reference/standard-sql/data-types

TableRow: Holds all values in a table row. Has one attribute, 'f', which is a list of TableCell instances.

TableCell: Holds the value for one cell (or field). Has one attribute, 'v', which is a JsonValue instance. This class is defined in the apitools.base.py.extra_types module.

As of Beam 2.7.0, the NUMERIC data type is supported. This data type supports high-precision decimal numbers (precision of 38 digits, scale of 9 digits). The GEOGRAPHY data type works with Well-Known Text (see https://en.wikipedia.org/wiki/Well-known_text) format for reading and writing to BigQuery. BigQuery IO requires values of BYTES datatype to be encoded using base64 encoding when writing to BigQuery.
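For illustration, a sketch of writing a BYTES value, assuming an existing pipeline p; the table and field names are hypothetical:

import base64

row = {'payload': base64.b64encode(b'\x00\x01\x02').decode('ascii')}

_ = (p
     | beam.Create([row])
     | beam.io.WriteToBigQuery(
         table='my_project:my_dataset.bytes_table',
         schema='payload:BYTES'))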

Updates to the I/O connector code

For any significant updates to this I/O connector, please consider involving corresponding code reviewers mentioned in https://github.com/apache/beam/blob/master/sdks/python/OWNERS

class apache_beam.io.gcp.bigquery.TableRowJsonCoder(table_schema=None)[source]

Bases: Coder

A coder for a TableRow instance to/from a JSON string.

Note that the encoding operation (used when writing to sinks) requires the table schema in order to obtain the ordered list of field names. Reading from sources on the other hand does not need the table schema.

encode(table_row)[source]

decode(encoded_table_row)[source]
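A minimal sketch of using the coder, assuming the schema message classes exposed under apache_beam.io.gcp.internal.clients.bigquery; the field names are hypothetical:

from apache_beam.io.gcp.internal.clients import bigquery

schema = bigquery.TableSchema(fields=[
    bigquery.TableFieldSchema(name='name', type='STRING', mode='NULLABLE'),
    bigquery.TableFieldSchema(name='age', type='INTEGER', mode='NULLABLE')])

coder = TableRowJsonCoder(table_schema=schema)

# Decoding does not need the schema; encoding does.
row = coder.decode('{"name": "Ada", "age": 36}')
print(coder.encode(row))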

class apache_beam.io.gcp.bigquery.BigQueryDisposition[source]

Bases: object

Class holding standard strings used for create and write dispositions.

CREATE_NEVER = 'CREATE_NEVER'

CREATE_IF_NEEDED = 'CREATE_IF_NEEDED'

WRITE_TRUNCATE = 'WRITE_TRUNCATE'

WRITE_APPEND = 'WRITE_APPEND'

WRITE_EMPTY = 'WRITE_EMPTY'

static validate_create(disposition)[source]

static validate_write(disposition)[source]
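A sketch of passing these dispositions to WriteToBigQuery; the events PCollection, table and schema are hypothetical:

_ = (events
     | beam.io.WriteToBigQuery(
         table='my_project:my_dataset.events',
         schema='id:INTEGER, name:STRING',
         create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
         write_disposition=BigQueryDisposition.WRITE_TRUNCATE))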

apache_beam.io.gcp.bigquery.BigQuerySource(table=None, dataset=None, project=None, query=None, validate=False, coder=None, use_standard_sql=False, flatten_results=True, kms_key=None, use_dataflow_native_source=False)[source]

Deprecated since 2.25.0: BigQuerySource is deprecated. Use ReadFromBigQuery instead.

apache_beam.io.gcp.bigquery.BigQuerySink(*args, validate=False, **kwargs)[source]

A deprecated alias for WriteToBigQuery.

Deprecated since 2.11.0: BigQuerySink is deprecated. Use WriteToBigQuery instead.

class apache_beam.io.gcp.bigquery.BigQueryQueryPriority[source]

Bases: object

Class holding standard strings used for query priority.

INTERACTIVE = 'INTERACTIVE'

BATCH = 'BATCH'

class apache_beam.io.gcp.bigquery.WriteToBigQuery(table, dataset=None, project=None, schema=None, create_disposition='CREATE_IF_NEEDED', write_disposition='WRITE_APPEND', kms_key=None, batch_size=None, max_file_size=None, max_partition_size=None, max_files_per_bundle=None, test_client=None, custom_gcs_temp_location=None, method=None, insert_retry_strategy=None, additional_bq_parameters=None, table_side_inputs=None, schema_side_inputs=None, triggering_frequency=None, use_at_least_once=False, validate=True, temp_file_format=None, ignore_insert_ids=False, with_auto_sharding=False, num_storage_api_streams=0, ignore_unknown_columns=False, load_job_project_id=None, max_insert_payload_size=9437184, num_streaming_keys=500, use_cdc_writes: bool = False, primary_key: List[str] | None = None, expansion_service=None)[source]

Bases: PTransform

Write data to BigQuery.

This transform receives a PCollection of elements to be inserted into BigQuery tables. The elements would come in as Python dictionaries, or as TableRow instances.

Initialize a WriteToBigQuery transform.

Parameters:

class Method[source]

Bases: object

DEFAULT = 'DEFAULT'

STREAMING_INSERTS = 'STREAMING_INSERTS'

FILE_LOADS = 'FILE_LOADS'

STORAGE_WRITE_API = 'STORAGE_WRITE_API'

static get_table_schema_from_string(schema)

Transform the string table schema into a TableSchema instance.

Parameters:

schema (str) – The string schema to be used if the BigQuery table to write to has to be created.

Returns:

The schema to be used if the BigQuery table to write to has to be created, in the TableSchema format.

Return type:

TableSchema

static table_schema_to_dict(table_schema)

Create a dictionary representation of the table schema for serialization.

static get_dict_table_schema(schema)

Transform the table schema into a dictionary instance.

Parameters:

schema (str, dict, TableSchema) – The schema to be used if the BigQuery table to write to has to be created. This can be a dict, a string, or a TableSchema instance.

Returns:

The schema to be used if the BigQuery table to write to has to be created, in the dictionary format.

Return type:

Dict[str, Any]
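For illustration, both helpers can be called directly; a sketch with a hypothetical two-field schema string:

table_schema = WriteToBigQuery.get_table_schema_from_string(
    'name:STRING, age:INTEGER')                    # TableSchema instance
schema_dict = WriteToBigQuery.get_dict_table_schema(table_schema)
# schema_dict is the same schema as a {'fields': [...]} dictionary.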

expand(pcoll)[source]

display_data()[source]

to_runner_api_parameter(context)[source]

from_runner_api(payload, context)[source]

class apache_beam.io.gcp.bigquery.WriteResult(method: str | None = None, destination_load_jobid_pairs: PCollection[Tuple[str, JobReference]] | None = None, destination_file_pairs: PCollection[Tuple[str, Tuple[str, int]]] | None = None, destination_copy_jobid_pairs: PCollection[Tuple[str, JobReference]] | None = None, failed_rows: PCollection[Tuple[str, dict]] | None = None, failed_rows_with_errors: PCollection[Tuple[str, dict, list]] | None = None)[source]

Bases: object

The result of a WriteToBigQuery transform.

validate(valid_methods, attribute)[source]

property destination_load_jobid_pairs: PCollection[Tuple[str, JobReference]]

A FILE_LOADS method attribute

Returns: A PCollection of the table destinations that were successfully loaded to using the batch load API, along with the load job IDs.

Raises: AttributeError: if accessed with a write method besides FILE_LOADS.

property destination_file_pairs: PCollection[Tuple[str, Tuple[str, int]]]

A FILE_LOADS method attribute

Returns: A PCollection of the table destinations along with the temp files used as sources to load from.

Raises: AttributeError: if accessed with a write method besides FILE_LOADS.

property destination_copy_jobid_pairs: PCollection[Tuple[str, JobReference]]

A FILE_LOADS method attribute

Returns: A PCollection of the table destinations that were successfully copied to, along with the copy job ID.

Raises: AttributeError: if accessed with a write method besides FILE_LOADS.

property failed_rows: PCollection[Tuple[str, dict]]

A [STREAMING_INSERTS, STORAGE_WRITE_API] method attribute

Returns: A PCollection of rows that failed when inserting to BigQuery.

Raises: AttributeError: if accessed with a write method besides [STREAMING_INSERTS, STORAGE_WRITE_API].

property failed_rows_with_errors: PCollection[Tuple[str, dict, list]]

A [STREAMING_INSERTS, STORAGE_WRITE_API] method attribute

Returns: A PCollection of rows that failed when inserting to BigQuery, along with their errors.

Raises: AttributeError: if accessed with a write method besides [STREAMING_INSERTS, STORAGE_WRITE_API].

class apache_beam.io.gcp.bigquery.ReadFromBigQuery(gcs_location=None, method=None, use_native_datetime=False, output_type=None, *args, **kwargs)[source]

Bases: PTransform

Read data from BigQuery.

This PTransform uses a BigQuery export job to take a snapshot of the table on GCS, and then reads from each produced file. File format is Avro by default.

Parameters:

class Method[source]

Bases: object

EXPORT = 'EXPORT'

DIRECT_READ = 'DIRECT_READ'

COUNTER = 0

expand(pcoll)[source]
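For example, a sketch of reading with DIRECT_READ (the BigQuery Storage Read API) instead of an export job, assuming an existing pipeline p; the table name is hypothetical:

rows = p | 'Direct Read' >> ReadFromBigQuery(
    method=ReadFromBigQuery.Method.DIRECT_READ,
    table='my_project:my_dataset.my_table')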

class apache_beam.io.gcp.bigquery.ReadFromBigQueryRequest(query: str | None = None, use_standard_sql: bool = True, table: str | TableReference | None = None, flatten_results: bool = False)[source]

Bases: object

Class that defines data to read from BQ.

Only one of query or table should be specified.

Parameters:

validate()[source]

class apache_beam.io.gcp.bigquery.ReadAllFromBigQuery(gcs_location: str | ValueProvider | None = None, validate: bool = False, kms_key: str | None = None, temp_dataset: str | DatasetReference | None = None, bigquery_job_labels: Dict[str, str] | None = None, query_priority: str = 'BATCH')[source]

Bases: PTransform

Read data from BigQuery.

PTransform: ReadFromBigQueryRequest -> Rows

This PTransform uses a BigQuery export job to take a snapshot of the table on GCS, and then reads from each produced file. Data is exported into a new subdirectory for each export using UUIDs generated in ReadFromBigQueryRequest objects.

It is recommended not to use this PTransform for streaming jobs on GlobalWindow, since it will not be able to clean up snapshots.

Parameters:

COUNTER = 0

expand(pcoll)[source]