3.x: Fix groupBy not canceling upstream due to group abandonment by akarnokd · Pull Request #6642 · ReactiveX/RxJava (original) (raw)
This PR fixes the issue when a group is not subscribed to, the upstream may never cancel due to seemingly open groups.
The fix is a tradeoff with group abandonment and possible excessive group re-creation so that elements are not lost in case the groups do get subscribed to a bit later. Therefore, the groups should be subscribed to immediately and synchronously:
Observable.range(1, 1000) .groupBy(v -> v % 10) .flatMap(v -> { System.out.println("New group: " + v.getKey()); return v; }) .subscribe();
Observable.range(1, 1000) .groupBy(v -> v % 10) .flatMap(v -> { System.out.println("New group: " + v.getKey()); return v.observeOn(Schedulers.io()); // <-------------------------------- OK }) .blockingSubscribe();
Consequently, the following setups will result in constant group recreations:
Observable.range(1, 1000) .groupBy(v -> v % 10) .observeOn(Schedulers.io()) // <------------------------------------- TROUBLE .flatMap(v -> { System.out.println("New group: " + v.getKey()); return v; }) .blockingSubscribe();
Observable.range(1, 1000) .groupBy(v -> v % 10) .flatMap(v -> { System.out.println("New group: " + v.getKey()); return v.subscribeOn(Schedulers.io()); // <----------------------- TROUBLE }) .blockingSubscribe();
For the subscribeOn
"trouble", since groups were essentially unicast subjects/processors, subscribeOn
had no practical use on them and instead observeOn
should be used to move the observation of the group's items to the desired thread.
Resolves #6596