ReactiveX/RxJava (original) (raw)

Brief description

Given a shared upstream and two flows using .observeOn() before .filter() may change the original emission order.

RxJava Version

2.2.13

Code sample

private val subject = PublishSubject.create<Int>().toSerialized()
private val singleScheduler = Schedulers.single()

fun filterSubjectForIdOnScheduler(id: Int): Observable<Int> {
    return subject
        .observeOn(singleScheduler)
        .filter {
            Log.e("filter", "id = <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>i</mi><mi>d</mi><mi>v</mi><mi>a</mi><mi>l</mi><mi>u</mi><mi>e</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">id value = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.6944em;"></span><span class="mord mathnormal">i</span><span class="mord mathnormal">d</span><span class="mord mathnormal" style="margin-right:0.03588em;">v</span><span class="mord mathnormal">a</span><span class="mord mathnormal" style="margin-right:0.01968em;">l</span><span class="mord mathnormal">u</span><span class="mord mathnormal">e</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>it")
            it == id
        }
}

fun main() {
    Observable.merge(
        filterSubjectForIdOnScheduler(1),
        filterSubjectForIdOnScheduler(2)
    )
        .subscribe { Log.e("subscribe", "value = $it") }

    subject.onNext(2)
    subject.onNext(1)
}

Actual (surprising) result

14491-14855 E/filter: id = 1 value = 2
14491-14855 E/filter: id = 1 value = 1
14491-14855 E/subscribe: value = 1
14491-14855 E/filter: id = 2 value = 2
14491-14855 E/subscribe: value = 2
14491-14855 E/filter: id = 2 value = 1

Expected result

14491-14855 E/filter: id = 1 value = 2
14491-14855 E/filter: id = 2 value = 2
14491-14855 E/subscribe: value = 2
14491-14855 E/filter: id = 1 value = 1
14491-14855 E/subscribe: value = 1
14491-14855 E/filter: id = 2 value = 1

Notes

  1. The code originally used a Schedulers.from(Executors.newSingleThreadExecutor()) — I thought that somehow the executors may be optimised in a way that batches individual runnables and cycles them before cycling the queue of observers. I found Schedulers.single() which states (emphasis mine):
* Returns a default, shared, single-thread-backed {@link Scheduler} instance for work
* requiring >>>strongly-sequential<<< execution on the same background thread.
  1. Instead of using .observeOn() calling via scheduleDirect yields expected results:
singleScheduler.scheduleDirect { subject.onNext(2) }
singleScheduler.scheduleDirect { subject.onNext(1) }
// or
singleScheduler.scheduleDirect {
    subject.onNext(2)
    subject.onNext(1)
}
  1. It seems that the scheduler has two queues. One for emissions and one for observers. The emissions queue seems to be cycled before the queue of observers where sequential processing seemingly would need different priority of cycling (notify all observers about the first emission before proceeding to a next one).

Question

Is the actual result an expected one?
If so — could you explain why? Is it possible to alter the flow without reordering operators and achieve expected results?