Yet another run at reduce
Brian Goetz brian.goetz at oracle.com
Tue Jan 8 07:15:26 PST 2013
Here's a regular reducer that does a two-level groupBy with a downstream reducer:
    static <T, K, D, M extends Map<K, D>>
    Reducer<T, M> groupBy(Function<? super T, ? extends K> classifier,
                          Supplier<M> mapFactory,
                          Reducer<T, D> downstream) {
        return new MergingMapReducer<T, K, D, M>(mapFactory, downstream::combine) {
            @Override
            public void accumulate(M map, T value) {
                K key = Objects.requireNonNull(classifier.apply(value),
                                               "element cannot be mapped to a null key");
                downstream.accumulate(map.computeIfAbsent(key, k -> downstream.makeResult()),
                                      value);
            }
        };
    }
where this helper class defines the stuff common to all merging-map reducers:
    private static abstract class MergingMapReducer<T, K, V, M extends Map<K, V>>
            implements Reducer<T, M> {
        private final Supplier<M> mapFactory;
        private final BinaryOperator<V> mergeFunction;

        protected MergingMapReducer(Supplier<M> mapFactory,
                                    BinaryOperator<V> mergeFunction) {
            this.mapFactory = mapFactory;
            this.mergeFunction = mergeFunction;
        }

        @Override
        public M makeResult() {
            return mapFactory.get();
        }

        protected void accumulate(M map, K key, V newValue) {
            map.merge(key, newValue, mergeFunction);
        }

        @Override
        public M combine(M map, M other) {
            for (Map.Entry<K, V> e : other.entrySet())
                map.merge(e.getKey(), e.getValue(), mergeFunction);
            return map;
        }
    }
And here's the concurrent version:
    static <T, K, D, M extends ConcurrentMap<K, D>>
    Reducer<T, M> groupBy(Function<? super T, ? extends K> classifier,
                          Supplier<M> mapFactory,
                          Reducer<T, D> downstream) {
        return new ConcurrentMapReducer<T, K, D, M>(mapFactory, downstream::combine) {
            @Override
            public void accumulate(M map, T t) {
                D container = map.computeIfAbsent(classifier.apply(t),
                                                  k -> downstream.makeResult());
                if (downstream.isConcurrent()) {
                    downstream.accumulate(container, t);
                }
                else {
                    synchronized (container) {
                        downstream.accumulate(container, t);
                    }
                }
            }
        };
    }
Or by examples, did you mean use examples?
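If use examples are what is wanted, something along these lines might do. This is only a sketch under assumptions: the Reducer interface is copied from the quoted message below, while toList(), foldLeaf(), the GroupByUseSketch class and the condensed groupBy (with HashMap hard-wired in place of the mapFactory parameter) are hypothetical stand-ins, not library code. It groups words by length and then by first letter, folding two leaves separately and merging them the way a classic reduction would.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.function.Function;

// Reducer shape as given in the quoted message.
interface Reducer<T, R> {
    boolean isConcurrent();
    R makeResult();
    void accumulate(R result, T value);
    R combine(R result, R other);
}

class GroupByUseSketch {
    // Hypothetical downstream reducer: appends elements to a List.
    static <T> Reducer<T, List<T>> toList() {
        return new Reducer<T, List<T>>() {
            @Override public boolean isConcurrent() { return false; }
            @Override public List<T> makeResult() { return new ArrayList<>(); }
            @Override public void accumulate(List<T> result, T value) { result.add(value); }
            @Override public List<T> combine(List<T> result, List<T> other) {
                result.addAll(other);
                return result;
            }
        };
    }

    // Condensed stand-in for the non-concurrent groupBy (assumption: HashMap hard-wired).
    static <T, K, D> Reducer<T, Map<K, D>> groupBy(Function<? super T, ? extends K> classifier,
                                                  Reducer<T, D> downstream) {
        return new Reducer<T, Map<K, D>>() {
            @Override public boolean isConcurrent() { return false; }
            @Override public Map<K, D> makeResult() { return new HashMap<>(); }
            @Override public void accumulate(Map<K, D> map, T value) {
                K key = Objects.requireNonNull(classifier.apply(value),
                                               "element cannot be mapped to a null key");
                downstream.accumulate(map.computeIfAbsent(key, k -> downstream.makeResult()), value);
            }
            @Override public Map<K, D> combine(Map<K, D> map, Map<K, D> other) {
                // Containers that share a key are merged by the downstream reducer.
                for (Map.Entry<K, D> e : other.entrySet())
                    map.merge(e.getKey(), e.getValue(), downstream::combine);
                return map;
            }
        };
    }

    // Folds one leaf of the computation tree into a fresh container.
    static <T, R> R foldLeaf(List<T> leaf, Reducer<T, R> reducer) {
        R result = reducer.makeResult();
        for (T t : leaf)
            reducer.accumulate(result, t);
        return result;
    }

    static Map<Integer, Map<Character, List<String>>> demo() {
        Function<String, Integer> byLength = String::length;
        Function<String, Character> byFirstLetter = s -> s.charAt(0);
        Reducer<String, List<String>> list = toList();
        Reducer<String, Map<Character, List<String>>> inner = groupBy(byFirstLetter, list);
        Reducer<String, Map<Integer, Map<Character, List<String>>>> outer = groupBy(byLength, inner);

        // Two leaves, folded independently and then merged left-to-right.
        Map<Integer, Map<Character, List<String>>> left =
            foldLeaf(Arrays.asList("ant", "bee"), outer);
        Map<Integer, Map<Character, List<String>>> right =
            foldLeaf(Arrays.asList("asp", "bear", "boar"), outer);
        return outer.combine(left, right);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

The two-level grouping falls out of passing one groupBy reducer as the downstream of another; the outer combine delegates to the inner combine for keys present in both leaves.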
On 1/8/2013 9:41 AM, Tim Peierls wrote:
Sounds great -- I'd be interested in seeing some basic examples of both types of reducer.
--tim

On Mon, Jan 7, 2013 at 8:31 PM, Brian Goetz <brian.goetz at oracle.com> wrote:

I think Doug and I made some progress on reduce forms today.

Recap: there are two ways to do a reduction to a mutable data structure like a Map; a classic fold (where you create lots of little maps, one per leaf of the computation tree, accumulate leaf data into them in a thread-confined manner, then merge their contents up the tree into one big map), and a concurrent bombardment (where you create one big concurrent-safe map, and blast elements in from many threads.) Call these "A" and "B".

The requirement for A is that:
- your combining functions are associative
- you can create multiple containers
- you can incorporate a new element into a container
- you can merge containers in a way that satisfies the obvious distributive requirement

If you meet these requirements, you can do a parallel A reduce that respects encounter order.

The requirement for B is that:
- your combining functions are associative and commutative
- you can incorporate a new element into a container
- your container supports concurrent incorporation

If you meet these requirements, you can do a parallel B reduce that does NOT respect order, but may be faster (or slower, if contention is high enough.)

Key observation: A "B" reducer can become an "A" reducer simply by adding the merge ability, which is no harder than for regular A reducers. So rather than have A reducers and B reducers, we can have A reducers and A+B reducers. It's only marginally more work for B reducer writers.

So...

    public interface Reducer<T, R> {
        boolean isConcurrent();
        R makeResult();
        void accumulate(R result, T value);
        R combine(R result, R other);
    }

A reducers return 'false' for isConcurrent; B reducers return 'true'.

What was Concurrent{Reducer,Tabulator,Accumulator} goes away.
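To make the "A+B" idea concrete, here is a minimal sketch of a reducer that supports both styles. Everything except the Reducer interface is an assumption made for illustration: WordCountReducer, ReducerStylesSketch and the two driver methods are hypothetical names, and the two plain threads merely stand in for whatever the library would actually schedule. The counting function (Long::sum) is associative and commutative, so the same reducer can be folded and merged (A) or bombarded concurrently (B).

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Reducer shape as given in the message.
interface Reducer<T, R> {
    boolean isConcurrent();
    R makeResult();
    void accumulate(R result, T value);
    R combine(R result, R other);
}

// Hypothetical A+B reducer: counts occurrences of each word.
class WordCountReducer implements Reducer<String, ConcurrentMap<String, Long>> {
    @Override public boolean isConcurrent() { return true; }   // advertises the B capability

    @Override public ConcurrentMap<String, Long> makeResult() {
        return new ConcurrentHashMap<>();
    }

    @Override public void accumulate(ConcurrentMap<String, Long> result, String word) {
        result.merge(word, 1L, Long::sum);   // atomic on ConcurrentHashMap
    }

    // The merge ability is what also makes this usable as an A reducer.
    @Override public ConcurrentMap<String, Long> combine(ConcurrentMap<String, Long> result,
                                                         ConcurrentMap<String, Long> other) {
        other.forEach((word, n) -> result.merge(word, n, Long::sum));
        return result;
    }
}

class ReducerStylesSketch {
    static final List<String> LEFT = Arrays.asList("a", "b", "a");
    static final List<String> RIGHT = Arrays.asList("b", "c", "a");

    // Style A: one thread-confined container per leaf, merged afterwards.
    static ConcurrentMap<String, Long> foldAndMerge() {
        WordCountReducer r = new WordCountReducer();
        ConcurrentMap<String, Long> left = r.makeResult();
        for (String w : LEFT) r.accumulate(left, w);
        ConcurrentMap<String, Long> right = r.makeResult();
        for (String w : RIGHT) r.accumulate(right, w);
        return r.combine(left, right);
    }

    // Style B: one shared container, elements blasted in from several threads.
    static ConcurrentMap<String, Long> bombard() {
        WordCountReducer r = new WordCountReducer();
        ConcurrentMap<String, Long> shared = r.makeResult();
        Thread t1 = new Thread(() -> LEFT.forEach(w -> r.accumulate(shared, w)));
        Thread t2 = new Thread(() -> RIGHT.forEach(w -> r.accumulate(shared, w)));
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return shared;
    }
}
```

Both drivers should arrive at the same counts; only style A could additionally promise an encounter-ordered result for a container where order matters.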
Now, in addition to the various forms of reduce(), we add overloaded:

    reduce(Reducer)
    reduceUnordered(Reducer)

You will get a B reduction if (a) you selected reduceUnordered and (b) your reducer is concurrent. Otherwise you will get an A reduction.

This is nice because the knowledge of properties of "user doesn't care about order" and "target supports concurrent insertion" naturally live in different places; this separates them properly. The latter is a property of the reducer implementation; the former only the user knows about and therefore should live at the call site. Each contributes their bit and can mostly remain ignorant of the other; the library combines these bits, and if both are present, you get a B reduction.

The reduceUnordered() method can be cast as an optimization to reduce(); it is always correct to use reduce(), but may be faster to use reduceUnordered(), as long as you are willing to forgo order and the reducer is cooperative. In neither case (assuming a properly implemented reducer) will you ever get a thread-unsafe result; if you ask for an unordered reduction with a nonconcurrent reducer, you get a safe ordered reduction instead, which might be slower (or faster.)

Pros:
- ConcurrentReducer goes away
- .unordered() goes away
- Properly separates order-sensitivity from concurrency

Cons:
- Two variations on various aspects of reducing are surfaced in the API (unordered and concurrent) that will make users say "huh?" (but if you get it wrong / choose to stay ignorant you still get the right answer.)

Implementation coming.
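The selection rule described here can be sketched roughly as follows. This is not the promised implementation, only an illustration under assumptions: ReduceDispatchSketch, toList() and toConcurrentSet() are hypothetical names, plain recursion stands in for the fork/join computation tree, and List.parallelStream() from the released java.util.stream API is borrowed purely as a convenient way to hit one container from several threads.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;

// Reducer shape as given in the message.
interface Reducer<T, R> {
    boolean isConcurrent();
    R makeResult();
    void accumulate(R result, T value);
    R combine(R result, R other);
}

class ReduceDispatchSketch {
    // "A" reduction: one container per leaf, merged in encounter order.
    // Sequential recursion stands in for the parallel computation tree.
    static <T, R> R reduce(List<T> source, Reducer<T, R> reducer) {
        if (source.size() <= 2) {
            R leaf = reducer.makeResult();
            for (T t : source)
                reducer.accumulate(leaf, t);
            return leaf;
        }
        int mid = source.size() / 2;
        R left = reduce(source.subList(0, mid), reducer);
        R right = reduce(source.subList(mid, source.size()), reducer);
        return reducer.combine(left, right);
    }

    // "B" reduction only if the caller asked for unordered AND the reducer is concurrent.
    static <T, R> R reduceUnordered(List<T> source, Reducer<T, R> reducer) {
        if (!reducer.isConcurrent())
            return reduce(source, reducer);          // safe ordered fallback
        R shared = reducer.makeResult();
        source.parallelStream().forEach(t -> reducer.accumulate(shared, t));
        return shared;
    }

    // Hypothetical nonconcurrent reducer (order-preserving).
    static <T> Reducer<T, List<T>> toList() {
        return new Reducer<T, List<T>>() {
            @Override public boolean isConcurrent() { return false; }
            @Override public List<T> makeResult() { return new ArrayList<>(); }
            @Override public void accumulate(List<T> result, T value) { result.add(value); }
            @Override public List<T> combine(List<T> result, List<T> other) {
                result.addAll(other);
                return result;
            }
        };
    }

    // Hypothetical concurrent reducer backed by a concurrent set.
    static <T> Reducer<T, Set<T>> toConcurrentSet() {
        return new Reducer<T, Set<T>>() {
            @Override public boolean isConcurrent() { return true; }
            @Override public Set<T> makeResult() { return ConcurrentHashMap.newKeySet(); }
            @Override public void accumulate(Set<T> result, T value) { result.add(value); }
            @Override public Set<T> combine(Set<T> result, Set<T> other) {
                result.addAll(other);
                return result;
            }
        };
    }
}
```

Calling reduceUnordered with toList() quietly takes the ordered path, which is the "you still get the right answer" property; only the combination of reduceUnordered and a concurrent reducer reaches the shared-container branch.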
More information about the lambda-libs-spec-observers mailing list