Schedulers (RxJava Javadoc 3.1.10)
Wraps an Executor into a new Scheduler instance and delegates schedule()
calls to it.
The tasks scheduled by the returned Scheduler
and its Scheduler.Worker can be optionally interrupted.
If the provided executor doesn't support any of the more specific standard Java executor APIs, tasks scheduled with a time delay or periodically will use the single() scheduler for the timed waiting before posting the actual task to the given executor.
If the provided executor supports the standard Java ExecutorService API, tasks scheduled by this scheduler can be cancelled/interrupted by calling Disposable.dispose(). In addition, tasks scheduled with a time delay or periodically will use the single() scheduler for the timed waiting before posting the actual task to the given executor.
If the provided executor supports the standard Java ScheduledExecutorService API, tasks scheduled by this scheduler can be cancelled/interrupted by calling Disposable.dispose(). In addition, tasks scheduled with a time delay or periodically will use the provided executor. Note, however, that if the provided ScheduledExecutorService instance is not single threaded, tasks scheduled with time delays close to each other may end up executing in a different order than their schedule() calls were issued. This limitation may be lifted in a future patch.
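The three cases above depend only on which standard interface the supplied executor implements. The following is a minimal stdlib-only sketch of that distinction, not RxJava's actual implementation; the class ExecutorTierSketch, the Tier enum, and the methods tierOf and demo are hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;

public class ExecutorTierSketch {

    /** Hypothetical labels for the three documented cases. */
    public enum Tier {
        PLAIN_EXECUTOR("timed waiting happens on single()"),
        EXECUTOR_SERVICE("dispose() can cancel/interrupt; timed waiting happens on single()"),
        SCHEDULED_EXECUTOR_SERVICE("dispose() can cancel/interrupt; timed waiting uses the provided executor");

        public final String documentedBehavior;

        Tier(String documentedBehavior) {
            this.documentedBehavior = documentedBehavior;
        }
    }

    /** Classifies an executor by the most specific standard interface it implements. */
    public static Tier tierOf(Executor executor) {
        // ScheduledExecutorService extends ExecutorService, so it must be checked first.
        if (executor instanceof ScheduledExecutorService) {
            return Tier.SCHEDULED_EXECUTOR_SERVICE;
        }
        if (executor instanceof ExecutorService) {
            return Tier.EXECUTOR_SERVICE;
        }
        return Tier.PLAIN_EXECUTOR;
    }

    /** Classifies one executor of each kind and returns the results in order. */
    public static List<Tier> demo() {
        Executor direct = Runnable::run; // a bare Executor that runs tasks on the caller thread
        ExecutorService pool = Executors.newFixedThreadPool(2);
        ScheduledExecutorService timer = Executors.newScheduledThreadPool(1);
        try {
            List<Tier> tiers = new ArrayList<>();
            tiers.add(tierOf(direct));
            tiers.add(tierOf(pool));
            tiers.add(tierOf(timer));
            return tiers;
        } finally {
            pool.shutdown();
            timer.shutdown();
        }
    }

    public static void main(String[] args) {
        for (Tier tier : demo()) {
            System.out.println(tier + ": " + tier.documentedBehavior);
        }
    }
}
```

In practice this means that passing a ScheduledExecutorService keeps delayed work on the caller's own threads, while the other two cases involve the single() scheduler for the waiting part.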
The implementation of the Worker of this wrapper Scheduler can operate in either eager (non-fair) or fair mode, depending on the specified parameter. In eager mode, it will execute as many non-delayed tasks as it can, which may result in a longer than expected occupation of a thread of the given backing Executor. In other words, it does not allow per-Runnable fairness in case the worker runs on a shared underlying thread of the Executor. In fair mode, non-delayed tasks will still be executed in a FIFO and non-overlapping manner, but after each task, the execution of the next task is rescheduled with the same underlying Executor, allowing interleaving with tasks from the same Scheduler or from other external usages of the underlying Executor.
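The difference between the two modes can be sketched with plain java.util.concurrent types. This is an illustrative model under stated assumptions, not RxJava's worker: ManualExecutor (a single-threaded stand-in for a shared Executor) and SketchWorker are hypothetical classes, and the sketch is deliberately not thread-safe.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.Executor;

public class FairVsEagerSketch {

    /** Hypothetical stand-in for a shared Executor: queues submissions and runs them in order. */
    static final class ManualExecutor implements Executor {
        private final Queue<Runnable> submitted = new ArrayDeque<>();

        @Override
        public void execute(Runnable command) {
            submitted.add(command);
        }

        void runAll() {
            Runnable next;
            while ((next = submitted.poll()) != null) {
                next.run();
            }
        }
    }

    /** Hypothetical worker that serializes its tasks in FIFO order (single-threaded sketch). */
    static final class SketchWorker implements Runnable {
        private final Executor executor;
        private final boolean fair;
        private final Queue<Runnable> tasks = new ArrayDeque<>();
        private boolean draining;

        SketchWorker(Executor executor, boolean fair) {
            this.executor = executor;
            this.fair = fair;
        }

        void schedule(Runnable task) {
            tasks.add(task);
            if (!draining) {
                draining = true;
                executor.execute(this); // request a drain on the backing executor
            }
        }

        @Override
        public void run() {
            if (fair) {
                // Fair: run one task, then go back to the end of the executor's queue.
                Runnable task = tasks.poll();
                if (task != null) {
                    task.run();
                }
                if (tasks.isEmpty()) {
                    draining = false;
                } else {
                    executor.execute(this);
                }
            } else {
                // Eager: keep the executor thread until the worker's queue is empty.
                Runnable task;
                while ((task = tasks.poll()) != null) {
                    task.run();
                }
                draining = false;
            }
        }
    }

    /** Records the execution order of two worker tasks and one unrelated executor task. */
    static List<String> trace(boolean fair) {
        ManualExecutor exec = new ManualExecutor();
        SketchWorker worker = new SketchWorker(exec, fair);
        List<String> log = new ArrayList<>();
        worker.schedule(() -> log.add("w1"));
        exec.execute(() -> log.add("other")); // external use of the same executor
        worker.schedule(() -> log.add("w2"));
        exec.runAll();
        return log;
    }

    public static void main(String[] args) {
        System.out.println("eager: " + trace(false)); // eager: [w1, w2, other]
        System.out.println("fair:  " + trace(true));  // fair:  [w1, other, w2]
    }
}
```

In the eager trace the worker finishes both of its tasks before the unrelated task gets a turn; in the fair trace the unrelated task runs between them, at the cost of one extra submission to the executor per task.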
Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:
    ExecutorService exec = Executors.newSingleThreadExecutor();
    try {
        // arguments: executor, interruptibleWorker = true, fair = true
        Scheduler scheduler = Schedulers.from(exec, true, true);
        Flowable.just(1)
            .subscribeOn(scheduler)
            .map(v -> v + 1)
            .observeOn(scheduler)
            .blockingSubscribe(System.out::println);
    } finally {
        // the scheduler does not manage the executor's lifecycle
        exec.shutdown();
    }
Note that the provided Executor should avoid throwing a RejectedExecutionException (which happens, for example, when it is shut down prematurely or when it is a bounded-queue ExecutorService whose queue is full) because such circumstances prevent RxJava from progressing flow-related activities correctly. If Executor.execute(Runnable) or ExecutorService.submit(Callable) throws, the RejectedExecutionException is routed to the global error handler via RxJavaPlugins.onError(Throwable). To avoid shutdown-related problems, it is recommended that all flows using the returned Scheduler be canceled/disposed before the underlying Executor is shut down. To avoid problems due to the Executor having a bounded queue, it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.
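Both rejection scenarios mentioned above can be reproduced with the JDK alone. The sketch below shows only the executor side (no RxJava involved) and assumes the default rejection policy of the JDK thread pools; the class RejectionSketch and its method names are hypothetical.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class RejectionSketch {

    /** An executor that was shut down rejects any further task. */
    static boolean rejectsAfterShutdown() {
        ExecutorService exec = Executors.newSingleThreadExecutor();
        exec.shutdown();
        try {
            exec.execute(() -> { });
            return false;
        } catch (RejectedExecutionException ex) {
            return true;
        }
    }

    /** One thread plus a queue of capacity one accepts two tasks and rejects the third. */
    static boolean rejectsWhenQueueFull() {
        ThreadPoolExecutor exec = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(1));
        CountDownLatch release = new CountDownLatch(1);
        Runnable blocker = () -> {
            try {
                release.await(); // keeps the only thread busy until released
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            exec.execute(blocker);   // occupies the single thread
            exec.execute(() -> { }); // fills the single queue slot
            exec.execute(() -> { }); // no free thread and no free slot: rejected
            return false;
        } catch (RejectedExecutionException ex) {
            return true;
        } finally {
            release.countDown();
            exec.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println("rejected after shutdown: " + rejectsAfterShutdown());
        System.out.println("rejected when queue full: " + rejectsWhenQueueFull());
    }
}
```

An executor in either state would throw from execute() when the wrapping Scheduler posts a task to it, which is the situation the paragraph above advises avoiding.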
This type of scheduler is less sensitive to leaking Scheduler.Worker instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".
Note that this method returns a new Scheduler instance, even for the same Executor instance.
It is possible to wrap an Executor into a Scheduler without triggering the initialization of all the standard schedulers by using the RxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean) method before the Schedulers class itself is accessed.