Infinite parallel stream
Paul Sandoz paul.sandoz at oracle.com
Tue Apr 30 05:50:04 PDT 2013
On Apr 30, 2013, at 1:33 PM, Paul Sandoz <Paul.Sandoz at oracle.com> wrote:
Hi Boaz,
You have hit a known issue with the current implementation of limit. The F/J leaf tasks of the limit implementation do not keep track of their traversal count and report it to other tasks, so tasks cannot know when the global limit has been reached.
There are various optimizations we can implement depending on the characteristics:

1) SIZED & SUBSIZED & ORDERED
A SliceSpliterator can wrap the underlying spliterator; no buffering is required, and it throws away splits that do not intersect the slice range. I am working on this right now.

2) !ORDERED
A global count can be maintained (as discussed above) so that leaf F/J tasks know when to stop traversing and buffering. Using an AtomicLong to hold a global limit count may result in too much contention
(since there is no order it does not matter how many elements are skipped),

That is not quite true: it does not matter only if the number of elements to skip is less than the known total size, which alas we might not know.

Paul.

perhaps there are some other tricks like using LongAdder.
I did not realize this before, but this case combined with a spliterator like the one you have written could work for an infinite Stream.generate implementation. Currently we use an iterator, and I am working on changing that to be based on LongStream.range(0, Long.MAX_VALUE).map(e -> s.get()). Hmm... perhaps this needs to be rethought...
3) ORDERED
This is the worst possible case, and it is what is currently implemented and used for the first two cases as well. For this we may need to use the same spliterator-from-iterator tricks, i.e. copy a prefix of elements into an array, on the assumption that limits are not unduly large.

Paul.

On Apr 30, 2013, at 10:50 AM, Boaz Nahum <boaznahum at gmail.com> wrote:

I'm trying to generate a parallel stream from an infinite iterator:
    Stream s = generateParallel(supplier).
        parallel().
        limit(1000);

    s.forEach(System.out::println);

My Spliterator:

    private static class ParallelGenerateSpliterator<T> implements Spliterator<T> {

        private final Supplier<? extends T> supplier;
        private long estimatedSize;

        public ParallelGenerateSpliterator(Supplier<? extends T> supplier, long estimatedSize) {
            this.supplier = supplier;
            this.estimatedSize = estimatedSize;
        }

        public ParallelGenerateSpliterator(Supplier<? extends T> supplier) {
            this(supplier, 1 << Runtime.getRuntime().availableProcessors());
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            action.accept(supplier.get());
            return true;
        }

        @Override
        public Spliterator<T> trySplit() {
            long spliSize = estimatedSize >> 1;
            if (spliSize == 0) {
                return null;
            }
            estimatedSize -= spliSize;
            return new ParallelGenerateSpliterator<>(supplier, spliSize);
        }

        @Override
        public long estimateSize() {
            return estimatedSize;
        }

        @Override
        public int characteristics() {
            return IMMUTABLE;
        }
    }

But the examples above run forever, looping endlessly in AbstractPipeline:

    spliterator.forEachRemaining(wrappedSink);
Can I solve it without making the spliterator non-infinite?

Thanks
Boaz
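
For illustration only, the "global count" idea from case 2 above could be sketched at the source level roughly as follows. This is not the JDK's limit implementation. GlobalLimitSketch, BudgetedSpliterator and generateLimited are hypothetical names. The sketch assumes the supplier is thread-safe and that encounter order does not matter. Every split shares one AtomicLong budget, so each leaf task stops traversing as soon as the budget is spent. The halving size estimate follows the spliterator quoted above and only serves to stop F/J from splitting forever.

```java
import java.util.List;
import java.util.Spliterator;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;

// Hypothetical sketch; names are illustrative, not part of any JDK API.
final class GlobalLimitSketch {

    /** Unordered supplier-backed source whose splits share one global budget. */
    static final class BudgetedSpliterator<T> implements Spliterator<T> {
        private final Supplier<? extends T> supplier;
        private final AtomicLong remaining; // global count shared by every split
        private long estimate;              // only steers how often F/J splits

        BudgetedSpliterator(Supplier<? extends T> supplier, AtomicLong remaining, long estimate) {
            this.supplier = supplier;
            this.remaining = remaining;
            this.estimate = estimate;
        }

        @Override
        public boolean tryAdvance(Consumer<? super T> action) {
            // Claim one unit of the shared budget before producing an element.
            long r;
            do {
                r = remaining.get();
                if (r <= 0) {
                    return false; // global limit reached: this leaf stops traversing
                }
            } while (!remaining.compareAndSet(r, r - 1));
            action.accept(supplier.get());
            return true;
        }

        @Override
        public Spliterator<T> trySplit() {
            long half = estimate >>> 1;
            if (half == 0 || remaining.get() <= 0) {
                return null; // too small to split, or nothing left to hand out
            }
            estimate -= half;
            return new BudgetedSpliterator<>(supplier, remaining, half);
        }

        @Override
        public long estimateSize() {
            return estimate;
        }

        @Override
        public int characteristics() {
            return 0; // deliberately neither ORDERED nor SIZED
        }
    }

    /** Collects exactly {@code limit} supplied elements using a parallel stream. */
    static <T> List<T> generateLimited(Supplier<? extends T> supplier, long limit) {
        Spliterator<T> source =
                new BudgetedSpliterator<>(supplier, new AtomicLong(limit), Math.max(limit, 1));
        return StreamSupport.stream(source, true).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        AtomicLong counter = new AtomicLong();
        List<Long> values = generateLimited(counter::incrementAndGet, 1000);
        System.out.println(values.size());
    }
}
```

A single AtomicLong is the simplest way to show the idea, but it is also the contention point mentioned above, since every element costs one compare-and-set on the shared counter. Note too that the budget lives in the source rather than in limit, so the spliterator is effectively finite.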