Schedulers (RxJava Javadoc 3.1.10) (original) (raw)

Wraps an Executor into a new Scheduler instance and delegates schedule() calls to it.

The tasks scheduled by the returned Scheduler and its Scheduler.Worker can be optionally interrupted.

If the provided executor doesn't support any of the more specific standard Java executor APIs, tasks scheduled with a time delay or periodically will use thesingle() scheduler for the timed waiting before posting the actual task to the given executor.

If the provided executor supports the standard Java ExecutorService API, tasks scheduled by this scheduler can be cancelled/interrupted by callingDisposable.dispose(). In addition, tasks scheduled with a time delay or periodically will use the single() scheduler for the timed waiting before posting the actual task to the given executor.

If the provided executor supports the standard Java ScheduledExecutorService API, tasks scheduled by this scheduler can be cancelled/interrupted by callingDisposable.dispose(). In addition, tasks scheduled with a time delay or periodically will use the provided executor. Note, however, if the providedScheduledExecutorService instance is not single threaded, tasks scheduled with a time delay close to each other may end up executing in different order than the original schedule() call was issued. This limitation may be lifted in a future patch.

The implementation of the Worker of this wrapper Scheduler can operate in both eager (non-fair) and fair modes depending on the specified parameter. In eager mode, it will execute as many non-delayed tasks as it can, which may result in a longer than expected occupation of a thread of the given backing Executor. In other terms, it does not allow per-Runnable fairness in case the worker runs on a shared underlying thread of the Executor. In fair mode, non-delayed tasks will still be executed in a FIFO and non-overlapping manner, but after each task, the execution for the next task is rescheduled with the same underlying Executor, allowing interleaving from both the same Scheduler or other external usages of the underlying Executor.

Starting, stopping and restarting this scheduler is not supported (no-op) and the provided executor's lifecycle must be managed externally:


 ExecutorService exec = Executors.newSingleThreadedExecutor();
 try {
     Scheduler scheduler = Schedulers.from(exec, true, true);
     Flowable.just(1)
        .subscribeOn(scheduler)
        .map(v -> v + 1)
        .observeOn(scheduler)
        .blockingSubscribe(System.out::println);
 } finally {
     exec.shutdown();
 }
 

Note that the provided Executor should avoid throwing a RejectedExecutionException (for example, by shutting it down prematurely or using a bounded-queue ExecutorService) because such circumstances prevent RxJava from progressing flow-related activities correctly. If the Executor.execute(Runnable) or ExecutorService.submit(Callable) throws, the RejectedExecutionException is routed to the global error handler viaRxJavaPlugins.onError(Throwable). To avoid shutdown-related problems, it is recommended all flows using the returned Scheduler to be canceled/disposed before the underlyingExecutor is shut down. To avoid problems due to the Executor having a bounded-queue, it is recommended to rephrase the flow to utilize backpressure as the means to limit outstanding work.

This type of scheduler is less sensitive to leaking Scheduler.Worker instances, although not disposing a worker that has timed/delayed tasks not cancelled by other means may leak resources and/or execute those tasks "unexpectedly".

Note that this method returns a new Scheduler instance, even for the same Executor instance.

It is possible to wrap an Executor into a Scheduler without triggering the initialization of all the standard schedulers by using the RxJavaPlugins.createExecutorScheduler(Executor, boolean, boolean) method before the Schedulers class itself is accessed.