Stream table updates with change data capture


BigQuery change data capture (CDC) updates your BigQuery tables by processing and applying streamed changes to existing data. This synchronization is accomplished through upsert and delete row operations that are streamed in real time by the BigQuery Storage Write API, which you should be familiar with before proceeding.

Before you begin

Grant Identity and Access Management (IAM) roles that give users the necessary permissions to perform each task in this document, and ensure that your workflow meets each prerequisite.

Required permissions

To get the permission that you need to use the Storage Write API, ask your administrator to grant you the BigQuery Data Editor (roles/bigquery.dataEditor) IAM role. For more information about granting roles, see Manage access to projects, folders, and organizations.

This predefined role contains the bigquery.tables.updateData permission, which is required to use the Storage Write API.

You might also be able to get this permission with custom roles or other predefined roles.

For more information about IAM roles and permissions in BigQuery, see Introduction to IAM.

Prerequisites

To use BigQuery CDC, your workflow must meet the following conditions:

Specify changes to existing records

In BigQuery CDC, the pseudocolumn _CHANGE_TYPE indicates the type of change to be processed for each row. To use CDC, set _CHANGE_TYPE when you stream row modifications using the Storage Write API. The pseudocolumn _CHANGE_TYPE only accepts the values UPSERT and DELETE. A table is considered CDC-enabled while the Storage Write API is streaming row modifications to the table in this manner.

Example with UPSERT and DELETE values

Consider the following table in BigQuery:

ID    Name      Salary
100   Charlie   2000
101   Tal       3000
102   Lee       5000

The following row modifications are streamed by the Storage Write API:

ID    Name      Salary    _CHANGE_TYPE
100                       DELETE
101   Tal       8000      UPSERT
105   Izumi     6000      UPSERT

The updated table is now the following:

ID    Name      Salary
101   Tal       8000
102   Lee       5000
105   Izumi     6000
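The following Python sketch reproduces the example above with an in-memory dictionary keyed by ID. It assumes ID is the table's primary key and models only the outcome of the UPSERT and DELETE operations, not how BigQuery applies them internally; the apply_changes function and its tuple layout are hypothetical.

```python
from typing import Dict, List, Optional, Tuple

Row = Tuple[str, int]  # (Name, Salary)
Change = Tuple[int, Optional[Row], str]  # (ID, row values or None, _CHANGE_TYPE)


def apply_changes(table: Dict[int, Row], changes: List[Change]) -> Dict[int, Row]:
    """Return a copy of table with each change applied in order, keyed by ID."""
    result = dict(table)
    for key, row, change_type in changes:
        if change_type == "UPSERT":
            # Insert a new ID, or overwrite the existing row for that ID.
            if row is None:
                raise ValueError("UPSERT requires row values")
            result[key] = row
        elif change_type == "DELETE":
            # Remove the row for this ID; only the key is needed.
            result.pop(key, None)
        else:
            raise ValueError(f"_CHANGE_TYPE must be UPSERT or DELETE, got {change_type!r}")
    return result


base = {100: ("Charlie", 2000), 101: ("Tal", 3000), 102: ("Lee", 5000)}
changes: List[Change] = [
    (100, None, "DELETE"),
    (101, ("Tal", 8000), "UPSERT"),
    (105, ("Izumi", 6000), "UPSERT"),
]
updated = apply_changes(base, changes)
for key in sorted(updated):
    print(key, *updated[key])
# 101 Tal 8000
# 102 Lee 5000
# 105 Izumi 6000
```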

Manage table staleness

By default, every time you run a query, BigQuery returns the most up-to-date results. To provide the freshest results when querying a CDC-enabled table, BigQuery must apply each streamed row modification up to the query start time, so that the most up-to-date version of the table is being queried. Applying these row modifications at query run time increases query latency and cost. However, if you don't require fully up-to-date query results, you can reduce cost and latency on your queries by setting the max_staleness option on your table. When this option is set, BigQuery applies row modifications at least once within the interval defined by the max_staleness value, letting you run queries without waiting for updates to be applied, at the cost of some data staleness.

This behavior is especially useful for dashboards and reports for which data freshness isn't essential. It is also helpful for cost management by giving you more control over how frequently BigQuery applies row modifications.

Query tables with the max_staleness option set

When you query a table with the max_staleness option set, BigQuery returns the result based on the value of max_staleness and the time at which the last apply job occurred, which is represented by the table's upsert_stream_apply_watermark timestamp.

Consider the following example, in which a table has the max_staleness option set to 10 minutes, and the most recent apply job occurred at T20:

[Figure: Query run time occurs within the maximum time interval for data staleness.]

If you query the table at T25, then the current version of the table is 5 minutes stale, which is less than the max_staleness interval of 10 minutes. In this case, BigQuery returns the version of the table at T20, meaning the data returned is also 5 minutes stale.

When you set the max_staleness option on your table, BigQuery applies pending row modifications at least once within the max_staleness interval. In some cases, however, BigQuery might not complete the process of applying these pending row modifications within the interval.

For example, if you query the table at T35, and the process of applying pending row modifications has not completed, then the current version of the table is 15 minutes stale, which is greater than the max_staleness interval of 10 minutes. In this case, at query run time, BigQuery applies all row modifications between T20 and T35 for the current query, meaning the queried data is completely up to date, at the cost of some additional query latency. This is considered a runtime merge job.

[Figure: Query run time occurs outside of the maximum time interval for data staleness.]
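The T20, T25, and T35 walkthrough can be summarized as a small decision function. This is a simplified model for illustration only: query_outcome is a hypothetical helper that takes times in minutes, and treating staleness exactly equal to max_staleness as within the interval is an assumption that the text above does not settle.

```python
def query_outcome(query_minute: int, last_apply_minute: int, max_staleness_minutes: int) -> str:
    """Describe what a query returns; times are minutes on the T axis (T20 = 20)."""
    staleness = query_minute - last_apply_minute
    # Assumption: staleness equal to max_staleness still counts as within the interval.
    if staleness <= max_staleness_minutes:
        # Served from the last applied version; pending changes are not merged.
        return f"table as of T{last_apply_minute} ({staleness} minutes stale)"
    # Pending changes are applied for this query only: a runtime merge job.
    return f"runtime merge of T{last_apply_minute} to T{query_minute} changes (up to date)"


print(query_outcome(25, 20, 10))  # table as of T20 (5 minutes stale)
print(query_outcome(35, 20, 10))  # runtime merge of T20 to T35 changes (up to date)
```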

A table's max_staleness value should generally be the higher of the following two values:

To estimate the time it takes to apply upserted changes to an existing table, use the following SQL query. It determines the 95th percentile duration of background apply jobs over the past week, doubles it, rounds it up to whole minutes, and adds a seven-minute buffer to allow for the BigQuery write-optimized storage (streaming buffer) conversion.

SELECT
  project_id,
  destination_table.dataset_id,
  destination_table.table_id,
  APPROX_QUANTILES((TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND) / 1000), 100)[OFFSET(95)] AS p95_background_apply_duration_in_seconds,
  CEILING(APPROX_QUANTILES((TIMESTAMP_DIFF(end_time, creation_time, MILLISECOND) / 1000), 100)[OFFSET(95)] * 2 / 60) + 7 AS recommended_max_staleness_with_buffer_in_minutes
FROM `region-REGION`.INFORMATION_SCHEMA.JOBS AS job
WHERE
  project_id = 'PROJECT_ID'
  AND DATE(creation_time) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) AND CURRENT_DATE()
  AND job_id LIKE "%cdc_background%"
GROUP BY 1, 2, 3;

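Replace the following:

REGION: the region where your project is located, for example, us.
PROJECT_ID: the ID of the project that contains the CDC-enabled tables.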

The duration of background apply jobs is affected by several factors including the number and complexity of CDC operations issued within the staleness interval, the table size, and BigQuery resource availability. For more information about resource availability, see Size and monitor BACKGROUND reservations.
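As a worked example of the arithmetic in the query, the following Python function doubles a 95th percentile apply duration given in seconds, converts it to minutes, rounds up, and adds the seven-minute buffer. The function name and the 150-second and 200-second inputs are hypothetical.

```python
import math


def recommended_max_staleness_minutes(p95_apply_seconds: float, buffer_minutes: int = 7) -> int:
    """Mirror CEILING(p95 * 2 / 60) + 7 from the query above."""
    # Double the p95 apply duration and convert seconds to whole minutes, rounding up.
    doubled_minutes = math.ceil(p95_apply_seconds * 2 / 60)
    # Add the buffer for the write-optimized storage (streaming buffer) conversion.
    return doubled_minutes + buffer_minutes


print(recommended_max_staleness_minutes(150))  # 12
print(recommended_max_staleness_minutes(200))  # 14
```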

Create a table with the max_staleness option

To create a table with the max_staleness option, use the CREATE TABLE statement. The following example creates the table employees with a max_staleness limit of 10 minutes:

CREATE TABLE employees ( id INT64 PRIMARY KEY NOT ENFORCED, name STRING) CLUSTER BY id OPTIONS ( max_staleness = INTERVAL 10 MINUTE);

Modify the max_staleness option for an existing table

To add or modify a max_staleness limit in an existing table, use the ALTER TABLE statement. The following example changes the max_staleness limit of the employees table to 15 minutes:

ALTER TABLE employees SET OPTIONS ( max_staleness = INTERVAL 15 MINUTE);

Determine the current max_staleness value of a table

To determine the current max_staleness value of a table, query the INFORMATION_SCHEMA.TABLE_OPTIONS view. The following example checks the current max_staleness value of the table mytable:

SELECT option_name, option_value FROM DATASET_NAME.INFORMATION_SCHEMA.TABLE_OPTIONS WHERE option_name = 'max_staleness' AND table_name = 'TABLE_NAME';

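Replace the following:

DATASET_NAME: the name of the dataset that contains the table.
TABLE_NAME: the name of the table, for example, mytable.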

The results show that the max_staleness value is 10 minutes:

+-----+---------------+--------------+
| Row | option_name   | option_value |
+-----+---------------+--------------+
| 1   | max_staleness | 0-0 0 0:10:0 |
+-----+---------------+--------------+

Monitor table upsert operation progress

To monitor the state of a table and to check when row modifications were last applied, query the INFORMATION_SCHEMA.TABLES view to get the upsert_stream_apply_watermark timestamp.

The following example checks the upsert_stream_apply_watermark value of the table mytable:

SELECT upsert_stream_apply_watermark FROM DATASET_NAME.INFORMATION_SCHEMA.TABLES WHERE table_name = 'TABLE_NAME';

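Replace the following:

DATASET_NAME: the name of the dataset that contains the table.
TABLE_NAME: the name of the table, for example, mytable.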

The result is similar to the following:

[{ "upsert_stream_apply_watermark": "2022-09-15T04:17:19.909Z" }]
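If you want to compare the watermark against the table's max_staleness from client code, a minimal sketch could parse the timestamp string shown above and compute how long ago row modifications were last applied. The helper watermark_lag, the fixed "current" time, and the 10-minute threshold are hypothetical; in practice you would read the watermark from your query result and use the current UTC time.

```python
from datetime import datetime, timedelta, timezone


def watermark_lag(watermark: str, now: datetime) -> timedelta:
    """Time elapsed since row modifications were last applied."""
    # %z accepts a trailing "Z" (UTC) on Python 3.7 and later.
    applied_at = datetime.strptime(watermark, "%Y-%m-%dT%H:%M:%S.%f%z")
    return now - applied_at


now = datetime(2022, 9, 15, 4, 30, 0, tzinfo=timezone.utc)  # hypothetical current time
lag = watermark_lag("2022-09-15T04:17:19.909Z", now)
print(lag)                          # 0:12:40.091000
print(lag > timedelta(minutes=10))  # True: older than a hypothetical 10-minute max_staleness
```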

Upsert operations are performed by the bigquery-adminbot@system.gserviceaccount.com service account and appear within the job history of the project containing the CDC-enabled table.

Manage custom ordering

When streaming upserts to BigQuery, the default behavior of ordering records with identical primary keys is determined by the BigQuery system time at which the record was ingested into BigQuery. In other words, the record most recently ingested with the latest timestamp takes precedence over the record previously ingested with an older timestamp. For certain use cases, such as those where very frequent upserts can occur to the same primary key in a very short time window, or where the upsert order is not guaranteed, this might not be sufficient. For these scenarios, a user-supplied ordering key might be necessary.

To configure a user-supplied ordering key, set the pseudocolumn _CHANGE_SEQUENCE_NUMBER to indicate the order in which BigQuery should apply records: between two records with the same primary key, the record with the larger _CHANGE_SEQUENCE_NUMBER takes precedence. The pseudocolumn _CHANGE_SEQUENCE_NUMBER is optional and only accepts STRING values in a fixed format.

_CHANGE_SEQUENCE_NUMBER format

The pseudocolumn _CHANGE_SEQUENCE_NUMBER only accepts STRING values written in a fixed format: hexadecimal sections separated by a forward slash (/). Each section can contain at most 16 hexadecimal characters, and up to four sections are allowed per _CHANGE_SEQUENCE_NUMBER. The allowable range of _CHANGE_SEQUENCE_NUMBER values is 0/0/0/0 to FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF/FFFFFFFFFFFFFFFF. _CHANGE_SEQUENCE_NUMBER values support both uppercase and lowercase characters.

Basic ordering keys can be expressed with a single section. For example, to order records solely by a processing timestamp from an application server, you could use one section: '2024-04-30 11:19:44 UTC', converted to milliseconds since the Unix epoch and expressed in hexadecimal, which is '18F2EBB6480' in this case. The client issuing the write to BigQuery through the Storage Write API is responsible for converting data into hexadecimal.
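The conversion described above can be sketched in Python. This assumes the client builds the section from a timezone-aware UTC timestamp; timestamp_to_sequence_section is a hypothetical helper, not part of any BigQuery client library.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def timestamp_to_sequence_section(ts: datetime) -> str:
    """Encode a UTC timestamp as milliseconds since the Unix epoch, in uppercase hex."""
    millis = (ts - EPOCH) // timedelta(milliseconds=1)  # exact integer division
    if millis < 0:
        raise ValueError("sections are unsigned, so timestamps before 1970 are rejected")
    section = format(millis, "X")
    if len(section) > 16:
        raise ValueError("a section allows at most 16 hexadecimal characters")
    return section


ts = datetime(2024, 4, 30, 11, 19, 44, tzinfo=timezone.utc)
print(timestamp_to_sequence_section(ts))  # 18F2EBB6480
```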

Supporting multiple sections lets you combine several processing-logic values into one key for more complex use cases. For example, to order records by a processing timestamp from an application server, then by a log sequence number, then by the record's status, you could use three sections: '2024-04-30 11:19:44 UTC' / '123' / 'complete', each expressed as hexadecimal. The order of the sections matters, because it ranks your processing-logic values: BigQuery compares _CHANGE_SEQUENCE_NUMBER values by comparing the first section, then comparing the next section only if all previous sections were equal.

BigQuery uses the _CHANGE_SEQUENCE_NUMBER to perform ordering by comparing two or more _CHANGE_SEQUENCE_NUMBER fields as unsigned numeric values.

Consider the following _CHANGE_SEQUENCE_NUMBER comparison examples and their precedence results:

If two _CHANGE_SEQUENCE_NUMBER values are identical, then the record with the latest BigQuery system ingestion time has precedence over previously ingested records.
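To make these rules concrete, the following sketch validates the documented format (up to four sections, each 1 to 16 hexadecimal characters) and picks the record that takes precedence, falling back to ingestion order for identical values. It is a client-side model, not BigQuery's implementation: the helper names and integer ingestion counters are hypothetical, and the handling of values where one is a prefix of the other (for example, A versus A/1) is an assumption because the text above doesn't specify it.

```python
import string
from typing import Tuple

MAX_SECTIONS = 4
MAX_SECTION_HEX_CHARS = 16


def parse_sequence_number(value: str) -> Tuple[int, ...]:
    """Split a _CHANGE_SEQUENCE_NUMBER string into unsigned integer sections."""
    sections = value.split("/")
    if len(sections) > MAX_SECTIONS:
        raise ValueError(f"at most {MAX_SECTIONS} sections allowed: {value!r}")
    parsed = []
    for section in sections:
        # Each section must be 1 to 16 hex digits; uppercase and lowercase are both valid.
        valid = 1 <= len(section) <= MAX_SECTION_HEX_CHARS and all(
            c in string.hexdigits for c in section
        )
        if not valid:
            raise ValueError(f"invalid section {section!r} in {value!r}")
        parsed.append(int(section, 16))
    return tuple(parsed)


def latest_record(seq_a: str, ingested_a: int, seq_b: str, ingested_b: int) -> str:
    """Return "a" or "b", whichever record takes precedence for one primary key.

    Tuples compare section by section, moving on only when earlier sections are
    equal. Assumption: if one value is a prefix of the other (A vs A/1), the
    shorter one sorts first. Ingestion order breaks exact ties (later wins).
    """
    key_a = (parse_sequence_number(seq_a), ingested_a)
    key_b = (parse_sequence_number(seq_b), ingested_b)
    return "a" if key_a > key_b else "b"


print(latest_record("7B", 1, "77", 2))          # a: 0x7B (123) > 0x77 (119)
print(latest_record("FFF/B", 1, "FFF/ABC", 2))  # b: first sections equal, 0xABC > 0xB
print(latest_record("abc", 1, "ABC", 2))        # b: same value, b was ingested later
```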

When custom ordering is used for a table, always supply the _CHANGE_SEQUENCE_NUMBER value. Write requests that omit it produce a mix of rows with and without _CHANGE_SEQUENCE_NUMBER values, which results in unpredictable ordering.

Configure a BigQuery reservation for use with CDC

You can use BigQuery reservations to allocate dedicated BigQuery compute resources for CDC row modification operations. Reservations let you set a cap on the cost of performing these operations. This approach is particularly useful for workflows with frequent CDC operations against large tables, which otherwise would have high on-demand costs due to the large number of bytes processed when performing each operation.

BigQuery CDC jobs that apply pending row modifications within the max_staleness interval are considered background jobs and use the BACKGROUND assignment type, rather than the QUERY assignment type. In contrast, queries outside of the max_staleness interval that require row modifications to be applied at query run time use the QUERY assignment type. Tables without a max_staleness setting or tables with max_staleness set to 0 also use the QUERY assignment type. BigQuery CDC background jobs performed without a BACKGROUND assignment use on-demand pricing. This consideration is important when designing your workload management strategy for BigQuery CDC.

To configure a BigQuery reservation for use with CDC, start by configuring a reservation in the region where your BigQuery tables are located. For guidance on the size of your reservation, see Size and monitor BACKGROUND reservations. Once you have created a reservation, assign the BigQuery project to the reservation, and set the job_type option to BACKGROUND by running the following CREATE ASSIGNMENT statement:

CREATE ASSIGNMENT `ADMIN_PROJECT_ID.region-REGION.RESERVATION_NAME.ASSIGNMENT_ID`
OPTIONS (
  assignee = 'projects/PROJECT_ID',
  job_type = 'BACKGROUND');

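Replace the following:

ADMIN_PROJECT_ID: the ID of the administration project that owns the reservation.
REGION: the region where the reservation is located, for example, us.
RESERVATION_NAME: the name of the reservation.
ASSIGNMENT_ID: the ID of the assignment.
PROJECT_ID: the ID of the project that contains the CDC-enabled tables and that you are assigning to the reservation.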

Size and monitor BACKGROUND reservations

Reservations determine the amount of compute resources available to perform BigQuery compute operations. Undersizing a reservation can increase the processing time of CDC row modification operations. To size a reservation accurately, monitor historical slot consumption for the project that performs the CDC operations by querying the INFORMATION_SCHEMA.JOBS_TIMELINE view:

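-- period_slot_ms is reported per one-second period, so sum it per minute and
-- divide by 60,000 ms to get the average number of slots used in that minute.
SELECT
  TIMESTAMP_TRUNC(period_start, MINUTE) AS period_minute,
  SUM(period_slot_ms) / (1000 * 60) AS slots_used
FROM `region-REGION`.INFORMATION_SCHEMA.JOBS_TIMELINE_BY_PROJECT
WHERE
  DATE(job_creation_time) BETWEEN DATE_SUB(CURRENT_DATE(), INTERVAL 7 DAY) AND CURRENT_DATE()
  AND job_id LIKE '%cdc_background%'
GROUP BY period_minute
ORDER BY period_minute DESC;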

Replace REGION with the region name where your project is located, for example, us.

Deleted data considerations

Limitations

BigQuery CDC pricing

BigQuery CDC uses the Storage Write API for data ingestion, BigQuery storage for data storage, and BigQuery compute for row modification operations, all of which incur costs. For pricing information, see BigQuery pricing.

Estimate BigQuery CDC costs

In addition to general BigQuery cost estimation best practices, estimating the costs of BigQuery CDC might be important for workflows that have large amounts of data, a low max_staleness configuration, or frequently changing data.

BigQuery data ingestion pricing and BigQuery storage pricing are directly determined by the amount of data that you ingest and store, including pseudocolumns. However, BigQuery compute pricing can be harder to estimate, because it depends on the compute resources consumed by BigQuery CDC jobs.

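BigQuery CDC jobs are split into three categories: background apply jobs, which apply pending row modifications within the max_staleness interval; runtime merge jobs, which apply pending row modifications at query run time when a query falls outside the max_staleness interval; and query jobs, which run your queries against the table.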

Only query jobs take advantage of BigQuery partitioning. Background apply jobs and runtime merge jobs can't use partitioning because, when applying recently streamed row modifications, there is no guarantee about which table partitions the recently streamed upserts apply to. In other words, background apply jobs and runtime merge jobs read the full baseline table. For the same reason, only query jobs can benefit from filters on BigQuery clustering columns. Understanding the amount of data that is read to perform CDC operations helps you estimate the total cost.
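As a rough illustration of why full-table reads matter, the following sketch estimates a lower bound on the bytes read per day by background apply jobs. It assumes, hypothetically, that changes arrive continuously, that at least one apply job runs per max_staleness interval, and that each job reads the entire baseline table; it ignores runtime merge jobs, query jobs, and pricing, so treat its output as an order-of-magnitude guide only. The helper name and the 2 TiB table size are hypothetical.

```python
def min_background_apply_bytes_per_day(table_bytes: int, max_staleness_minutes: int) -> int:
    """Rough lower bound on bytes read per day by background apply jobs.

    Hypothetical model: changes arrive continuously, at least one apply job
    runs per max_staleness interval, and every job reads the full baseline table.
    """
    minutes_per_day = 24 * 60
    jobs_per_day = minutes_per_day // max_staleness_minutes
    return jobs_per_day * table_bytes


TIB = 2**40
table_bytes = 2 * TIB  # hypothetical 2 TiB baseline table
for minutes in (10, 60, 240):
    tib_read = min_background_apply_bytes_per_day(table_bytes, minutes) / TIB
    print(f"max_staleness = {minutes} min: at least ~{tib_read:g} TiB read per day")
# max_staleness = 10 min: at least ~288 TiB read per day
# max_staleness = 60 min: at least ~48 TiB read per day
# max_staleness = 240 min: at least ~12 TiB read per day
```

Under this model, raising max_staleness reduces the number of full-table reads proportionally, which is one reason a low max_staleness configuration on a large table can be expensive with on-demand pricing.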

If the amount of data being read from the table baseline is high, consider using the BigQuery capacity pricing model, which is not based on the amount of processed data.

BigQuery CDC cost best practices

In addition to general BigQuery cost best practices, use the following techniques to optimize the costs of BigQuery CDC operations:
