SubmissionPublisher (Java SE 9 & JDK 9)
- Type Parameters: `T` - the published item type
- All Implemented Interfaces: [AutoCloseable](../../../java/lang/AutoCloseable.html "interface in java.lang"), [Flow.Publisher](../../../java/util/concurrent/Flow.Publisher.html "interface in java.util.concurrent")<T>
public class SubmissionPublisher<T>
extends Object
implements Flow.Publisher<T>, AutoCloseable
A Flow.Publisher that asynchronously issues submitted (non-null) items to current subscribers until it is closed. Each current subscriber receives newly submitted items in the same order unless drops or exceptions are encountered. Using a SubmissionPublisher allows item generators to act as compliant reactive-streams Publishers relying on drop handling and/or blocking for flow control.
A SubmissionPublisher uses the Executor supplied in its constructor for delivery to subscribers. The best choice of Executor depends on expected usage. If the generator(s) of submitted items run in separate threads, and the number of subscribers can be estimated, consider using Executors.newFixedThreadPool(int). Otherwise consider using the default, normally the ForkJoinPool.commonPool().
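The advice above can be sketched as follows, assuming a single subscriber and a producer that runs on its own thread. The class name `FixedPoolExample`, the method `publishAll`, the pool size of one, and the use of the default buffer size are illustrative assumptions, not recommendations made by this API.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Flow;
import java.util.concurrent.SubmissionPublisher;

class FixedPoolExample {
    // Publishes the given items through a publisher backed by a fixed pool
    // and returns what a single consumer received.
    static List<String> publishAll(List<String> items) {
        // Assumption: one delivery thread is enough for one expected subscriber.
        ExecutorService pool = Executors.newFixedThreadPool(1);
        List<String> received = new CopyOnWriteArrayList<>();
        try (SubmissionPublisher<String> publisher =
                 new SubmissionPublisher<>(pool, Flow.defaultBufferSize())) {
            CompletableFuture<Void> done = publisher.consume(received::add);
            items.forEach(publisher::submit);
            publisher.close();   // issues onComplete to the consumer
            done.join();         // wait until the consumer has seen onComplete
        } finally {
            pool.shutdown();     // the publisher does not shut down its Executor
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(publishAll(List.of("a", "b", "c")));
    }
}
```

A caller-supplied pool must be shut down by the caller, which is one reason the default Executor is often the simpler choice.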
Buffering allows producers and consumers to transiently operate at different rates. Each subscriber uses an independent buffer. Buffers are created upon first use and expanded as needed up to the given maximum. (The enforced capacity may be rounded up to the nearest power of two and/or bounded by the largest value supported by this implementation.) Invocations of request do not directly result in buffer expansion, but risk saturation if unfilled requests exceed the maximum capacity. The default value of Flow.defaultBufferSize() may provide a useful starting point for choosing a capacity based on expected rates, resources, and usages.
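Because the enforced capacity may differ from the requested one, it can be worth reading it back. The following is a small sketch under that assumption; `BufferCapacityExample` and `enforcedCapacity` are hypothetical names used only for illustration.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;

class BufferCapacityExample {
    // Returns the capacity the publisher actually enforces for a requested maximum.
    static int enforcedCapacity(int requested) {
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), requested)) {
            return publisher.getMaxBufferCapacity();
        }
    }

    public static void main(String[] args) {
        System.out.println("default buffer size: " + Flow.defaultBufferSize());
        System.out.println("requested 100, enforced: " + enforcedCapacity(100));
    }
}
```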
Publication methods support different policies about what to do when buffers are saturated. Method submit blocks until resources are available. This is simplest, but least responsive. The offer methods may drop items (either immediately or with bounded timeout), but provide an opportunity to interpose a handler and then retry.
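The drop policy of the offer methods can be sketched as follows. This is a minimal illustration that assumes a subscriber which never requests items, so its buffer can only fill up; `OfferDropExample`, `StalledSubscriber`, and `countDrops` are hypothetical names introduced here.

```java
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.atomic.AtomicInteger;

class OfferDropExample {
    // A subscriber that never calls request(), so its buffer can only fill up.
    static final class StalledSubscriber implements Flow.Subscriber<Integer> {
        public void onSubscribe(Flow.Subscription subscription) { }
        public void onNext(Integer item) { }
        public void onError(Throwable error) { }
        public void onComplete() { }
    }

    // Offers `attempts` items and returns how many drops the handler observed.
    static int countDrops(int maxBufferCapacity, int attempts) {
        AtomicInteger drops = new AtomicInteger();
        try (SubmissionPublisher<Integer> publisher =
                 new SubmissionPublisher<>(ForkJoinPool.commonPool(), maxBufferCapacity)) {
            publisher.subscribe(new StalledSubscriber());
            for (int i = 0; i < attempts; i++) {
                // The handler returns false, so a dropped item is not retried.
                publisher.offer(i, (subscriber, item) -> {
                    drops.incrementAndGet();
                    return false;
                });
            }
        }
        return drops.get();
    }

    public static void main(String[] args) {
        System.out.println("drops: " + countDrops(4, 100));
    }
}
```

Replacing `offer` with `submit` in this sketch would block the producer indefinitely once the buffer is full, since the stalled subscriber never frees space.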
If any Subscriber method throws an exception, its subscription is cancelled. If a handler is supplied as a constructor argument, it is invoked before cancellation upon an exception in method onNext, but exceptions in methods onSubscribe, onError and onComplete are not recorded or handled before cancellation. If the supplied Executor throws RejectedExecutionException (or any other RuntimeException or Error) when attempting to execute a task, or a drop handler throws an exception when processing a dropped item, then the exception is rethrown. In these cases, not all subscribers will have been issued the published item. It is usually good practice to closeExceptionally in these cases.
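The onNext handler described above might be used as in the following sketch. It assumes a deliberately failing subscriber and records the exception in a CompletableFuture; `OnNextHandlerExample`, `FailingSubscriber`, `captureOnNextFailure`, and the five-second bound are all illustrative choices.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.concurrent.TimeUnit;

class OnNextHandlerExample {
    // A subscriber whose onNext always fails (purely for illustration).
    static final class FailingSubscriber implements Flow.Subscriber<String> {
        public void onSubscribe(Flow.Subscription subscription) {
            subscription.request(1);
        }
        public void onNext(String item) {
            throw new IllegalStateException("cannot process " + item);
        }
        public void onError(Throwable error) { }
        public void onComplete() { }
    }

    // Submits one item and returns the exception passed to the handler.
    static Throwable captureOnNextFailure(String item) {
        CompletableFuture<Throwable> seen = new CompletableFuture<>();
        try (SubmissionPublisher<String> publisher = new SubmissionPublisher<>(
                 ForkJoinPool.commonPool(), Flow.defaultBufferSize(),
                 (subscriber, error) -> seen.complete(error))) {
            publisher.subscribe(new FailingSubscriber());
            publisher.submit(item);
            // Delivery is asynchronous, so wait (with a bound) for the handler.
            return seen.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException("handler was not invoked", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(captureOnNextFailure("x"));
    }
}
```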
Method consume(Consumer) simplifies support for a common case in which the only action of a subscriber is to request and process all items using a supplied function.
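A minimal sketch of this common case follows. It assumes the default constructor and collects items into a list; `ConsumeExample` and `collect` are hypothetical names.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.SubmissionPublisher;

class ConsumeExample {
    // Publishes 1..count and returns the items in the order the consumer saw them.
    static List<Integer> collect(int count) {
        List<Integer> received = new CopyOnWriteArrayList<>();
        try (SubmissionPublisher<Integer> publisher = new SubmissionPublisher<>()) {
            // The returned future completes normally once onComplete is signalled.
            CompletableFuture<Void> done = publisher.consume(received::add);
            for (int i = 1; i <= count; i++) {
                publisher.submit(i);
            }
            publisher.close();
            done.join();
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect(5));
    }
}
```

Waiting on the returned future matters here because close() does not guarantee that subscribers have finished when it returns.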
This class may also serve as a convenient base for subclasses that generate items, and use the methods in this class to publish them. For example, here is a class that periodically publishes the items generated from a supplier. (In practice you might add methods to independently start and stop generation, to share Executors among publishers, and so on, or use a SubmissionPublisher as a component rather than a superclass.)

    import java.util.concurrent.Executor;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.ScheduledFuture;
    import java.util.concurrent.ScheduledThreadPoolExecutor;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.concurrent.TimeUnit;
    import java.util.function.Supplier;

    class PeriodicPublisher<T> extends SubmissionPublisher<T> {
      final ScheduledFuture<?> periodicTask;
      final ScheduledExecutorService scheduler;
      PeriodicPublisher(Executor executor, int maxBufferCapacity,
                        Supplier<? extends T> supplier,
                        long period, TimeUnit unit) {
        super(executor, maxBufferCapacity);
        scheduler = new ScheduledThreadPoolExecutor(1);
        // Publish one generated item per period, starting immediately.
        periodicTask = scheduler.scheduleAtFixedRate(
          () -> submit(supplier.get()), 0, period, unit);
      }
      public void close() {
        // Stop generating before closing the publisher itself.
        periodicTask.cancel(false);
        scheduler.shutdown();
        super.close();
      }
    }
Here is an example of a Flow.Processor implementation. It uses single-step requests to its publisher for simplicity of illustration. A more adaptive version could monitor flow using the lag estimate returned from submit, along with other utility methods.

    import java.util.concurrent.Executor;
    import java.util.concurrent.Flow;
    import java.util.concurrent.SubmissionPublisher;
    import java.util.function.Function;

    class TransformProcessor<S,T> extends SubmissionPublisher<T>
      implements Flow.Processor<S,T> {
      final Function<? super S, ? extends T> function;
      Flow.Subscription subscription;
      TransformProcessor(Executor executor, int maxBufferCapacity,
                         Function<? super S, ? extends T> function) {
        super(executor, maxBufferCapacity);
        this.function = function;
      }
      public void onSubscribe(Flow.Subscription subscription) {
        (this.subscription = subscription).request(1);
      }
      public void onNext(S item) {
        // Request the next item, then republish the transformed one.
        subscription.request(1);
        submit(function.apply(item));
      }
      public void onError(Throwable ex) { closeExceptionally(ex); }
      public void onComplete() { close(); }
    }
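One way such a processor might be wired between a source publisher and a consumer is sketched below. The processor class is repeated so that the sketch compiles on its own; `TransformPipelineExample`, `run`, and the `"item-"` mapping are hypothetical and chosen only for illustration.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.Executor;
import java.util.concurrent.Flow;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.SubmissionPublisher;
import java.util.function.Function;

// Same processor as in the text, repeated so the sketch is self-contained.
class TransformProcessor<S,T> extends SubmissionPublisher<T>
        implements Flow.Processor<S,T> {
    final Function<? super S, ? extends T> function;
    Flow.Subscription subscription;
    TransformProcessor(Executor executor, int maxBufferCapacity,
                       Function<? super S, ? extends T> function) {
        super(executor, maxBufferCapacity);
        this.function = function;
    }
    public void onSubscribe(Flow.Subscription subscription) {
        (this.subscription = subscription).request(1);
    }
    public void onNext(S item) {
        subscription.request(1);
        submit(function.apply(item));
    }
    public void onError(Throwable ex) { closeExceptionally(ex); }
    public void onComplete() { close(); }
}

class TransformPipelineExample {
    // Sends each input through the processor and returns the transformed items.
    static List<String> run(List<Integer> inputs) {
        List<String> results = new CopyOnWriteArrayList<>();
        try (SubmissionPublisher<Integer> source = new SubmissionPublisher<>();
             TransformProcessor<Integer, String> processor =
                 new TransformProcessor<Integer, String>(
                     ForkJoinPool.commonPool(), Flow.defaultBufferSize(),
                     i -> "item-" + i)) {
            // Attach the downstream consumer before any item is published.
            CompletableFuture<Void> done = processor.consume(results::add);
            source.subscribe(processor);
            inputs.forEach(source::submit);
            source.close();   // onComplete reaches the processor, which closes itself
            done.join();
        }
        return results;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of(1, 2, 3)));
    }
}
```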
Since: 9
Constructor Summary
Constructors

| Constructor | Description |
| --- | --- |
| `SubmissionPublisher()` | Creates a new SubmissionPublisher using the ForkJoinPool.commonPool() for async delivery to subscribers (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task), with maximum buffer capacity of Flow.defaultBufferSize(), and no handler for Subscriber exceptions in method onNext. |
| `SubmissionPublisher(Executor executor, int maxBufferCapacity)` | Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and no handler for Subscriber exceptions in method onNext. |
| `SubmissionPublisher(Executor executor, int maxBufferCapacity, BiConsumer<? super Flow.Subscriber<? super T>,? super Throwable> handler)` | Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and, if non-null, the given handler invoked when any Subscriber throws an exception in method onNext. |

Method Summary
| Modifier and Type | Method | Description |
| --- | --- | --- |
| `void` | `close()` | Unless already closed, issues onComplete signals to current subscribers, and disallows subsequent attempts to publish. |
| `void` | `closeExceptionally(Throwable error)` | Unless already closed, issues onError signals to current subscribers with the given error, and disallows subsequent attempts to publish. |
| `CompletableFuture<Void>` | `consume(Consumer<? super T> consumer)` | Processes all published items using the given Consumer function. |
| `int` | `estimateMaximumLag()` | Returns an estimate of the maximum number of items produced but not yet consumed among all current subscribers. |
| `long` | `estimateMinimumDemand()` | Returns an estimate of the minimum number of items requested (via request) but not yet produced, among all current subscribers. |
| `Throwable` | `getClosedException()` | Returns the exception associated with closeExceptionally, or null if not closed or if closed normally. |
| `Executor` | `getExecutor()` | Returns the Executor used for asynchronous delivery. |
| `int` | `getMaxBufferCapacity()` | Returns the maximum per-subscriber buffer capacity. |
| `int` | `getNumberOfSubscribers()` | Returns the number of current subscribers. |
| `List<Flow.Subscriber<? super T>>` | `getSubscribers()` | Returns a list of current subscribers for monitoring and tracking purposes, not for invoking Flow.Subscriber methods on the subscribers. |
| `boolean` | `hasSubscribers()` | Returns true if this publisher has any subscribers. |
| `boolean` | `isClosed()` | Returns true if this publisher is not accepting submissions. |
| `boolean` | `isSubscribed(Flow.Subscriber<? super T> subscriber)` | Returns true if the given Subscriber is currently subscribed. |
| `int` | `offer(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)` | Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method, blocking while resources for any subscription are unavailable, up to the specified timeout or until the caller thread is interrupted, at which point the given handler (if non-null) is invoked, and if it returns true, retried once. |
| `int` | `offer(T item, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)` | Publishes the given item, if possible, to each current subscriber by asynchronously invoking its onNext method. |
| `int` | `submit(T item)` | Publishes the given item to each current subscriber by asynchronously invoking its onNext method, blocking uninterruptibly while resources for any subscriber are unavailable. |
| `void` | `subscribe(Flow.Subscriber<? super T> subscriber)` | Adds the given Subscriber unless already subscribed. |

* ### Methods inherited from class java.lang.[Object](../../../java/lang/Object.html "class in java.lang")

  `[clone](../../../java/lang/Object.html#clone--), [equals](../../../java/lang/Object.html#equals-java.lang.Object-), [finalize](../../../java/lang/Object.html#finalize--), [getClass](../../../java/lang/Object.html#getClass--), [hashCode](../../../java/lang/Object.html#hashCode--), [notify](../../../java/lang/Object.html#notify--), [notifyAll](../../../java/lang/Object.html#notifyAll--), [toString](../../../java/lang/Object.html#toString--), [wait](../../../java/lang/Object.html#wait--), [wait](../../../java/lang/Object.html#wait-long-), [wait](../../../java/lang/Object.html#wait-long-int-)`
Constructor Detail
* #### SubmissionPublisher

  public SubmissionPublisher([Executor](../../../java/util/concurrent/Executor.html "interface in java.util.concurrent") executor, int maxBufferCapacity, [BiConsumer](../../../java/util/function/BiConsumer.html "interface in java.util.function")<? super [Flow.Subscriber](../../../java/util/concurrent/Flow.Subscriber.html "interface in java.util.concurrent")<? super [T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher")>,? super [Throwable](../../../java/lang/Throwable.html "class in java.lang")> handler)

  Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and, if non-null, the given handler invoked when any Subscriber throws an exception in method [onNext](../../../java/util/concurrent/Flow.Subscriber.html#onNext-T-).

  Parameters:
  - `executor` \- the executor to use for async delivery, supporting creation of at least one independent thread
  - `maxBufferCapacity` \- the maximum capacity for each subscriber's buffer (the enforced capacity may be rounded up to the nearest power of two and/or bounded by the largest value supported by this implementation; method [getMaxBufferCapacity()](../../../java/util/concurrent/SubmissionPublisher.html#getMaxBufferCapacity--) returns the actual value)
  - `handler` \- if non-null, procedure to invoke upon exception thrown in method `onNext`

  Throws:
  - `[NullPointerException](../../../java/lang/NullPointerException.html "class in java.lang")` \- if executor is null
  - `[IllegalArgumentException](../../../java/lang/IllegalArgumentException.html "class in java.lang")` \- if maxBufferCapacity not positive

* #### SubmissionPublisher

  public SubmissionPublisher([Executor](../../../java/util/concurrent/Executor.html "interface in java.util.concurrent") executor, int maxBufferCapacity)

  Creates a new SubmissionPublisher using the given Executor for async delivery to subscribers, with the given maximum buffer size for each subscriber, and no handler for Subscriber exceptions in method [onNext](../../../java/util/concurrent/Flow.Subscriber.html#onNext-T-).

  Parameters:
  - `executor` \- the executor to use for async delivery, supporting creation of at least one independent thread
  - `maxBufferCapacity` \- the maximum capacity for each subscriber's buffer (the enforced capacity may be rounded up to the nearest power of two and/or bounded by the largest value supported by this implementation; method [getMaxBufferCapacity()](../../../java/util/concurrent/SubmissionPublisher.html#getMaxBufferCapacity--) returns the actual value)

  Throws:
  - `[NullPointerException](../../../java/lang/NullPointerException.html "class in java.lang")` \- if executor is null
  - `[IllegalArgumentException](../../../java/lang/IllegalArgumentException.html "class in java.lang")` \- if maxBufferCapacity not positive

* #### SubmissionPublisher

  public SubmissionPublisher()

  Creates a new SubmissionPublisher using the [ForkJoinPool.commonPool()](../../../java/util/concurrent/ForkJoinPool.html#commonPool--) for async delivery to subscribers (unless it does not support a parallelism level of at least two, in which case, a new Thread is created to run each task), with maximum buffer capacity of [Flow.defaultBufferSize()](../../../java/util/concurrent/Flow.html#defaultBufferSize--), and no handler for Subscriber exceptions in method [onNext](../../../java/util/concurrent/Flow.Subscriber.html#onNext-T-).
Method Detail
* #### subscribe

  public void subscribe([Flow.Subscriber](../../../java/util/concurrent/Flow.Subscriber.html "interface in java.util.concurrent")<? super [T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher")> subscriber)

  Adds the given Subscriber unless already subscribed. If already subscribed, the Subscriber's [onError](../../../java/util/concurrent/Flow.Subscriber.html#onError-java.lang.Throwable-) method is invoked on the existing subscription with an [IllegalStateException](../../../java/lang/IllegalStateException.html "class in java.lang"). Otherwise, upon success, the Subscriber's [onSubscribe](../../../java/util/concurrent/Flow.Subscriber.html#onSubscribe-java.util.concurrent.Flow.Subscription-) method is invoked asynchronously with a new [Flow.Subscription](../../../java/util/concurrent/Flow.Subscription.html "interface in java.util.concurrent"). If [onSubscribe](../../../java/util/concurrent/Flow.Subscriber.html#onSubscribe-java.util.concurrent.Flow.Subscription-) throws an exception, the subscription is cancelled. Otherwise, if this SubmissionPublisher was closed exceptionally, then the subscriber's [onError](../../../java/util/concurrent/Flow.Subscriber.html#onError-java.lang.Throwable-) method is invoked with the corresponding exception, or if closed without exception, the subscriber's [onComplete](../../../java/util/concurrent/Flow.Subscriber.html#onComplete--) method is invoked. Subscribers may enable receiving items by invoking the [request](../../../java/util/concurrent/Flow.Subscription.html#request-long-) method of the new Subscription, and may unsubscribe by invoking its [cancel](../../../java/util/concurrent/Flow.Subscription.html#cancel--) method.

  Specified by: `[subscribe](../../../java/util/concurrent/Flow.Publisher.html#subscribe-java.util.concurrent.Flow.Subscriber-)` in interface `[Flow.Publisher](../../../java/util/concurrent/Flow.Publisher.html "interface in java.util.concurrent")<[T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher")>`

  Parameters:
  - `subscriber` \- the subscriber

  Throws:
  - `[NullPointerException](../../../java/lang/NullPointerException.html "class in java.lang")` \- if subscriber is null

* #### submit

  public int submit([T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher") item)

  Publishes the given item to each current subscriber by asynchronously invoking its [onNext](../../../java/util/concurrent/Flow.Subscriber.html#onNext-T-) method, blocking uninterruptibly while resources for any subscriber are unavailable. This method returns an estimate of the maximum lag (number of items submitted but not yet consumed) among all current subscribers. This value is at least one (accounting for this submitted item) if there are any subscribers, else zero. If the Executor for this publisher throws a RejectedExecutionException (or any other RuntimeException or Error) when attempting to asynchronously notify subscribers, then this exception is rethrown, in which case not all subscribers will have been issued this item.

  Parameters:
  - `item` \- the (non-null) item to publish

  Returns: the estimated maximum lag among subscribers

  Throws:
  - `[IllegalStateException](../../../java/lang/IllegalStateException.html "class in java.lang")` \- if closed
  - `[NullPointerException](../../../java/lang/NullPointerException.html "class in java.lang")` \- if item is null
  - `[RejectedExecutionException](../../../java/util/concurrent/RejectedExecutionException.html "class in java.util.concurrent")` \- if thrown by Executor

* #### offer

  public int offer([T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher") item, [BiPredicate](../../../java/util/function/BiPredicate.html "interface in java.util.function")<[Flow.Subscriber](../../../java/util/concurrent/Flow.Subscriber.html "interface in java.util.concurrent")<? super [T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher")>,? super [T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher")> onDrop)

  Publishes the given item, if possible, to each current subscriber by asynchronously invoking its [onNext](../../../java/util/concurrent/Flow.Subscriber.html#onNext-T-) method. The item may be dropped by one or more subscribers if resource limits are exceeded, in which case the given handler (if non-null) is invoked, and if it returns true, retried once. Other calls to methods in this class by other threads are blocked while the handler is invoked. Unless recovery is assured, options are usually limited to logging the error and/or issuing an [onError](../../../java/util/concurrent/Flow.Subscriber.html#onError-java.lang.Throwable-) signal to the subscriber. This method returns a status indicator: If negative, it represents the (negative) number of drops (failed attempts to issue the item to a subscriber). Otherwise it is an estimate of the maximum lag (number of items submitted but not yet consumed) among all current subscribers.
  This value is at least one (accounting for this submitted item) if there are any subscribers, else zero. If the Executor for this publisher throws a RejectedExecutionException (or any other RuntimeException or Error) when attempting to asynchronously notify subscribers, or the drop handler throws an exception when processing a dropped item, then this exception is rethrown.

  Parameters:
  - `item` \- the (non-null) item to publish
  - `onDrop` \- if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)

  Returns: if negative, the (negative) number of drops; otherwise an estimate of maximum lag

  Throws:
  - `[IllegalStateException](../../../java/lang/IllegalStateException.html "class in java.lang")` \- if closed
  - `[NullPointerException](../../../java/lang/NullPointerException.html "class in java.lang")` \- if item is null
  - `[RejectedExecutionException](../../../java/util/concurrent/RejectedExecutionException.html "class in java.util.concurrent")` \- if thrown by Executor

* #### offer

  public int offer([T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher") item, long timeout, [TimeUnit](../../../java/util/concurrent/TimeUnit.html "enum in java.util.concurrent") unit, [BiPredicate](../../../java/util/function/BiPredicate.html "interface in java.util.function")<[Flow.Subscriber](../../../java/util/concurrent/Flow.Subscriber.html "interface in java.util.concurrent")<? super [T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher")>,? super [T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher")> onDrop)

  Publishes the given item, if possible, to each current subscriber by asynchronously invoking its [onNext](../../../java/util/concurrent/Flow.Subscriber.html#onNext-T-) method, blocking while resources for any subscription are unavailable, up to the specified timeout or until the caller thread is interrupted, at which point the given handler (if non-null) is invoked, and if it returns true, retried once. (The drop handler may distinguish timeouts from interrupts by checking whether the current thread is interrupted.) Other calls to methods in this class by other threads are blocked while the handler is invoked. Unless recovery is assured, options are usually limited to logging the error and/or issuing an [onError](../../../java/util/concurrent/Flow.Subscriber.html#onError-java.lang.Throwable-) signal to the subscriber. This method returns a status indicator: If negative, it represents the (negative) number of drops (failed attempts to issue the item to a subscriber). Otherwise it is an estimate of the maximum lag (number of items submitted but not yet consumed) among all current subscribers. This value is at least one (accounting for this submitted item) if there are any subscribers, else zero. If the Executor for this publisher throws a RejectedExecutionException (or any other RuntimeException or Error) when attempting to asynchronously notify subscribers, or the drop handler throws an exception when processing a dropped item, then this exception is rethrown.

  Parameters:
  - `item` \- the (non-null) item to publish
  - `timeout` \- how long to wait for resources for any subscriber before giving up, in units of `unit`
  - `unit` \- a `TimeUnit` determining how to interpret the `timeout` parameter
  - `onDrop` \- if non-null, the handler invoked upon a drop to a subscriber, with arguments of the subscriber and item; if it returns true, an offer is re-attempted (once)

  Returns: if negative, the (negative) number of drops; otherwise an estimate of maximum lag

  Throws:
  - `[IllegalStateException](../../../java/lang/IllegalStateException.html "class in java.lang")` \- if closed
  - `[NullPointerException](../../../java/lang/NullPointerException.html "class in java.lang")` \- if item is null
  - `[RejectedExecutionException](../../../java/util/concurrent/RejectedExecutionException.html "class in java.util.concurrent")` \- if thrown by Executor

* #### close

  public void close()

  Unless already closed, issues [onComplete](../../../java/util/concurrent/Flow.Subscriber.html#onComplete--) signals to current subscribers, and disallows subsequent attempts to publish. Upon return, this method does _NOT_ guarantee that all subscribers have yet completed.

  Specified by: `[close](../../../java/lang/AutoCloseable.html#close--)` in interface `[AutoCloseable](../../../java/lang/AutoCloseable.html "interface in java.lang")`

* #### closeExceptionally

  public void closeExceptionally([Throwable](../../../java/lang/Throwable.html "class in java.lang") error)

  Unless already closed, issues [onError](../../../java/util/concurrent/Flow.Subscriber.html#onError-java.lang.Throwable-) signals to current subscribers with the given error, and disallows subsequent attempts to publish. Future subscribers also receive the given error. Upon return, this method does _NOT_ guarantee that all subscribers have yet completed.

  Parameters:
  - `error` \- the `onError` argument sent to subscribers

  Throws:
  - `[NullPointerException](../../../java/lang/NullPointerException.html "class in java.lang")` \- if error is null

* #### isClosed

  public boolean isClosed()

  Returns true if this publisher is not accepting submissions.

  Returns: true if closed

* #### getClosedException

  public [Throwable](../../../java/lang/Throwable.html "class in java.lang") getClosedException()

  Returns the exception associated with [closeExceptionally](../../../java/util/concurrent/SubmissionPublisher.html#closeExceptionally-java.lang.Throwable-), or null if not closed or if closed normally.

  Returns: the exception, or null if none

* #### hasSubscribers

  public boolean hasSubscribers()

  Returns true if this publisher has any subscribers.

  Returns: true if this publisher has any subscribers

* #### getNumberOfSubscribers

  public int getNumberOfSubscribers()

  Returns the number of current subscribers.

  Returns: the number of current subscribers

* #### getExecutor

  public [Executor](../../../java/util/concurrent/Executor.html "interface in java.util.concurrent") getExecutor()

  Returns the Executor used for asynchronous delivery.

  Returns: the Executor used for asynchronous delivery

* #### getMaxBufferCapacity

  public int getMaxBufferCapacity()

  Returns the maximum per-subscriber buffer capacity.

  Returns: the maximum per-subscriber buffer capacity

* #### getSubscribers

  public [List](../../../java/util/List.html "interface in java.util")<[Flow.Subscriber](../../../java/util/concurrent/Flow.Subscriber.html "interface in java.util.concurrent")<? super [T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher")>> getSubscribers()

  Returns a list of current subscribers for monitoring and tracking purposes, not for invoking [Flow.Subscriber](../../../java/util/concurrent/Flow.Subscriber.html "interface in java.util.concurrent") methods on the subscribers.

  Returns: list of current subscribers

* #### isSubscribed

  public boolean isSubscribed([Flow.Subscriber](../../../java/util/concurrent/Flow.Subscriber.html "interface in java.util.concurrent")<? super [T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher")> subscriber)

  Returns true if the given Subscriber is currently subscribed.

  Parameters:
  - `subscriber` \- the subscriber

  Returns: true if currently subscribed

  Throws:
  - `[NullPointerException](../../../java/lang/NullPointerException.html "class in java.lang")` \- if subscriber is null

* #### estimateMinimumDemand

  public long estimateMinimumDemand()

  Returns an estimate of the minimum number of items requested (via [request](../../../java/util/concurrent/Flow.Subscription.html#request-long-)) but not yet produced, among all current subscribers.

  Returns: the estimate, or zero if no subscribers

* #### estimateMaximumLag

  public int estimateMaximumLag()

  Returns an estimate of the maximum number of items produced but not yet consumed among all current subscribers.

  Returns: the estimate

* #### consume

  public [CompletableFuture](../../../java/util/concurrent/CompletableFuture.html "class in java.util.concurrent")<[Void](../../../java/lang/Void.html "class in java.lang")> consume([Consumer](../../../java/util/function/Consumer.html "interface in java.util.function")<? super [T](../../../java/util/concurrent/SubmissionPublisher.html "type parameter in SubmissionPublisher")> consumer)

  Processes all published items using the given Consumer function. Returns a CompletableFuture that is completed normally when this publisher signals [onComplete](../../../java/util/concurrent/Flow.Subscriber.html#onComplete--), or completed exceptionally upon any error, or an exception is thrown by the Consumer, or the returned CompletableFuture is cancelled, in which case no further items are processed.

  Parameters:
  - `consumer` \- the function applied to each onNext item

  Returns: a CompletableFuture that is completed normally when the publisher signals onComplete, and exceptionally upon any error or cancellation

  Throws:
  - `[NullPointerException](../../../java/lang/NullPointerException.html "class in java.lang")` \- if consumer is null
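As a closing illustration of the closing methods documented above, the following sketch closes a publisher exceptionally and then attaches a late consumer, which is expected to receive the same error. `CloseExceptionallyExample` and `errorSeenByLateSubscriber` are hypothetical names, and using consume to observe the error is an assumption made for brevity.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.SubmissionPublisher;

class CloseExceptionallyExample {
    // Closes a publisher with `failure`, then subscribes a late consumer and
    // returns the error that consumer ends up with (null if it saw none).
    static Throwable errorSeenByLateSubscriber(Throwable failure) {
        SubmissionPublisher<String> publisher = new SubmissionPublisher<>();
        publisher.closeExceptionally(failure);
        // The publisher now reports itself closed and remembers the failure.
        if (!publisher.isClosed() || publisher.getClosedException() != failure) {
            throw new IllegalStateException("publisher did not record the failure");
        }
        CompletableFuture<Void> late = publisher.consume(item -> { });
        try {
            late.join();
            return null;              // completed normally: no error seen
        } catch (CompletionException e) {
            return e.getCause();      // the error delivered through onError
        }
    }

    public static void main(String[] args) {
        System.out.println(errorSeenByLateSubscriber(new IllegalStateException("boom")));
    }
}
```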