Async and Reactive APIs | Couchbase Docs
The SDK does not provide explicit APIs for batching, because its reactive mechanisms let you build batching code tailored to your use case, which works much better than a generic implementation could.
While batching can be done with the async API as well, we recommend the reactive API so that you can use the asynchronous retry and fallback mechanisms that are supplied out of the box. The simplest bulk fetch (without any error handling) looks like this:
List<String> docsToFetch = Arrays.asList("airline_10123", "airline_10226", "airline_10642");
List<GetResult> results = Flux.fromIterable(docsToFetch).flatMap(reactiveCollection::get).collectList().block();
This code takes a list of keys to fetch and passes each one to ReactiveCollection#get(String). Because the fetches happen asynchronously, the results arrive in whatever order the server cluster returns them. The block() at the end waits until all results have been collected. The blocking call is optional, but it shows that you can mix and match reactive and blocking code: you benefit from the simplicity of blocking code, and can always go one layer down to the more powerful reactive concepts when needed.
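For comparison, the same bulk fetch can be sketched with plain CompletableFuture, the type the async API returns. This is a minimal sketch rather than SDK code: the class AsyncBulkGet, the helper fetchAll, and the fakeGet stand-in (used in place of a real call such as asyncCollection.get(key)) are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class AsyncBulkGet {

    // Starts one asynchronous fetch per key, then waits for all of them.
    // `fetch` is a stand-in for a real call such as asyncCollection.get(key).
    static <T> List<T> fetchAll(List<String> keys, Function<String, CompletableFuture<T>> fetch) {
        List<CompletableFuture<T>> futures = new ArrayList<>();
        for (String key : keys) {
            futures.add(fetch.apply(key)); // the request is now in flight
        }
        // allOf completes once every future has completed; it fails if any of them failed.
        CompletableFuture.allOf(futures.toArray(new CompletableFuture<?>[0])).join();

        List<T> results = new ArrayList<>();
        for (CompletableFuture<T> future : futures) {
            results.add(future.join()); // already complete, so this does not block
        }
        return results;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in for a real document fetch.
        Function<String, CompletableFuture<String>> fakeGet =
            key -> CompletableFuture.supplyAsync(() -> "content-of-" + key);

        System.out.println(fetchAll(Arrays.asList("airline_10123", "airline_10226"), fakeGet));
    }
}
```

Unlike the flatMap version, this sketch returns results in the order of the input keys, because it reads the futures back in the order they were created. As with the simple reactive version, a single failed fetch makes the whole call fail.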
Although simple, the code as shown has one big downside: an error for any individual document fails the whole stream (this is how Flux semantics are specified). In some cases this might be what you want, but most of the time you want to either ignore individual failures or mark them as failed.
Here is how you can ignore individual errors:
List<String> docsToFetch = Arrays.asList("airline_10748", "airline_10765", "airline_109");
List<GetResult> results = Flux.fromIterable(docsToFetch)
    .flatMap(key -> reactiveCollection.get(key).onErrorResume(e -> Mono.empty()))
    .collectList()
    .block();
The onErrorResume(e -> Mono.empty()) call returns an empty Mono regardless of the error. Since the exception is in scope, you can also decide based on the actual error whether to ignore it, propagate it, or fall back to a different reactive computation.
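The idea of deciding per error type can be sketched as follows. To keep the example self-contained it uses plain CompletableFuture (as returned by the async API) instead of Reactor, and NotFoundException is a hypothetical stand-in for whatever "document not found" exception your code wants to ignore. The helper name ignoreNotFound is likewise an assumption.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

class SelectiveErrorHandling {

    // Hypothetical stand-in for a "document not found" exception.
    static class NotFoundException extends RuntimeException {
        NotFoundException(String key) {
            super("not found: " + key);
        }
    }

    // Maps a "not found" failure to an empty Optional and propagates every other failure.
    static <T> CompletableFuture<Optional<T>> ignoreNotFound(CompletableFuture<T> future) {
        return future.handle((value, error) -> {
            if (error == null) {
                return Optional.ofNullable(value);
            }
            // The original failure may arrive wrapped in a CompletionException.
            Throwable cause = (error instanceof CompletionException && error.getCause() != null)
                ? error.getCause()
                : error;
            if (cause instanceof NotFoundException) {
                return Optional.<T>empty(); // ignore: treat as "no result"
            }
            throw new CompletionException(cause); // propagate anything else
        });
    }

    public static void main(String[] args) {
        CompletableFuture<String> missing = new CompletableFuture<>();
        missing.completeExceptionally(new NotFoundException("airline_109"));
        System.out.println(ignoreNotFound(missing).join().isPresent());
    }
}
```

In the reactive version the same branch would live inside the onErrorResume lambda: return Mono.empty() for errors you want to skip and Mono.error(e) for those you want to propagate.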
If you want to separate failures from completions, one way is to use side effects. This is not as clean as pure functional programming, but it does the job. Make sure to use concurrent data structures for proper thread safety:
List<String> docsToFetch = Arrays.asList("airline_112", "airline_1191", "airline_1203");
List<GetResult> successfulResults = Collections.synchronizedList(new ArrayList<>());
Map<String, Throwable> erroredResults = new ConcurrentHashMap<>();
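Flux.fromIterable(docsToFetch)
    .flatMap(key -> reactiveCollection.get(key).onErrorResume(e -> {
        erroredResults.put(key, e); // record the failure for this key
        return Mono.empty();        // then drop it from the sequence
    }))
    .doOnNext(successfulResults::add)
    .blockLast(); // unlike last().block(), this does not fail when every fetch errored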
If the operation succeeds, the side-effect operator doOnNext stores the result in successfulResults. If it fails, the same operator as before (onErrorResume) stores the error in the erroredResults map and then drops it from the overall sequence.
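If you would rather avoid shared mutable collections, one alternative is to turn each fetch into a value that carries either the result or the error, and partition afterwards. The sketch below illustrates this with plain CompletableFuture so that it is self-contained. The Outcome class, the fetchAll helper, and the fakeGet stand-in are hypothetical names, not part of the SDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

class PartitionedBulkGet {

    // Outcome of one fetch: either a value or an error, tagged with its key.
    static class Outcome<T> {
        final String key;
        final T value;
        final Throwable error;

        Outcome(String key, T value, Throwable error) {
            this.key = key;
            this.value = value;
            this.error = error;
        }

        boolean succeeded() {
            return error == null;
        }
    }

    // Never fails as a whole: each failure is captured in the Outcome for its key.
    static <T> List<Outcome<T>> fetchAll(List<String> keys, Function<String, CompletableFuture<T>> fetch) {
        List<CompletableFuture<Outcome<T>>> pending = new ArrayList<>();
        for (String key : keys) {
            CompletableFuture<Outcome<T>> outcome =
                fetch.apply(key).handle((value, error) -> new Outcome<T>(key, value, error));
            pending.add(outcome);
        }
        List<Outcome<T>> outcomes = new ArrayList<>();
        for (CompletableFuture<Outcome<T>> future : pending) {
            outcomes.add(future.join());
        }
        return outcomes;
    }

    public static void main(String[] args) {
        // Hypothetical stand-in: keys ending in "9" fail, all others succeed.
        Function<String, CompletableFuture<String>> fakeGet = key -> CompletableFuture.supplyAsync(() -> {
            if (key.endsWith("9")) {
                throw new IllegalStateException("no such document: " + key);
            }
            return "content-of-" + key;
        });
        for (Outcome<String> o : fetchAll(Arrays.asList("airline_112", "airline_109"), fakeGet)) {
            System.out.println(o.key + " -> " + (o.succeeded() ? o.value : "failed: " + o.error));
        }
    }
}
```

The captured error may be wrapped in a CompletionException, so inspect getCause() when you need the original exception type.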
Finally, it is also possible to retry individual failures before giving up. The built-in retry mechanisms help with this:
List<String> docsToFetch = Arrays.asList("airline_1316", "airline_13391", "airline_1355");
List<GetResult> results = Flux.fromIterable(docsToFetch)
    .flatMap(key -> reactiveCollection.get(key).retryWhen(Retry.backoff(10, Duration.ofMillis(10))))
    .collectList()
    .block();
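To get a feel for what a policy like Retry.backoff(10, Duration.ofMillis(10)) means in wall-clock terms, the following sketch computes a nominal delay schedule. It assumes pure exponential doubling from the minimum delay, with no jitter and no upper cap. Real delays will differ, since the retry spec can add random jitter and cap the maximum delay. BackoffSchedule, nominalDelays, and total are hypothetical helper names.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

class BackoffSchedule {

    // Nominal delay before retry n (n = 0, 1, 2, ...): minBackoff * 2^n.
    // Assumption: pure doubling, no jitter, no upper cap.
    static List<Duration> nominalDelays(int maxAttempts, Duration minBackoff) {
        List<Duration> delays = new ArrayList<>();
        for (int n = 0; n < maxAttempts; n++) {
            delays.add(minBackoff.multipliedBy(1L << n));
        }
        return delays;
    }

    // Sum of all delays, i.e. the nominal time spent waiting before giving up.
    static Duration total(List<Duration> delays) {
        Duration sum = Duration.ZERO;
        for (Duration delay : delays) {
            sum = sum.plus(delay);
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Duration> delays = nominalDelays(10, Duration.ofMillis(10));
        System.out.println(delays);
        System.out.println(total(delays).toMillis() + " ms in total");
    }
}
```

Under these assumptions, ten retries starting at 10 ms wait 10 + 20 + ... + 5120 = 10230 ms in total per key, not counting the time the operations themselves take.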
We recommend checking out the retry and retryWhen methods for their configuration options and overloads. All the operators shown here can be combined to achieve exactly the semantics you need. Finally, for even more advanced retry policies, you can use the retry functionality in the reactor-extra package.