Background: pipeline architecture

Brian Goetz brian.goetz at oracle.com
Mon Dec 31 13:11:36 PST 2012


Here's an attempt at putting most of the background on the architecture of pipelines in one place.

A complete stream pipeline has several components:

The source is a Supplier. The reason for the indirection is so we can narrow the window where we require non-interference from (stream creation, end of terminal operation) down to (start of terminal operation, end of terminal operation).

So, for example, this case:

     list = new ...
     Stream s = list.stream();
     // mutate list
     s.forEach(...)

will see the list state as it was before the forEach, not at the list.stream() capture. This is easy to implement; the stream() method in ArrayList is:

     return Streams.stream(
       () -> Arrays.spliterator((E[]) elementData, 0, size),
       flags);

By deferring evaluation of elementData and size, we can late-bind to the data.
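For example (a minimal sketch against the eventual java.util.stream API; the class and variable names are mine), a mutation made between stream creation and the terminal operation is visible to the terminal operation:

     import java.util.ArrayList;
     import java.util.List;
     import java.util.stream.Stream;

     class LateBindingDemo {
         public static void main(String[] args) {
             List<String> list = new ArrayList<>();
             list.add("a");
             Stream<String> s = list.stream();
             list.add("b");                  // mutate after creation, before terminal op
             s.forEach(System.out::println); // prints "a" and "b" -- late binding
         }
     }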

The source flags are a set of dynamic properties of the source. Defined flags include:

 - SIZED -- the size of the source is known
 - ORDERED -- the source has a defined encounter order
 - DISTINCT -- the elements of the source are known to be distinct
 - SORTED -- the elements of the source are known to be sorted

We also encode serial-vs-parallel as a source flag, though this is mostly an implementation expediency.

Intermediate operations produce a new stream, lazily; invoking an intermediate operation on a stream just sets up the new stream, but does not cause any computation nor does it cause the source to be consumed. The fundamental operation implemented by an intermediate operation is to wrap a "Sink" with a new Sink. So in a pipeline like

list.filter(...).map(...).reduce(...)

the terminal reduction operation will create a sink which reduces the values fed to it, and the map operation will wrap that with a mapping sink that transforms values as they pass through, and the filter operation will wrap it with a filtering sink that only passes some elements through.
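To make the wrapping concrete, here is a minimal sketch; the shape of Sink and the factory names are illustrative, not the actual implementation classes, and lifecycle concerns are omitted:

     import java.util.function.Function;
     import java.util.function.Predicate;

     // Each op wraps the downstream sink with its own behavior.
     interface Sink<T> {
         void accept(T t);
     }

     class Sinks {
         // filter: pass along only the elements satisfying the predicate
         static <T> Sink<T> filtering(Predicate<? super T> p, Sink<T> downstream) {
             return t -> { if (p.test(t)) downstream.accept(t); };
         }

         // map: transform each element on its way downstream
         static <T, R> Sink<T> mapping(Function<? super T, ? extends R> f, Sink<R> downstream) {
             return t -> downstream.accept(f.apply(t));
         }
     }

The chain is built from the terminal op outward: reduce supplies the innermost sink, map wraps it, filter wraps that, and the source pushes elements into the outermost sink.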

Intermediate operations are divided into two kinds, stateful and stateless. The stateless operations are the well-behaved ones, which depend only on their inputs -- filter, map, mapMulti. The stateful ones include sorted, removeDuplicates, and limit.

In a sequential pipeline, all intermediate ops can be jammed together for a single pass on the data. In a parallel pipeline, the same can be done only if all intermediate ops are stateless. Otherwise, we slice up the pipeline into segments ending in stateful ops, and execute them in segments:

     list.parallel().filter(...).sorted().map(...).reduce(...)
                     ^------------------^ ^------------------^
                           segment 1            segment 2

where the output of segment 1 is gathered into a conc-tree and then used as the source for segment 2. This segmentation is why Doug hates these operations; it complicates the parallel execution, obfuscates the cost model, and maps much more poorly to targets like GPUs.
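Conceptually, the segmented execution behaves as if written this way (a sketch in terms of the eventual public API, gathering into a List where the implementation uses a conc-tree):

     import java.util.List;
     import java.util.Optional;
     import java.util.stream.Collectors;

     // The stateful sorted() acts as a barrier: segment 1 is evaluated in
     // full, and its gathered result becomes the source for segment 2.
     static Optional<Integer> segmented(List<Integer> list) {
         List<Integer> segment1 = list.parallelStream()
                                      .filter(x -> x % 2 == 0)
                                      .sorted()
                                      .collect(Collectors.toList()); // barrier
         return segment1.parallelStream()                            // segment 2
                        .map(x -> x * x)
                        .reduce(Integer::sum);
     }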

Each intermediate op also has a mask describing its flags. For each of the flags X described above, two bits are used to represent "injects X", "preserves X", or "clears X". For example, sorted() preserves size and injects sortedness and ordering. Filtering preserves ordering and distinctness but clears size. Mapping preserves size but clears sortedness and distinctness. The flags for a pipeline are computed with boolean fu to take the source flags and fold in the effect of each op as the pipeline is built. There is also a SHORT_CIRCUIT flag which is only valid on ops (not the source), and forces pull rather than push evaluation. Examples of short-circuit operations include limit().
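A sketch of how that folding might look; the constants and encoding here are illustrative assumptions, not the real representation:

     // One bit per flag for the pipeline state; each op's mask carries
     // separate inject and clear bit sets ("preserve" is the default).
     class Flags {
         static final int SIZED    = 1 << 0;
         static final int ORDERED  = 1 << 1;
         static final int DISTINCT = 1 << 2;
         static final int SORTED   = 1 << 3;

         // Fold one op's effect into the flags computed so far.
         static int fold(int upstream, int injects, int clears) {
             return (upstream | injects) & ~clears;
         }
     }

     // sorted(): preserves size, injects sortedness and ordering
     //   flags = Flags.fold(flags, Flags.SORTED | Flags.ORDERED, 0);
     // map():    preserves size, clears sortedness and distinctness
     //   flags = Flags.fold(flags, 0, Flags.SORTED | Flags.DISTINCT);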

Terminal operations cause evaluation of the pipeline; at the time a terminal operation is executed, the source is consumed (calling get() on the Supplier), a chain of sinks is created, parallel decomposition using spliterators is done, etc.
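For the sequential case, terminal evaluation amounts to something like this sketch (names illustrative; Sink as in the earlier sketch, and java.util.Spliterator used for concreteness):

     import java.util.Spliterator;
     import java.util.function.Supplier;

     // Consume the source, then push every element into the wrapped sink chain.
     static <T> void evaluateSequential(Supplier<Spliterator<T>> source,
                                        Sink<T> wrappedSink) {
         Spliterator<T> spliterator = source.get();          // source consumed here
         spliterator.forEachRemaining(wrappedSink::accept);  // push elements
     }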

Flags are used to optimize both sink chain construction and terminal execution. For example, if the upstream flags indicate sortedness, a sorted() operation is a no-op, reflected by the implementation of wrapSink(flags, sink) just returning the sink it was passed. Similarly, for terminal ops, orderedness can be used to relax constraints on the output, enabling more efficient computation if you know that the result need not respect encounter order. If the source is known to be sized, and all the ops are size-preserving, operations like toArray() can exploit size information to minimize allocation and copying.
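A sketch of that sortedness check (flag constants from the earlier sketch; SortingSink is a hypothetical sink that buffers its input, sorts it, and pushes it downstream at end-of-data):

     static <T> Sink<T> wrapSortedSink(int upstreamFlags, Sink<T> downstream) {
         if ((upstreamFlags & Flags.SORTED) != 0)
             return downstream;                 // upstream already sorted: no-op
         return new SortingSink<>(downstream);  // hypothetical buffering sink
     }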

The set of operations is defined in Stream for reference streams and IntStream for int streams; each of these has a (private) implementation class {Reference,Int}Pipeline, which share a (private) base class AbstractPipeline. We represent a stream pipeline as a linked list of XxxPipeline objects, where each holds an op and links to its parent. Because of the shared base class, pipelines can cross shapes and operations can still be jammed together into a single pass, such as in:

     people.stream().filter(..).map(Person::getHeight).max();
            ^Stream  ^Stream    ^IntStream

and even though the "shape" of the data changes from reference to int we can create a single sink chain where we push Person objects in and (unboxed) ints come out.
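A sketch of such a shape-crossing sink; IntSink and Person (with an int-valued getHeight()) are illustrative assumptions, with Sink as in the earlier sketch:

     import java.util.function.ToIntFunction;

     // Primitive specialization of the sink: accepts unboxed ints.
     interface IntSink {
         void accept(int i);
     }

     // Person objects are pushed in; unboxed ints flow downstream.
     static Sink<Person> heightSink(ToIntFunction<Person> f, IntSink downstream) {
         return p -> downstream.accept(f.applyAsInt(p));
     }

     // e.g. heightSink(Person::getHeight, maxComputingSink)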
