Yet another run at reduce

Brian Goetz brian.goetz at oracle.com
Tue Jan 8 07:15:26 PST 2013


Here's a regular reducer that does a two-level groupBy with a downstream reducer:

 static <T, K, D, M extends Map<K, D>>
 Reducer<T, M>
 groupBy(Function<? super T, ? extends K> classifier,
         Supplier<M> mapFactory,
         Reducer<T, D> downstream) {
     return new MergingMapReducer<T, K, D, M>(mapFactory,
                                              downstream::combine) {
         @Override
         public void accumulate(M map, T value) {
             K key = Objects.requireNonNull(classifier.apply(value),
                                            "element cannot be mapped to a null key");
             downstream.accumulate(map.computeIfAbsent(key, k -> downstream.makeResult()),
                                   value);
         }
     };
 }

where this helper class defines the stuff common to all merging-map reducers:

 private static abstract class MergingMapReducer<T, K, V, M extends Map<K, V>>
         implements Reducer<T, M> {
     private final Supplier<M> mapFactory;
     private final BinaryOperator<V> mergeFunction;

     protected MergingMapReducer(Supplier<M> mapFactory,
                                 BinaryOperator<V> mergeFunction) {
         this.mapFactory = mapFactory;
         this.mergeFunction = mergeFunction;
     }

     @Override
     public M makeResult() {
         return mapFactory.get();
     }

     protected void accumulate(M map, K key, V newValue) {
         map.merge(key, newValue, mergeFunction);
     }

     @Override
     public M combine(M map, M other) {
         for (Map.Entry<K, V> e : other.entrySet())
             map.merge(e.getKey(), e.getValue(), mergeFunction);
         return map;
     }
 }
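
For concreteness, here is a minimal sketch of a leaf-level downstream reducer that could be plugged into the groupBy above: a fold-into-ArrayList reducer. It follows the Reducer interface recapped in the quoted message below; the name toList is mine, not part of the proposal.

 // A simple non-concurrent ("A") reducer: per-leaf ArrayLists,
 // thread-confined accumulation, merged up the tree by combine().
 // (Uses java.util.ArrayList and java.util.List.)
 static <T> Reducer<T, List<T>> toList() {
     return new Reducer<T, List<T>>() {
         @Override public boolean isConcurrent() { return false; }
         @Override public List<T> makeResult()   { return new ArrayList<>(); }
         @Override public void accumulate(List<T> result, T value) {
             result.add(value);
         }
         @Override public List<T> combine(List<T> result, List<T> other) {
             result.addAll(other);
             return result;
         }
     };
 }

With that, groupBy(Person::getCity, HashMap::new, toList()) would fold elements into a Map<String, List<Person>> in encounter order (Person and getCity are illustrative names).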

And here's the concurrent version:

 static <T, K, D, M extends ConcurrentMap<K, D>>
 Reducer<T, M> groupBy(Function<? super T, ? extends K> classifier,
                       Supplier<M> mapFactory,
                       Reducer<T, D> downstream) {
     return new ConcurrentMapReducer<T, K, D, M>(mapFactory,
                                                 downstream::combine) {
         @Override
         public void accumulate(M map, T t) {
             D container = map.computeIfAbsent(classifier.apply(t),
                                               k -> downstream.makeResult());
             if (downstream.isConcurrent()) {
                 downstream.accumulate(container, t);
             }
             else {
                 synchronized (container) {
                     downstream.accumulate(container, t);
                 }
             }
         }
     };
 }
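
(ConcurrentMapReducer, the concurrent analogue of the helper class above, is not shown here.) As a usage sketch, reusing the hypothetical toList() and Person from the earlier example:

 // One big shared ConcurrentHashMap, blasted at from many threads.
 // (Uses java.util.concurrent.ConcurrentHashMap and ConcurrentMap.)
 Reducer<Person, ConcurrentMap<String, List<Person>>> byCity =
     groupBy(Person::getCity, ConcurrentHashMap::new, toList());
 // toList() is not concurrent, so accumulate() above synchronizes
 // on the per-key container before handing each element downstream.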

Or by "examples", did you mean usage examples?

On 1/8/2013 9:41 AM, Tim Peierls wrote:

Sounds great -- I'd be interested in seeing some basic examples of both types of reducer.

--tim

On Mon, Jan 7, 2013 at 8:31 PM, Brian Goetz <brian.goetz at oracle.com> wrote:

I think Doug and I made some progress on reduce forms today.

Recap: there are two ways to do a reduction to a mutable data structure like a Map: a classic fold (where you create lots of little maps, one per leaf of the computation tree, accumulate leaf data into them in a thread-confined manner, then merge their contents up the tree into one big map), and a concurrent bombardment (where you create one big concurrent-safe map and blast elements in from many threads.) Call these "A" and "B".

The requirements for A are that:
 - your combining functions are associative
 - you can create multiple containers
 - you can incorporate a new element into a container
 - you can merge containers in a way that satisfies the obvious distributive requirement

If you meet these requirements, you can do a parallel A reduce that respects encounter order.

The requirements for B are that:
 - your combining functions are associative and commutative
 - you can incorporate a new element into a container
 - your container supports concurrent incorporation

If you meet these requirements, you can do a parallel B reduce that does NOT respect order, but may be faster (or slower, if contention is high enough.)

Key observation: a "B" reducer can become an "A" reducer simply by adding the merge ability, which is no harder than for regular A reducers. So rather than have A reducers and B reducers, we can have A reducers and A+B reducers. It's only marginally more work for B reducer writers. So...

 public interface Reducer<T, R> {
     boolean isConcurrent();
     R makeResult();
     void accumulate(R result, T value);
     R combine(R result, R other);
 }

A reducers return 'false' for isConcurrent; B reducers return 'true'. What was Concurrent{Reducer,Tabulator,Accumulator} goes away.

Now, in addition to the various forms of reduce(), we add overloaded:

 reduce(Reducer)
 reduceUnordered(Reducer)

You will get a B reduction if (a) you selected reduceUnordered and (b) your reducer is concurrent. Otherwise you will get an A reduction.

This is nice because the knowledge of the properties "the user doesn't care about order" and "the target supports concurrent insertion" naturally lives in different places; this separates them properly. The latter is a property of the reducer implementation; the former only the user knows about, and therefore it should live at the call site. Each contributes its bit and can mostly remain ignorant of the other; the library combines these bits, and if both are present, you get a B reduction.

The reduceUnordered() method can be cast as an optimization to reduce(); it is always correct to use reduce(), but it may be faster to use reduceUnordered(), as long as you are willing to forgo order and the reducer is cooperative. In neither case (assuming a properly implemented reducer) will you ever get a thread-unsafe result; if you ask for an unordered reduction with a nonconcurrent reducer, you get a safe ordered reduction instead, which might be slower (or faster.)

Pros:
 - ConcurrentReducer goes away
 - .unordered() goes away
 - Properly separates order-sensitivity from concurrency

Cons:
 - Two variations on aspects of reducing (unordered and concurrent) are surfaced in the API, which will make users say "huh?" (but if you get it wrong or choose to stay ignorant, you still get the right answer.)

Implementation coming.
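
As a minimal sketch of how the library could combine the two bits described in the quoted recap (the method and helper names here are assumed, not from the actual implementation):

 <R> R reduce(Reducer<T, R> reducer, boolean unordered) {
     if (unordered && reducer.isConcurrent()) {
         // B: one shared concurrent container, accumulated into from
         // many threads; encounter order is not preserved.
         R result = reducer.makeResult();
         forEachInParallel(t -> reducer.accumulate(result, t));
         return result;
     }
     else {
         // A: classic fold -- one container per leaf, thread-confined
         // accumulation, then combine() merges results up the tree.
         return foldInParallel(reducer::makeResult,
                               reducer::accumulate,
                               reducer::combine);
     }
 }

Here forEachInParallel and foldInParallel stand in for whatever parallel machinery the stream framework supplies.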


