3.x: Fix groupBy not canceling upstream due to group abandonment by akarnokd · Pull Request #6642 · ReactiveX/RxJava (original) (raw)

This PR fixes the issue when a group is not subscribed to, the upstream may never cancel due to seemingly open groups.

The fix is a tradeoff with group abandonment and possible excessive group re-creation so that elements are not lost in case the groups do get subscribed to a bit later. Therefore, the groups should be subscribed to immediately and synchronously:

Observable.range(1, 1000) .groupBy(v -> v % 10) .flatMap(v -> { System.out.println("New group: " + v.getKey()); return v; }) .subscribe();

Observable.range(1, 1000) .groupBy(v -> v % 10) .flatMap(v -> { System.out.println("New group: " + v.getKey()); return v.observeOn(Schedulers.io()); // <-------------------------------- OK }) .blockingSubscribe();

Consequently, the following setups will result in constant group recreations:

Observable.range(1, 1000) .groupBy(v -> v % 10) .observeOn(Schedulers.io()) // <------------------------------------- TROUBLE .flatMap(v -> { System.out.println("New group: " + v.getKey()); return v; }) .blockingSubscribe();

Observable.range(1, 1000) .groupBy(v -> v % 10) .flatMap(v -> { System.out.println("New group: " + v.getKey()); return v.subscribeOn(Schedulers.io()); // <----------------------- TROUBLE }) .blockingSubscribe();

For the subscribeOn "trouble", since groups were essentially unicast subjects/processors, subscribeOn had no practical use on them and instead observeOn should be used to move the observation of the group's items to the desired thread.

Resolves #6596