3.x: ConnectableFlowable/ConnetableObservabe redesign by akarnokd · Pull Request #6519 · ReactiveX/RxJava (original) (raw)

This PR changes the connectable API to have a specific reset method to reset a terminated connectable source as part of the official API.

In 2.x, when publish() terminated, it reset itself to a fresh state which could lead to late consumers not receiving events as there might be no one to call connect() again (see #6501). However, replay() did not reset itself, thus late consumers got the cached events, however, a reconnect started the sequence and new consumers may have missed items.

In 3.x, this two corner cases have been fixed by the introduction of reset(). Both publish and replay now remain in their terminated state until reset is called. If the connection is disposed, it will automatically reset their state just like before. The state transitions are as follows:

  1. fresh -> connect() -> running -> onComplete()/onError() -> terminated -> reset() -> fresh
  2. fresh -> connect() -> running -> dispose() -> fresh
  3. fresh -> connect() -> running -> onComplete()/onError() -> terminated -> dispose() -> fresh
  4. fresh -> connect() -> running -> onComplete()/onError() -> terminated -> connect() -> running

This does resolve the race condition with publish().refCount() described in #6501.

In addition, there are some changes to Flowable.publish()'s behavior:

It no longer keeps consuming the upstream if there are no subscribers. This implies if the source terminates while there are unconsumed items in the internal buffer, those will be available for observation.
I have no strong preference on this property and in comparison, Observable.publish drops items because there is no backpressure buffer in its implementation.

Upstream errors are not reported to the RxJavaPlugins.onError handler when if there are no subscribers but have to be observed via a subscriber.
Because terminal events are available until reset now, we can't know really if there is going to be a subscriber or not. However, it might be possible to detect the no-consumer case upon an error and still report it when reset or dispose is called.

Resolves #5628
Resolves #5899