Resumable Full Refresh: Building resilient systems for syncing data (original) (raw)
Moving data from one place to another is a difficult task. As data engineers, we are constantly presented with challenges of data accuracy, data freshness, and cost considerations where there is rarely a perfect solution. Every option has its tradeoffs and it is our job to make the decisions that deliver the most impact and value. As part of our efforts towards Airbyte 1.0, we are proud to announce a new feature within the Airbyte platform and across our connector catalog called Resumable Full Refresh.
Resumable Full Refresh is an improvement to the Full Refresh sync mode to handle transient sync failures in a more efficient and resilient way, not a new sync mode. When using Full Refresh mode with a supported source connector and stream, failing sync job attempts will now be retried starting from the last successful checkpoint on subsequent job attempts. The first attempt of each Full Refresh job will still extract data from the beginning.
In this blog post, we outline how this feature was implemented for both API sources and database sources. We will also delve into the design choices and methodology as we worked to ensure better reliability.
How Airbyte used to perform Full Refresh syncs
Before talking about Resumable Full Refresh, let’s quickly go over the two ways Airbyte can sync data from a source. The Full Refresh sync mode configures a stream to pull every record from the source each time a sync is run. This process does not persist a stream state to indicate which records were previously synced. In contrast to Full Refresh, Incremental syncs also persist a stream state of the last successfully synced record usually in the form of a timestamp, or by using change data capture (CDC) on databases. On subsequent syncs, this timestamp is used to only get newer records and reduce the time needed to complete a sync.
For incremental streams, state checkpointing is a fundamental aspect of how Airbyte extracts data reliably from a source to a destination. These streams periodically emit checkpoint messages over the course of the stream. At the end of the sync, these checkpoints can then be used to start the next sync from that point onward.
The first attempt to sync the contacts_form_submissions failed due to a transient rate limiting error after extracting 7,133 records.
However, streams running in Full Refresh mode were not designed to emit these checkpoint state messages. When an Airbyte sync fails, the sync is retried with multiple attempts before the sync job is marked as successful or failed. Previously, in the case of Full Refresh syncs, each new attempt started extracting data all the way from the beginning. And because Full Refresh syncs did not perform checkpointing, they were even more vulnerable to transient failures. You can imagine how frustrating it must be for a sync to fail in the final hour and start over again in the next attempt. Or worse, for an unreliable data source, a stream may never be able to fully sync the entire data set.
How API sources implement Resumable Full Refresh
Synthetic cursors
In the context of API sources, a cursor is a field in a record that can be used to establish a consistent order of the records in a stream. An example of a good cursor value is a timestamp which indicates the last place the stream synced successfully. And a stream can then use this value to periodically checkpoint its progress. But what if the data being emitted from an API does not have a suitable cursor value? There are many APIs across the internet that don’t support query parameters to filter a sub-collection based on a timestamp. Instead, we are left with a request to get all records and iterate through each page. These types of streams only support syncing in Full Refresh mode.
Syncing the contacts_form_submissions stream is successful on the third attempt by using vidOffset 33674793614 as the starting checkpoint.
In the absence of a cursor, Resumable Full Refresh streams checkpoint using the most recent page of records that was synced. This synthetic cursor is based on the pagination strategy defined by the partner API. And if a sync fails midway through a stream, by utilizing Resumable Full Refresh, the stream will use the last checkpointed page on the subsequent sync attempt.
Examples of synthetic cursors:
- Cursor pagination: A bookmark that points to the next record to be fetched
- Offset: An incrementing counter of records that were previously read
- Page number: The next page of records to retrieve
The ability to define a synthetic cursor that checkpoints according to the stream’s next page is now a new abstraction that is incorporated into the Python airbyte-cdk. With these new concepts implemented, we have begun updating many of our Python source connectors to implement and use this new feature. To see how we implemented resumable Full Refresh for our certified Hubspot source, you can refer to the PR on Github.
Leveraging the power of the low-code framework
For the Python CDK, streams are implemented in many different ways and are not always coded to follow a consistent set of standards. This has created a challenge when incorporating Resumable Full Refresh into streams because they override and deviate from default CDK behavior. Determining what features a stream supports also differs from connector to connector along with varying implementations. Many of the Python sources managed by Airbyte require code changes to start using Resumable Full Refresh.
In 2022, Airbyte released a low code connector development framework that allows developers to define source connector behavior in a declarative YAML file. Connectors built in the low-code framework and the Connector Builder will support Resumable Full Refresh out of the box. In contrast to the Python CDK, low-code connectors are more easily able to uptake this feature because stream behavior is explicitly declared within the connector’s manifest. This has allowed for a consistent pattern to detect a stream’s compatibility with Resumable Full Refresh.
A stream is Resumable Full Refresh compatible if it:
- Has an HttpRequester and defines at least one Paginator and PaginationStrategy
- Does not implement the incremental component
Expanding beyond Resumable Full Refresh, the low-code framework’s ability to easily derive what features a stream supports starts to become a powerful tool. As new horizontal features are introduced across all connectors, the need for common abstractions over arbitrary code to determine compatibility becomes more and more important.
How database sources implement Resumable Full Refresh
Our way of making Full Refresh reads from databases used to be quite naive. We would basically run something similar to:
SELECT id, name, age… FROM some_table
This initial approach was straightforward and worked fast – up to a point. But with ever increasing volumes of data this naive approach could not scale well. In order to be able to make Resumable Full Refresh reads, we turned to the algorithm that is already running successfully in our Incremental syncs.
To recap, an incremental sync first takes a snapshot of the current state of an entire table, before we can start reading incremental changes of that table.
Since the initial snapshot of a table is resumable and scales well to datasets of unlimited sizes, it made sense to apply all these learnings and use this tried and tested method for Full Refresh reads as well.
Large Datasets
For small to medium datasets, it makes sense to query the database using a naive SELECT SQL query and attempt to read the entire dataset in one go. However, this will quickly run into issues with tables that exceed certain size thresholds (depending on which database and in which environment it runs). The more data we try to read at once the greater the chance something can go wrong - think network issues, databases going down for maintenance, a crash of some component along the way. And with a simple SELECT query, all we can do is to start over, which may cause Full Refresh reads to only succeed after a few attempts, if ever. In addition, Making long running queries leads to increased resource consumption on the server - memory, swap files, long running network connections etc., which in turn may lead to more failures.
We solved this problem by changing the query to ensure that we read the data of a table in a defined order, while not incurring any cost in attempting to sort it. This leverages our understanding on how databases save their own data and using that in order to make queries that are as performant as a simple SELECT but as the order in which records are returned is defined, allows us to do two things:
- Break a one long running query into multiple smaller queries reading chunks of no more than 1GB of data each time and by that reducing the level of resources needed.
- Checkpoint along the way, so in case of an error we can continue from a saved point. More on that below.
What this all means is that for example on MySQL databases, the query will look similar to:
SELECT id, name, age… FROM some_table ORDER BY id ASC LIMIT 500000
Where the id column is the primary key of this table, which for MySQL also means the natural order in which data files are saved on the server disk.
We use the equivalent approach on all certified database source connectors, namely Postgres, MS Sql Server and MongoDB. Using this new process for querying data, we ensure that we can read terabytes of data in a timely and resilient manner that can take advantage of periodic checkpointing in the event of an error.
Resumability with checkpointing
In order to be able to resume a read that stopped midway, for whatever reason. We leverage the fact that the data is read in a defined order, and use that order to periodically save our location. In the example above this will be the value of the id column - the primary key of a MySQL table.
This allows us to continue in case of a failure with a value which will manifest into:
SELECT id, name, age… FROM some_table WHERE id>999 ORDER BY id ASC LIMIT 500000
This once again ensures that there is no extra cost incurred by having to re-order the entire dataset in memory.
Reliability vs Isolation
One of the tradeoffs of streams using Resumable Full Refresh is the increased likelihood of emitting duplicate records when a stream fails in the middle of a sync. The next attempt will restart on the checkpoint of the last successful page which might contain both already synced and new records. Historically, we designed Airbyte to ensure at-least-once delivery of data, prioritizing reliability over exactly-once delivery. This design pattern continues to be top of mind with Resumable Full Refresh. Our rationale is that there are many tools within both the Airbyte platform and downstream destinations to sanitize duplicate records. Reliably extracting all data from a source is a table stakes feature for any data infrastructure tool.
Managing complexity between the platform and sources
Using Resumable Full Refresh during the sync requires changes across both the Airbyte platform which is responsible for orchestrating the sync and the source that fetches records and supplies checkpoints. When designing the solution for Resumable Full Refresh, we took careful consideration on the following things.
Platform:
- Retrieve state checkpoint from the database
- Discard state for the first attempt of a sync job
- Transmit state to the source on subsequent sync attempts
- Persist state checkpoint into the database
Source:
- Interpret state checkpoint from the platform and update the stream’s internal state to the incoming value
- Make requests to the partner API or extract records from the database
- Periodically emit state checkpoints to the platform
- Handle and communicate failure reasons
Establishing a clear set of responsibilities and contract between the platform and sources has allowed us to drastically simplify the code flows on both sides. By having sources in charge of writing and interpreting state, the platform treats state checkpoints as a black box that is connector-agnostic. And for source connectors, there is now less conditional logic or room for error when it no longer needs to decide whether to utilize incoming state. The source always interprets state when provided by the platform under the assumption that the platform has already made the right decision.
Eventual consistency through merges
You may be wondering how we can guarantee at least once delivery of records when also using a synthetic cursor that is based on an arbitrary page. What if a new record gets added to a previously requested page? Since a page makes no guarantee on strict ordering compared to a request using explicit timestamps for a subset of data, this scenario is certainly possible.
When designing the Resumable Full Refresh feature, we considered this tradeoff where we are improving resilience to unexpected failure and sacrificing data completeness in the context of a single run. To compensate for this, we have also introduced the new concept of a Refresh sync. To summarize, instead of clearing the destination table at the start of every run, streams configured in Refresh mode will merge the records from the current sync with existing ones in the table from previous syncs. In isolation, a single Resumable Full Refresh sync could have the potential to miss a small number of records, but over the course of multiple syncs, the stream approaches a consistent state where all records reach the target destination.
Final Thoughts
As a part of your infrastructure, how Airbyte syncs data from your sources to your destinations should be set and forget. You trust us to accurately replicate your data and to be resilient against unexpected errors that are non-actionable from you. The overall theme of the upcoming Airbyte 1.0 release is centered around reliability. We hope you’ve learned more about how Airbyte makes your data available and actionable from everywhere now more reliably than ever before.
What other great readings
This article is part of a series of articles we wrote around all the reliability features we released for Airbyte to reach the 1.0 status. Here are some other articles about those reliability features:
- Handling large records
- Checkpointing to ensure uninterrupted data syncs
- Load balancing Airbyte workloads across multiple Kubernetes clusters
- Refreshes to reimport historical data with zero downtime
- Notifications and Webhooks to enable set & forget
- Supporting very large CDC syncs
- Monitoring sync progress and solving OOM failures
- Automatic detection of dropped records
Don't hesitate to check on what was included in Airbyte 1.0's release!