3.x: groupBy may never cancel the upstream if the group is dropped · Issue #6596 · ReactiveX/RxJava
Summary
The design decision to allow a delayed subscribe() to a group emitted by groupBy (e.g., via subscribeOn) creates a window in which, if the consumer ignores the group, groupBy may never cancel its source.
Problem
In order to support taking a limited number of groups (e.g., source.groupBy(i -> i % 10).take(2)), the groupBy operator can't cancel its source just because the downstream cancelled the flow of groups. Instead, a reference counting scheme is used so that the upstream is cancelled only once all groups have been cancelled as well (e.g., source.groupBy(i -> i % 2).take(2).flatMap(g -> g.take(2))).
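The reference counting idea can be modelled in a few lines of plain Java. This is only a sketch under stated assumptions: the class GroupRefCount and its method names are hypothetical and are not taken from the RxJava sources, and a boolean flag stands in for the real upstream cancel() call.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical model of the reference counting scheme; not RxJava's actual code.
final class GroupRefCount {
    // Starts at 1: the downstream that receives the groups holds one reference.
    private final AtomicInteger active = new AtomicInteger(1);
    private final AtomicBoolean outerDone = new AtomicBoolean();
    private final AtomicBoolean upstreamCancelled = new AtomicBoolean();

    // A new group was emitted downstream and now holds a reference.
    void groupCreated() {
        active.incrementAndGet();
    }

    // The consumer of one group cancelled it, for example g.take(2) finished.
    void groupCancelled() {
        release();
    }

    // The downstream stopped accepting groups, for example take(2) on the outer flow.
    // Repeated calls release the outer reference only once.
    void outerCancelled() {
        if (outerDone.compareAndSet(false, true)) {
            release();
        }
    }

    boolean isUpstreamCancelled() {
        return upstreamCancelled.get();
    }

    private void release() {
        if (active.decrementAndGet() == 0) {
            upstreamCancelled.set(true); // stands in for upstream.cancel()
        }
    }
}
```

In this model, a group that was created but never cancelled keeps the count above zero forever, which is exactly the situation this issue describes.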
The documentation states that one should not ignore a group:
Note: A GroupedFlowable will cache the items it is to emit until such time as it is subscribed to. For this reason, in order to avoid memory leaks, you should not simply ignore those GroupedPublishers that do not concern you. Instead, you can signal to them that they may discard their buffers by applying an operator like ignoreElements() to them.
However, some operators may ignore their input: they see only an opaque object rather than a group, so the group is never consumed and the source is never cancelled.
Unfortunately, flatMap is one such operator: once cancelled, it ignores incoming values, and thus groups. Add an asynchronous cancellation race and the problem manifests sooner or later (source.groupBy.take.flatMap.takeUntil(cancelSignal)).
In the original Rx.NET implementation, groups are reference counted and, if the consumer doesn't subscribe to a group immediately, the group is discarded right then. However, Rx.NET groups don't cache items, so a delayed subscription results in the data loss RxJava set out to avoid.
Proposition
I propose a change to the groupBy logic to solve this cancellation problem as well as not lose data. The solution requires multiple considerations.
First, we need to detect whether there was a subscribe call when a fresh group was emitted. If not, the group is discarded after the single value and a completion signal is emitted to the group. This way, groupBy is not held back by a potentially unconsumed group and, if the group is eventually consumed, the value is not lost.
The drawback is that this scheme may lead to group recreation over and over even if the group is actually subscribed to in a delayed fashion. Note, however, that since a group is practically a hot subject, using subscribeOn has generally no practical benefit and consumers should apply observeOn to shift the emission to the desired thread anyway.
Second, there is an inherent race possible between an async subscriber and deciding if the group has been subscribed to just in time. Therefore, an atomic state transition has to be implemented to declare a group live or dead on arrival. In addition, the completion of the dead group and a possible cancellation by its consumer should not trigger multiple cancellations/group removal, especially the removal of a newer group with the same key.
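One way to picture the atomic transition and the guarded removal is the plain-Java sketch below. It is an illustration only: GroupState, GroupRegistry, and all method names are hypothetical, and the real operator would carry more state (the cached item, the subscriber, request accounting). The sketch assumes a compare-and-set from a FRESH state decides the winner of the race, and that removal is conditional on the map still holding the same group instance.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical per-group state; exactly one of the two transitions can succeed.
final class GroupState {
    static final int FRESH = 0; // emitted, fate not yet decided
    static final int LIVE = 1;  // a subscriber arrived in time
    static final int DEAD = 2;  // declared dead on arrival

    private final AtomicInteger state = new AtomicInteger(FRESH);
    private final AtomicBoolean removed = new AtomicBoolean();

    // Subscriber side: true if the group became live.
    boolean tryMarkLive() {
        return state.compareAndSet(FRESH, LIVE);
    }

    // Operator side, after emitting the group: true if it was abandoned.
    boolean tryMarkDead() {
        return state.compareAndSet(FRESH, DEAD);
    }

    // Guards against a double removal (dead-group completion plus a late cancel).
    boolean markRemoved() {
        return removed.compareAndSet(false, true);
    }
}

// Hypothetical key-to-group map used by the operator.
final class GroupRegistry<K> {
    private final ConcurrentMap<K, GroupState> groups = new ConcurrentHashMap<>();

    GroupState create(K key) {
        GroupState group = new GroupState();
        groups.put(key, group);
        return group;
    }

    GroupState lookup(K key) {
        return groups.get(key);
    }

    // Removes the mapping only if it still points at this exact group,
    // so a newer group registered under the same key is left untouched.
    boolean remove(K key, GroupState group) {
        return group.markRemoved() && groups.remove(key, group);
    }
}
```

A subscriber that loses the race (tryMarkLive() returns false) would still be allowed to drain the cached item; it just must not be treated as a live group afterwards.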
Third, when a GroupedFlowable is declared dead on arrival and then consumed later, the consumption of the cached item should not trigger a request from the main source, as it would for a live group. In contrast, declaring a group dead should ask for replenishment from the main source, since we can't know whether the group will ever be consumed.
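The replenishment rule can be summarised as a tiny accounting model. This is a sketch only, with a hypothetical class ReplenishModel and a counter standing in for upstream.request(1) calls; it assumes one replacement request per consumed or abandoned item.

```java
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical accounting of requests sent to the main source.
final class ReplenishModel {
    private final AtomicLong requested = new AtomicLong();

    // Declaring a group dead replenishes immediately, because the
    // cached item may never be consumed by anyone.
    void groupDeclaredDead() {
        requested.incrementAndGet();
    }

    // Consuming an item replenishes only for live groups; the dead group's
    // cached item was already accounted for when the group was declared dead.
    void itemConsumed(boolean groupIsDead) {
        if (!groupIsDead) {
            requested.incrementAndGet();
        }
    }

    long requestedFromUpstream() {
        return requested.get();
    }
}
```

The point of the split is that each item emitted by the main source causes exactly one replacement request, whether its group turned out to be live or dead.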