Stream parallel() / sequential() question.

Brian Goetz brian.goetz at oracle.com
Thu Apr 4 08:43:14 PDT 2013


sequential() is definitely less important now that we have forEachOrdered, though there is still a semantic difference -- sequential makes the whole pipeline sequential, whereas forEachOrdered allows parallel upstream computation while preserving encounter order at the terminal stage. So if you have something like:

sortedList.map(expensiveFunction).forEachOrdered(System.out::println)

this will enable you to apply the expensive function in parallel but still print f(e) in the order imposed by the sort.
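A minimal sketch of that pattern, written against the java.util.stream API as it eventually shipped in Java 8. The class name, the squaring function standing in for "expensiveFunction", and the result list are illustrative assumptions, not part of the original message.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

class OrderedParallelDemo {
    // Hypothetical stand-in for an expensive but stateless function.
    static int expensiveFunction(int x) {
        return x * x;
    }

    // The map stage may run on several threads; forEachOrdered still
    // hands results to the consumer one at a time, in encounter order.
    static List<Integer> mapInParallelConsumeInOrder(List<Integer> sortedList) {
        List<Integer> seen = new ArrayList<>();
        sortedList.parallelStream()
                  .map(OrderedParallelDemo::expensiveFunction)
                  .forEachOrdered(seen::add);
        return seen;
    }

    public static void main(String[] args) {
        // prints [1, 4, 9, 16, 25]
        System.out.println(mapInParallelConsumeInOrder(Arrays.asList(1, 2, 3, 4, 5)));
    }
}
```

Appending to a plain ArrayList is acceptable here only because forEachOrdered processes one element at a time; the same consumer passed to forEach on a parallel stream would be one of the "bombs" discussed further down the thread.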

Having ordered/unordered variants of all ops exposes the complexity of reasoning about order to every user in every case -- which is undesirable. Reasoning about encounter order should be a corner case, and out of the user's mind most of the time.

On 4/4/2013 11:35 AM, Boaz Nahum wrote:

Thanks.

So why would anyone need to use sequential()? Just mark all terminal operations ordered/unordered:

forEachOrdered
forEachUnordered

Unfortunately I already sent some bombs :( ....

Thanks.
Boaz

On Thu, Apr 4, 2013 at 6:12 PM, Brian Goetz <brian.goetz at oracle.com> wrote:

Yes, that's right. The moral of the story is: don't pass stateful lambdas to stream methods.

The Stream API is designed so that streams can be safely executed in serial or parallel. Your responsibility as a responsible stream user is not to put stateful lambdas into streams.

Think of a stream as a package, and a stateful lambda as a bomb. It's not nice to put bombs in packages and mail them.

On 4/4/2013 10:49 AM, Boaz Nahum wrote:

Suppose I wrote a method:

    Stream foo(Stream source) { return source.sequential().map(nonThreadSafeMapper); }

in which I used a non-thread-safe mapper. To protect myself I 'sequential()' it.

Now, somewhere else somebody:

    foo().parallel() ....

So, my non-thread-safe mapper is running concurrently?

Thanks
Boaz
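The scenario in this exchange can be sketched as follows, assuming the Java 8 java.util.stream API. The class name and the stateless "x + 1" mapper are hypothetical; the point is only that a sequential() call inside foo() does not stop a caller from switching the pipeline back to parallel.

```java
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

class SequentialIsNotAShield {
    // Mirrors foo() from the thread. The mapper is stateless, so the result
    // is correct whichever execution mode the caller ends up choosing.
    static Stream<Integer> foo(Stream<Integer> source) {
        return source.sequential().map(x -> x + 1);
    }

    // The caller's parallel() overrides the sequential() inside foo().
    static boolean callerEndsUpParallel() {
        return foo(Stream.of(1, 2, 3)).parallel().isParallel();
    }

    static List<Integer> resultUnderParallel() {
        return foo(Stream.of(1, 2, 3)).parallel().collect(Collectors.toList());
    }

    public static void main(String[] args) {
        System.out.println(callerEndsUpParallel());   // prints true
        System.out.println(resultUnderParallel());    // prints [2, 3, 4]
    }
}
```

Because sequential() cannot protect the mapper, the only robust defence is the one given in the reply: keep the mapper stateless or otherwise thread-safe.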

On Thu, Apr 4, 2013 at 5:39 PM, Brian Goetz <brian.goetz at oracle.com> wrote:

Let me make a real world analogy. Replace "stream" with "package". Replace "non thread safe mapper" with "bomb".

Boaz concludes: "Don't send packages in the mail, someone could get hurt because the package might have a bomb in it."

Brian's logic: "Don't send packages in the mail that have bombs in them."

Seriously, streams with non-thread-safe lambdas are like bombs. Don't make bombs, and no one blows up, no matter how many packages we send each other!

Now, let's rewind and bring this conversation back to the list -- can you summarize your question on-list so everyone can see the answer? Knowing in advance that I will poke some fun at you :)

On 4/4/2013 10:30 AM, Boaz Nahum wrote:

Because, if

Me:
----
    Stream foo() { return s.sequential().map(nonThreadSafeMapper); }

You:
------
    foo().parallel(). .....

Then my non thread safe mapper will be run concurrently!

Thanks
Boaz

On Thu, Apr 4, 2013 at 5:16 PM, Brian Goetz <brian.goetz at oracle.com> wrote:

Exactly the opposite! All streams should be usable and produce correct results under either sequential or parallel execution.

Why would this advice make you think we were telling you not to return streams?

On 4/4/2013 10:14 AM, Boaz Nahum wrote:

I understand. But I still don't know how to solve my problem.

If I understood right, you are telling me not to write libraries that return a Stream, because you never know how your stream will be used?

Thanks
Boaz

On Thu, Apr 4, 2013 at 5:05 PM, Brian Goetz <brian.goetz at oracle.com> wrote:

This was simplified recently. There is one sequential/parallel bit for the whole pipeline. The stream starts off with it set one way or the other. These calls overwrite it. The bit is only acted on when you actually start the computation (invoke the terminal operation.)

On 4/4/2013 9:21 AM, Boaz Nahum wrote:

When I invoke parallel() or sequential(), how far backward does it go?

Let me explain. I wrote a simple Consumer that reports how many different threads were used to run it:

    source.
        parallel().peek(new ThreadReporter("Segment 1 parallel")).
        sequential().peek(new ThreadReporter("Segment 2 sequential")).
        parallel().peek(new ThreadReporter("Segment 3 parallel")).
        sequential().peek(new ThreadReporter("Segment 4 sequential")).forEach((t) -> {});

    private static class ThreadReporter implements Consumer<Integer> {
        @Override
        public void accept(Integer integer) {
            threads.put(Thread.currentThread(), true);
        }

        public void report() {
            System.out.println("Name +'" + name + "': " + threads.size() + " Thread(s)");
        }
    }
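A self-contained sketch of the experiment above, assuming the Java 8 java.util.stream API. The original message omits the fields and constructor of ThreadReporter, so the "name" field, the concurrent thread set, the threadCount() accessor, and the IntStream source are all assumptions filled in for illustration.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Consumer;
import java.util.stream.IntStream;

class ThreadReporterDemo {
    // Records which threads invoked accept(); the set is thread-safe so the
    // reporter itself is safe to use in a parallel pipeline.
    static class ThreadReporter implements Consumer<Integer> {
        private final String name;
        private final Set<Thread> threads = ConcurrentHashMap.newKeySet();

        ThreadReporter(String name) { this.name = name; }

        @Override
        public void accept(Integer ignored) { threads.add(Thread.currentThread()); }

        int threadCount() { return threads.size(); }

        void report() {
            System.out.println("Name '" + name + "': " + threads.size() + " Thread(s)");
        }
    }

    static ThreadReporter[] run() {
        ThreadReporter[] r = {
            new ThreadReporter("Segment 1 parallel"),
            new ThreadReporter("Segment 2 sequential"),
            new ThreadReporter("Segment 3 parallel"),
            new ThreadReporter("Segment 4 sequential"),
        };
        // Only the last mode call before the terminal operation matters.
        IntStream.range(0, 10_000).boxed()
                 .parallel().peek(r[0])
                 .sequential().peek(r[1])
                 .parallel().peek(r[2])
                 .sequential().peek(r[3])
                 .forEach(t -> {});
        return r;
    }

    public static void main(String[] args) {
        for (ThreadReporter reporter : run()) {
            reporter.report();
        }
    }
}
```

With a single bit for the whole pipeline, the final sequential() call decides the mode, so every segment should report one thread (the calling thread) regardless of its label. Ending the chain with parallel() instead would let all four segments run on multiple threads, with counts that vary from run to run.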



More information about the lambda-dev mailing list