GitHub - akarnokd/RxJavaJdk8Interop at master
RxJavaJdk8Interop
RxJava 2 interop library for supporting Java 8 features such as Optional, Stream and CompletableFuture.
Release
compile 'com.github.akarnokd:rxjava2-jdk8-interop:0.3.7'
Examples
The main entry points are:
FlowableInterop
ObservableInterop
SingleInterop
MaybeInterop
CompletableInterop
Stream to RxJava
Note that java.util.stream.Stream can be consumed at most once and only synchronously.
java.util.stream.Stream<T> stream = ...

Flowable<T> flow = FlowableInterop.fromStream(stream);

// alternatively (a given Stream instance can back only one of these)
Observable<T> obs = ObservableInterop.fromStream(stream);
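The single-use restriction comes from the JDK itself, independent of this library. The following JDK-only sketch (class and method names are hypothetical) shows a second terminal operation being rejected. It also shows one possible workaround, assuming the data can be produced again: keep a Supplier and create a fresh Stream for each consumer.

```java
import java.util.List;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical demo class; uses only the JDK, not the interop library.
final class StreamOnceDemo {

    // Returns true if a second terminal operation on the same Stream is rejected.
    static boolean secondUseFails() {
        Stream<Integer> stream = Stream.of(1, 2, 3);
        stream.forEach(v -> { }); // first terminal operation consumes the stream
        try {
            stream.forEach(v -> { }); // second use of the same instance
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    // A Supplier hands out a new Stream on every call, so each consumer gets its own.
    static List<Integer> freshCopy(Supplier<Stream<Integer>> source) {
        return source.get().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        Supplier<Stream<Integer>> source = () -> Stream.of(1, 2, 3);
        System.out.println(secondUseFails());
        System.out.println(freshCopy(source));
        System.out.println(freshCopy(source));
    }
}
```

With this pattern, each fromStream call would receive its own `source.get()` result rather than a shared instance.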
Optional to RxJava
Optional<T> opt = ...

Flowable<T> flow = FlowableInterop.fromOptional(opt);

Observable<T> obs = ObservableInterop.fromOptional(opt);
CompletionStage to RxJava
Note that cancelling the Subscription won't cancel the CompletionStage.
CompletionStage<T> cs = ...

Flowable<T> flow = FlowableInterop.fromFuture(cs);

Observable<T> obs = ObservableInterop.fromFuture(cs);
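A comparable one-way relationship exists inside the JDK: cancelling a dependent CompletableFuture leaves its source untouched. The sketch below is a JDK-only analogy and not the library's implementation; the class and method names are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical demo class; illustrates the JDK behaviour only.
final class DependentCancelDemo {

    // Cancels a dependent stage and reports whether the source stayed unaffected.
    static boolean sourceSurvivesDependentCancel() {
        CompletableFuture<Integer> source = new CompletableFuture<>();
        CompletableFuture<Integer> dependent = source.thenApply(v -> v + 1);

        dependent.cancel(true); // only the downstream stage is cancelled

        return dependent.isCancelled()
                && !source.isCancelled()
                && !source.isDone();
    }

    public static void main(String[] args) {
        System.out.println(sourceSurvivesDependentCancel()); // prints true
    }
}
```

If the underlying work has to stop as well, the caller would need to cancel or complete the original stage directly.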
Using Stream Collectors
Flowable.range(1, 10)
    .compose(FlowableInterop.collect(Collectors.toList()))
    .test()
    .assertResult(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10));
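A Collector can be driven one item at a time, which is what makes it usable with a push-based sequence. The sketch below shows that idea with the JDK alone: it calls the collector's supplier, accumulator and finisher by hand. It is an illustration under that assumption, not the library's actual operator, and `drive` is a hypothetical helper.

```java
import java.util.Arrays;
import java.util.stream.Collector;
import java.util.stream.Collectors;

// Hypothetical demo class; uses only the JDK Collector API.
final class CollectorDriveDemo {

    // Feeds items into the collector one by one, then applies the finisher.
    static <T, A, R> R drive(Iterable<T> items, Collector<? super T, A, R> collector) {
        A container = collector.supplier().get();
        for (T item : items) {
            collector.accumulator().accept(container, item);
        }
        return collector.finisher().apply(container);
    }

    public static void main(String[] args) {
        System.out.println(drive(Arrays.asList(1, 2, 3), Collectors.toList()));
        System.out.println(drive(Arrays.asList("a", "b"), Collectors.joining(",")));
    }
}
```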
Return the first/single/last element as a CompletionStage
CompletionStage<Integer> cs = Flowable.just(1)
    .delay(1, TimeUnit.SECONDS)
    // return first
    .to(FlowableInterop.first());

    // return single
    // .to(FlowableInterop.single());

    // return last
    // .to(FlowableInterop.last());

cs.whenComplete((v, e) -> {
    System.out.println(v);
    System.out.println(e);
});
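The callback above prints both arguments because a CompletionStage reports its outcome as a (value, error) pair: on success the error is null, on failure the value is null. A JDK-only sketch of handling the two cases follows. It uses handle instead of whenComplete so the result can be returned, and `describe` is a hypothetical helper.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;

// Hypothetical demo class; uses only the JDK.
final class StageOutcomeDemo {

    // Maps the (value, error) pair of a stage to a short description.
    static String describe(CompletionStage<Integer> stage) {
        return stage
                .handle((v, e) -> e != null ? "error=" + e.getMessage() : "value=" + v)
                .toCompletableFuture()
                .join();
    }

    public static void main(String[] args) {
        CompletionStage<Integer> ok = CompletableFuture.completedFuture(1);

        CompletableFuture<Integer> failed = new CompletableFuture<>();
        failed.completeExceptionally(new IllegalStateException("boom"));

        System.out.println(describe(ok));
        System.out.println(describe(failed));
    }
}
```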
Return the only element as a CompletionStage
Single
CompletionStage<Integer> cs = Single.just(1)
    .delay(1, TimeUnit.SECONDS)
    .to(SingleInterop.get());

cs.whenComplete((v, e) -> {
    System.out.println(v);
    System.out.println(e);
});
Maybe
CompletionStage<Integer> cs = Maybe.just(1)
    .delay(1, TimeUnit.SECONDS)
    .to(MaybeInterop.get());

cs.whenComplete((v, e) -> {
    System.out.println(v);
    System.out.println(e);
});
Await completion as CompletionStage
Completable
CompletionStage<Void> cs = Completable.complete()
    .delay(1, TimeUnit.SECONDS)
    .to(CompletableInterop.await());

cs.whenComplete((v, e) -> {
    System.out.println(v);
    System.out.println(e);
});
Return the first/last element optionally
This is a blocking operation.
Optional<Integer> opt = Flowable.just(1)
    .to(FlowableInterop.firstElement());

System.out.println(opt.map(v -> v + 1).orElse(-1));
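The `map(...).orElse(-1)` chain in the last line covers both outcomes: a present value is incremented, and an empty Optional falls back to -1. A JDK-only sketch of the two cases, with a hypothetical helper name:

```java
import java.util.Optional;

// Hypothetical demo class; uses only java.util.Optional.
final class OptionalFallbackDemo {

    // Increments the value if present, otherwise returns the -1 fallback.
    static int incrementOrDefault(Optional<Integer> opt) {
        return opt.map(v -> v + 1).orElse(-1);
    }

    public static void main(String[] args) {
        System.out.println(incrementOrDefault(Optional.of(1)));   // prints 2
        System.out.println(incrementOrDefault(Optional.empty())); // prints -1
    }
}
```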
Convert to Java Stream
This is a blocking operation. Closing the stream will cancel the RxJava sequence.
Flowable.range(1, 10)
    .to(FlowableInterop.toStream())
    .parallel()
    .map(v -> v + 1)
    .forEach(System.out::println);
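In the JDK, terminal operations such as forEach do not close a Stream by themselves, so close handlers only run when close() is called, for example through try-with-resources. The JDK-only sketch below demonstrates this; its onClose handler merely stands in for the cancellation described above, and the class and method names are hypothetical.

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Stream;

// Hypothetical demo class; uses only the JDK Stream API.
final class StreamCloseDemo {

    // Consumes a stream and reports whether its close handler was invoked.
    static boolean closeHandlerRan(boolean useTryWithResources) {
        AtomicBoolean closed = new AtomicBoolean();
        // The handler is a stand-in for "cancel the upstream sequence".
        Stream<Integer> stream = Stream.of(1, 2, 3).onClose(() -> closed.set(true));

        if (useTryWithResources) {
            try (Stream<Integer> s = stream) {
                s.forEach(v -> { });
            } // close() is called here
        } else {
            stream.forEach(v -> { }); // terminal operation alone does not close
        }
        return closed.get();
    }

    public static void main(String[] args) {
        System.out.println(closeHandlerRan(false)); // prints false
        System.out.println(closeHandlerRan(true));  // prints true
    }
}
```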
FlatMap Java Streams
Note that since consuming a stream is practically blocking, there is no need for a maxConcurrency parameter.
Flowable.range(1, 5)
    .compose(FlowableInterop.flatMapStream(v -> Arrays.asList(v, v + 1).stream()))
    .test()
    .assertResult(1, 2, 2, 3, 3, 4, 4, 5, 5, 6);
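The expected result above keeps the inner streams in source order, one after another. For comparison, the same mapping written with the JDK's own Stream.flatMap yields the same sequence. This is a JDK-only analogy with hypothetical names, not a statement about how the library implements the operator.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; uses only the JDK Stream API.
final class FlatMapOrderDemo {

    // Expands each value v in [start, start + count) into v and v + 1, in order.
    static List<Integer> expand(int start, int count) {
        return IntStream.range(start, start + count)
                .boxed()
                .flatMap(v -> Arrays.asList(v, v + 1).stream())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(expand(1, 5)); // prints [1, 2, 2, 3, 3, 4, 4, 5, 5, 6]
    }
}
```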
Map based on Java Optional
Flowable.range(1, 5)
    .compose(FlowableInterop.mapOptional(v -> v % 2 == 0 ? Optional.of(v) : Optional.empty()))
    .test()
    .assertResult(2, 4);
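Judging by the asserted result, an empty Optional drops the item and a present one emits its value, so the operator acts as a combined map and filter. A JDK-only sketch of the equivalent three-step Stream pipeline, with hypothetical names and offered as an analogy only:

```java
import java.util.List;
import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;

// Hypothetical demo class; uses only the JDK.
final class MapOptionalDemo {

    // Keeps only even values by mapping to Optional and unwrapping present ones.
    static List<Integer> evens(int start, int count) {
        return IntStream.range(start, start + count)
                .boxed()
                .map(v -> v % 2 == 0 ? Optional.of(v) : Optional.<Integer>empty())
                .filter(Optional::isPresent) // drop empty results
                .map(Optional::get)          // unwrap the remaining values
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(evens(1, 5)); // prints [2, 4]
    }
}
```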