3.x: Change Flowable.groupBy to signal MBE instead of possibly hanging by akarnokd · Pull Request #6740 · ReactiveX/RxJava (original) (raw)
This PR changes the backpressure behavior of Flowable.groupBy
to signal MissingBackpressureException
instead of silently hanging if the produced groups are not ready to be accepted by the downstream.
This can happen if one flatMap
s a groupBy
but there are more groups produced than the concurrency level of flatMap
. Since replenishment is tied to item consumption from the groups, not consuming them can result in none of the groups receiving any further items and the whole operator hangs.
The following changes have been applied:
- Removed the queue from the main operator since it will now try to emit directly and not buffer groups.
- The main
Flowable
, lacking a queue, no longer supports operator fusion. Tests checking this property have been removed as well. - When a group is drained, consumed items are replenished in batch if possible. Detecting a cancellation will also trigger a replenishment.
- When a group is pulled (fusion mode), now all
pull
,isEmpty
andclear
will trigger replenishment so that other groups can make progress too. - Unit tests have been modified to have large enough bufferSize/prefetch amounts to allow them to pass.
Fixes #6641