2.x: Fix refCount termination-reconnect race by akarnokd · Pull Request #6187 · ReactiveX/RxJava (original) (raw)

@akarnokd This seems to fix the issue for .replay().refCount(), but I'm seeing the same test fail intermittently with .publish().refCount() (aka share()):

    @Test
    fun `test refcount race conditions`() {
        for (i in 0 until 10_000) {
            val observable = Observable.just(1).publish().refCount()

            val observer1 = observable
                .subscribeOn(Schedulers.io())
                .test()

            val observer2 = observable
                .subscribeOn(Schedulers.io())
                .test()

            observer1
                .withTag("observer1 $i")
                .awaitDone(5, TimeUnit.SECONDS)
                .assertResult(1)

            observer2
                .withTag("observer2 $i")
                .awaitDone(5, TimeUnit.SECONDS)
                .assertResult(1)
        }
    }

The actual failure is obviously unpredictable due to the race conditions, but it usually fails pretty early. In this case, it failed at i=14:

java.lang.AssertionError: Value count differs; expected: 1 [1] but was: 0 [] (latch = 1, values = 0, errors = 0, completions = 0, timeout!, disposed!, tag = observer2 14)
Expected :1 [1] 
Actual   :0 [] (latch = 1, values = 0, errors = 0, completions = 0, timeout!, disposed!, tag = observer2 14)