Cancelable streams

Sam Pullara sam at sampullara.com
Fri Nov 2 13:30:38 PDT 2012


Won't you have to invert this and name it 'until' since while isn't available as a method name?

Generally I like the idea of being able to short circuit things. Perhaps overload a limit() operation so that it can either take a long or a predicate?
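For what it's worth, a predicate-taking short-circuit of exactly this shape later shipped in Java 9 as Stream.takeWhile. A minimal sketch of the behavior Sam describes (class and method names here are illustrative only):

```java
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

public class PredicateLimitDemo {
    // A "limit by predicate": keep drawing from the (infinite) source
    // until the predicate fails, then short-circuit. Java 9's
    // Stream.takeWhile does exactly this.
    public static List<Integer> firstBelow(int bound) {
        return Stream.iterate(0, i -> i + 1)     // infinite: 0, 1, 2, ...
                     .takeWhile(i -> i < bound)  // predicate-based "limit"
                     .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(firstBelow(4)); // prints [0, 1, 2, 3]
    }
}
```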

You could also make this a fluent API where you can add a callback when cancelled:

stream.filter(...).map(...).limit(i -> i > 10).forEach(...).onCancel(i -> ...i items processed...);
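As a rough sketch of how such an onCancel hook could behave, here is one possible shape, with the cut made by a takeWhile-style check before each element (the method name and signature are hypothetical, not an actual API):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.LongConsumer;
import java.util.stream.Stream;

public class CancelDemo {
    // Hypothetical helper: run the action until the cancel flag is set,
    // then invoke onCancel with the number of items processed so far.
    public static <T> long forEachUntilCancelled(Stream<T> stream,
                                                 AtomicBoolean cancelled,
                                                 Consumer<? super T> action,
                                                 LongConsumer onCancel) {
        AtomicLong processed = new AtomicLong();
        stream.takeWhile(t -> !cancelled.get())  // check flag before each element
              .forEach(t -> { action.accept(t); processed.incrementAndGet(); });
        if (cancelled.get()) {
            onCancel.accept(processed.get());
        }
        return processed.get();
    }

    public static void main(String[] args) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        long n = forEachUntilCancelled(
            Stream.iterate(0, i -> i + 1),           // infinite source
            cancelled,
            i -> { if (i == 9) cancelled.set(true); }, // request cancel mid-run
            count -> System.out.println(count + " items processed before cancel"));
        System.out.println(n); // prints 10
    }
}
```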

Sam

On Fri, Nov 2, 2012 at 1:01 PM, Brian Goetz <brian.goetz at oracle.com> wrote:

We've got a reasonable start at support in the library for infinite streams. Of course, there is the risk that you'll do something that never terminates, like:

infiniteStream.forEach(...)

If you have a short-circuit operation (either an intermediate operation like limit(n), or a terminal operation like findFirst), everything works fine -- and should work fine even in parallel (once we have parallel implementations for limit/findFirst and do a little more implementation work on building Spliterators on infinite iterators.)

The other use case that we want to support is where you have an infinite data feed and you want to do parallel operations on it "until you're tired." This could mean "chew on it for five minutes and give me what you've got", or "chew until a shutdown is requested", etc. This is something that has been explicitly requested by several folks (Sam?) to do parallel processing on infinite event streams. So, something like:

infiniteIOStream.while(e -> !cancelationRequested)
                .filter(...)
                .map(...)
                .forEach(...)
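One concrete reading of "chew on it for five minutes" is cutting the infinite source with a wall-clock deadline. A minimal serial sketch, with Stream.takeWhile (which Java 9 later added) standing in for the proposed while op:

```java
import java.util.stream.Stream;

public class DeadlineDemo {
    // Process an infinite stream "until you're tired": here, until a
    // wall-clock deadline passes. The deadline check plays the role of
    // the proposed while(e -> !cancelationRequested) op.
    public static long countWithin(long millis) {
        long deadline = System.currentTimeMillis() + millis;
        return Stream.iterate(0L, i -> i + 1)  // infinite source
                     .takeWhile(i -> System.currentTimeMillis() < deadline)
                     .count();
    }

    public static void main(String[] args) {
        // How many elements we got through in roughly 50 ms of "chewing".
        System.out.println(countWithin(50));
    }
}
```

Note this is the easy serial case; as the message goes on to say, the interesting question is what the parallel semantics of such a cut should be.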

The obvious idea is to have a "while" op. And this works great for the serial case, but I'm not sure it works all that well in the parallel case. The other alternative is to have an abstraction for "cancelable stream":

CancelableStream stream = Streams.cancelable(generator, () -> !cancelationRequested);

A few questions:

- How important is it to be able to find out why the stream processing terminated -- because it ran out of data, or because it was canceled? Then we could do:

  CancelableStream stream = Streams.cancelable(ioStream, e -> !cancelationRequested);
  stream.filter(...).map(...).forEach(...)
  if (stream.wasCanceled()) { .... }

  Seems to me that this is something that is more a property of the source than of the pipeline. Otherwise, for operations that consume all input (like sorted), there is a real possibility for misuse:

  is.filter(...).sorted(..).while(...)   // definitely bad
  is.filter(...).while(...).sorted(..)   // possibly ok
  is.while(...).filter(...).sorted(..)   // definitely ok

- Do we want to support cancelability by content (i.e., stop when you see an element that matches this predicate), or is that excessively conflating one functionality with another? (My thought: these are two different functionalities; keep them separate.)

- How does cancelation interact with encounter ordering? This is largely about whether it's OK to process elements that follow the one that triggers the cancelation, or whether we have to stop immediately on finding that the cancelation criterion holds true.
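A sketch of the "cancelable stream" alternative, with cancellation attached to the source so the cut happens before any downstream ops (the names follow the message's CancelableStream/wasCanceled proposal but are hypothetical, and Stream.takeWhile, which Java 9 later added, supplies the cut):

```java
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.BooleanSupplier;
import java.util.stream.Stream;

// Hypothetical sketch: Streams.cancelable(...) and wasCanceled() are
// proposed names from the message, not a real API.
public class CancelableSource<T> {
    private final Stream<T> source;
    private final BooleanSupplier cancelRequested;
    private final AtomicBoolean sawCancel = new AtomicBoolean();

    public CancelableSource(Stream<T> source, BooleanSupplier cancelRequested) {
        this.source = source;
        this.cancelRequested = cancelRequested;
    }

    // Cancellation is a property of the source: the cut is applied before
    // any downstream ops, so even a later sorted() sees a finite input.
    public Stream<T> stream() {
        return source.takeWhile(t -> {
            if (cancelRequested.getAsBoolean()) {
                sawCancel.set(true);  // remember why we stopped
                return false;
            }
            return true;
        });
    }

    // Lets the caller distinguish "ran out of data" from "was canceled".
    public boolean wasCanceled() { return sawCancel.get(); }
}
```

A usage sketch: wrap an infinite source, run the pipeline, then check src.wasCanceled() afterwards to learn why traversal stopped.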



More information about the lambda-libs-spec-observers mailing list