ReactiveX/RxJava (original) (raw)
Brief description
Given a shared upstream and two flows using .observeOn()
before .filter()
may change the original emission order.
RxJava Version
2.2.13
Code sample
private val subject = PublishSubject.create<Int>().toSerialized()
private val singleScheduler = Schedulers.single()
fun filterSubjectForIdOnScheduler(id: Int): Observable<Int> {
return subject
.observeOn(singleScheduler)
.filter {
Log.e("filter", "id = <span class="katex"><span class="katex-mathml"><math xmlns="http://www.w3.org/1998/Math/MathML"><semantics><mrow><mi>i</mi><mi>d</mi><mi>v</mi><mi>a</mi><mi>l</mi><mi>u</mi><mi>e</mi><mo>=</mo></mrow><annotation encoding="application/x-tex">id value = </annotation></semantics></math></span><span class="katex-html" aria-hidden="true"><span class="base"><span class="strut" style="height:0.6944em;"></span><span class="mord mathnormal">i</span><span class="mord mathnormal">d</span><span class="mord mathnormal" style="margin-right:0.03588em;">v</span><span class="mord mathnormal">a</span><span class="mord mathnormal" style="margin-right:0.01968em;">l</span><span class="mord mathnormal">u</span><span class="mord mathnormal">e</span><span class="mspace" style="margin-right:0.2778em;"></span><span class="mrel">=</span></span></span></span>it")
it == id
}
}
fun main() {
Observable.merge(
filterSubjectForIdOnScheduler(1),
filterSubjectForIdOnScheduler(2)
)
.subscribe { Log.e("subscribe", "value = $it") }
subject.onNext(2)
subject.onNext(1)
}
Actual (surprising) result
14491-14855 E/filter: id = 1 value = 2
14491-14855 E/filter: id = 1 value = 1
14491-14855 E/subscribe: value = 1
14491-14855 E/filter: id = 2 value = 2
14491-14855 E/subscribe: value = 2
14491-14855 E/filter: id = 2 value = 1
Expected result
14491-14855 E/filter: id = 1 value = 2
14491-14855 E/filter: id = 2 value = 2
14491-14855 E/subscribe: value = 2
14491-14855 E/filter: id = 1 value = 1
14491-14855 E/subscribe: value = 1
14491-14855 E/filter: id = 2 value = 1
Notes
- The code originally used a
Schedulers.from(Executors.newSingleThreadExecutor())
— I thought that somehow the executors may be optimised in a way that batches individual runnables and cycles them before cycling the queue of observers. I foundSchedulers.single()
which states (emphasis mine):
* Returns a default, shared, single-thread-backed {@link Scheduler} instance for work * requiring >>>strongly-sequential<<< execution on the same background thread.
- Instead of using
.observeOn()
calling viascheduleDirect
yields expected results:
singleScheduler.scheduleDirect { subject.onNext(2) }
singleScheduler.scheduleDirect { subject.onNext(1) }
// or
singleScheduler.scheduleDirect {
subject.onNext(2)
subject.onNext(1)
}
- It seems that the scheduler has two queues. One for emissions and one for observers. The emissions queue seems to be cycled before the queue of observers where sequential processing seemingly would need different priority of cycling (notify all observers about the first emission before proceeding to a next one).
Question
Is the actual result an expected one?
If so — could you explain why? Is it possible to alter the flow without reordering operators and achieve expected results?