GitHub - akarnokd/RxJavaJdk9Interop: RxJava 2/3 interop library for supporting Java 9 features such as Flow.*
RxJavaJdk9Interop
RxJava 3 interop library for supporting Java 9 features such as Flow.*
Release
implementation 'com.github.akarnokd:rxjava3-jdk9-interop:3.0.0'
Examples
Converting from RxJava 3 to Java 9 Flow
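import hu.akarnokd.rxjava3.jdk9interop.*;
import io.reactivex.rxjava3.core.Flowable;
import io.reactivex.rxjava3.processors.PublishProcessor;
import java.util.concurrent.Flow;
Flow.Publisher<Integer> pub = Flowable.range(1, 5).to(FlowInterop.toFlow());
// --------
Flow.Processor<Integer, Integer> proc = FlowInterop.toFlowProcessor(PublishProcessor.create());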
Converting from Java 9 Flow to RxJava 3
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
Flowable<Integer> f = FlowInterop.fromFlowPublisher(sp);
// --------
Flow.Processor<Integer, Integer> fp = ...
FlowableProcessor<Integer> fproc = FlowInterop.fromFlowProcessor(fp);
Note that RxJava 3 FlowableProcessors don't support different input and output types; therefore, the Flow.Processor must have the same type argument for both input and output.
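To illustrate the same-type requirement, here is a minimal JDK-only sketch of a Flow.Processor whose input and output types are both T. The names SameTypeProcessor and SameTypeProcessorDemo are hypothetical and not part of this library; the pattern follows the usual approach of extending SubmissionPublisher.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;
import java.util.function.UnaryOperator;

/** Hypothetical helper (not part of RxJavaJdk9Interop): input and output are both T. */
final class SameTypeProcessor<T> extends SubmissionPublisher<T>
        implements Flow.Processor<T, T> {

    private final UnaryOperator<T> mapper;
    private Flow.Subscription upstream;

    SameTypeProcessor(UnaryOperator<T> mapper) {
        this.mapper = mapper;
    }

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        this.upstream = subscription;
        subscription.request(1); // pull the first item
    }

    @Override
    public void onNext(T item) {
        submit(mapper.apply(item)); // forward the transformed item downstream
        upstream.request(1);        // then pull the next one
    }

    @Override
    public void onError(Throwable throwable) {
        closeExceptionally(throwable);
    }

    @Override
    public void onComplete() {
        close();
    }
}

/** Hypothetical demo: pushes the inputs through a doubling processor. */
final class SameTypeProcessorDemo {

    static List<Integer> doubleAll(List<Integer> inputs) {
        List<Integer> results = new CopyOnWriteArrayList<>();
        SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
        SameTypeProcessor<Integer> processor = new SameTypeProcessor<>(x -> x * 2);

        // Attach the consumer before any item is submitted so nothing is dropped.
        CompletableFuture<Void> done = processor.consume(results::add);
        source.subscribe(processor);

        inputs.forEach(source::submit);
        source.close(); // completion propagates through the processor

        done.orTimeout(5, TimeUnit.SECONDS).join(); // delivery is asynchronous
        return results;
    }

    public static void main(String[] args) {
        System.out.println(doubleAll(List.of(1, 2, 3)));
    }
}
```

Because both type arguments are the same, a processor shaped like this should be acceptable as the argument to FlowInterop.fromFlowProcessor, although that combination is not exercised in the sketch.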
For convenience, there is a FlowTestSubscriber that extends TestSubscriber and allows asserting on a Flow.Publisher the same way as with Reactive-Streams Publisher types.
FlowTestSubscriber<Integer> ts = new FlowTestSubscriber<>();
SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
sp.subscribe(ts);
// submit() hands each item to the subscribers asynchronously
sp.submit(1); sp.submit(2); sp.submit(3); sp.submit(4); sp.submit(5);
sp.close();
ts.awaitDone(5, TimeUnit.SECONDS) // SubmissionPublisher is async by default
    .assertResult(1, 2, 3, 4, 5);
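The need to wait before asserting can be seen with the JDK alone. The following is a rough sketch, assuming nothing beyond java.util.concurrent; RecordingSubscriber and RecordingSubscriberDemo are hypothetical names, and the class is a simplified stand-in rather than the library's FlowTestSubscriber.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

/** Hypothetical stand-in for a test subscriber; records items and the terminal signal. */
final class RecordingSubscriber<T> implements Flow.Subscriber<T> {

    private final List<T> values = new CopyOnWriteArrayList<>();
    private final CountDownLatch done = new CountDownLatch(1);
    private volatile Throwable error;

    @Override
    public void onSubscribe(Flow.Subscription subscription) {
        subscription.request(Long.MAX_VALUE); // unbounded demand
    }

    @Override
    public void onNext(T item) {
        values.add(item);
    }

    @Override
    public void onError(Throwable throwable) {
        error = throwable;
        done.countDown();
    }

    @Override
    public void onComplete() {
        done.countDown();
    }

    /** Blocks until a terminal signal arrives; returns false on timeout or interrupt. */
    boolean awaitDone(long timeout, TimeUnit unit) {
        try {
            return done.await(timeout, unit);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    List<T> values() {
        return List.copyOf(values);
    }

    Throwable error() {
        return error;
    }
}

/** Hypothetical demo: items arrive on another thread, so wait before reading them. */
final class RecordingSubscriberDemo {

    static List<Integer> collect() {
        RecordingSubscriber<Integer> subscriber = new RecordingSubscriber<>();
        SubmissionPublisher<Integer> sp = new SubmissionPublisher<>();
        sp.subscribe(subscriber);

        for (int i = 1; i <= 5; i++) {
            sp.submit(i);
        }
        sp.close();

        // Without this wait, values() could be read before delivery finishes.
        if (!subscriber.awaitDone(5, TimeUnit.SECONDS)) {
            throw new IllegalStateException("no terminal signal within 5 seconds");
        }
        return subscriber.values();
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```

The latch plays the role that awaitDone plays in the example above: the assertion only runs once onComplete or onError has been observed.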