Tabulators, reducers, etc

Brian Goetz brian.goetz at oracle.com
Thu Dec 27 16:47:36 PST 2012


> I really like this suggested API. I think it would be easier to digest with
> concrete examples that show that these choices are orthogonal and necessary.
>
> Sam
>
> On Thu, Dec 27, 2012 at 10:31 AM, Brian Goetz <brian.goetz at oracle.com> wrote:
>> One option might be: use "reduce" for the purely functional forms, use
>> accumulate/accumulateConcurrent for the others:

T reduce(T zero, BinaryOperator<T> reducer);

"Compute the sum of the squares of the first 100 integers."

int sumOfSquares = integers().map(x -> x*x)
                             .limit(100)
                             .reduce(0, Integer::sum);

where integers() generates an infinite IntStream (or maybe one that stops at MAX_VALUE.)
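For comparison, here is a hedged sketch of the same computation against the stream API as it eventually shipped in JDK 8, where IntStream.iterate stands in for the hypothetical integers() generator:

```java
import java.util.stream.IntStream;

public class SumOfSquares {
    static int sumOfSquares() {
        return IntStream.iterate(1, i -> i + 1) // infinite stream 1, 2, 3, ...
                .map(x -> x * x)                // square each element
                .limit(100)                     // keep the first 100 squares
                .reduce(0, Integer::sum);       // purely functional fold from zero
    }

    public static void main(String[] args) {
        System.out.println(sumOfSquares()); // 1^2 + 2^2 + ... + 100^2 = 338350
    }
}
```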

Optional<T> reduce(BinaryOperator<T> reducer);

"How tall is the tallest person?"

Optional<Integer> tallest = people.map(Person::getHeight)
                                  .reduce(greaterOf(naturalOrder()));

where

Comparators.naturalOrder() -> Comparator<T>
Comparators.greaterOf(Comparator<T>) -> BinaryOperator<T>

"Who is the tallest person?"

Optional<Person> tallest = people.reduce(greaterOf(comparing(Person::getHeight)));
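A sketch of this zero-less reduce form in terms of the JDK-8 names, where BinaryOperator.maxBy plays the role of greaterOf and Comparator.comparingInt plays the role of comparing; Person is a hypothetical example type assumed for illustration:

```java
import java.util.Comparator;
import java.util.List;
import java.util.Optional;
import java.util.function.BinaryOperator;

public class Tallest {
    record Person(String name, int height) {}

    static Optional<Person> tallest(List<Person> people) {
        // Empty input yields Optional.empty() rather than a meaningless zero.
        return people.stream()
                .reduce(BinaryOperator.maxBy(
                        Comparator.comparingInt(Person::height)));
    }

    public static void main(String[] args) {
        List<Person> people = List.of(
                new Person("Ann", 170), new Person("Bob", 185));
        System.out.println(tallest(people).map(Person::name).orElse("none"));
    }
}
```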

U reduce(U zero, BiFunction<U, T, U> accumulator, BinaryOperator<U> reducer);

"How many pages are there in this stack of documents?"

int pageCount = documents.reduce(0, (c, d) -> c + d.pages(), Integer::sum);
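This three-arg form survived unchanged into JDK 8 as Stream.reduce(identity, accumulator, combiner); a runnable sketch, with Document as a hypothetical example type:

```java
import java.util.List;

public class PageCount {
    record Document(int pages) {}

    static int pageCount(List<Document> documents) {
        return documents.stream()
                .reduce(0,
                        (c, d) -> c + d.pages(), // fold one document into a count
                        Integer::sum);           // combine partial counts
    }

    public static void main(String[] args) {
        List<Document> docs = List.of(new Document(3), new Document(5));
        System.out.println(pageCount(docs)); // 8
    }
}
```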

While this can be represented as a map+reduce, sometimes the three-arg form provides more efficiency or flexibility. For example, Joe came up with this one today -- perform String.compare on two strings of equal length, in parallel.

You could do this:

intRange(0, s1.length())
    .parallel()
    .map(i -> cmp(i))
    .reduce(0, (l, r) -> (l != 0) ? l : r);

where

cmp(i) = Character.compare(s1.charAt(i), s2.charAt(i));

But, using the three-arg form, we can optimize away the irrelevant comparisons:

intRange(0, s1.length())
    .parallel()
    .reduce(0,
            (l, i) -> (l != 0) ? l : cmp(i),
            (l, r) -> (l != 0) ? l : r);
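A runnable sketch of Joe's example against the shipped API. One wrinkle: the primitive IntStream ended up without the three-arg reduce, so this sketch boxes to Stream<Integer> first; the fused accumulator still skips the character comparison once a subtask already has a nonzero verdict:

```java
import java.util.stream.IntStream;

public class ParallelCompare {
    // Compare two equal-length strings in parallel; the combiner is
    // left-biased, so the result is the leftmost nonzero comparison.
    static int compare(String s1, String s2) {
        return IntStream.range(0, s1.length())
                .parallel()
                .boxed() // IntStream lacks the three-arg reduce; box to Stream<Integer>
                .reduce(0,
                        (l, i) -> (l != 0) ? l
                                : Character.compare(s1.charAt(i), s2.charAt(i)),
                        (l, r) -> (l != 0) ? l : r);
    }

    public static void main(String[] args) {
        System.out.println(compare("apple", "apply")); // negative: 'e' < 'y'
    }
}
```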

R accumulate(Supplier<R> seedFactory, BiBlock<R, T> accumulator, BiBlock<R, R> reducer);

This is the mutable version of the previous form, where instead of a seed value, there is a factory to create mutable containers, and instead of functions to compute a new aggregation result, we fold new values into the container.

Examples:

ArrayList<String> asList = strings.parallel()
                                  .accumulate(ArrayList::new,
                                              ArrayList::add,      // add(t)
                                              ArrayList::addAll);  // addAll(Collection)

String concatted = strings.parallel()
                          .accumulate(StringBuilder::new,
                                      StringBuilder::append,  // append(String)
                                      StringBuilder::append)  // append(StringBuilder)
                          .toString();

BitSet bs = numbers.parallel()
                   .accumulate(BitSet::new, BitSet::set, BitSet::or);
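This mutable form shipped in JDK 8 as the three-arg Stream.collect(supplier, accumulator, combiner), with BiConsumer where the note says BiBlock; a sketch of the first two examples in those terms:

```java
import java.util.ArrayList;
import java.util.List;

public class MutableAccumulate {
    static List<String> asList(List<String> strings) {
        return strings.parallelStream()
                .collect(ArrayList::new,     // fresh container per subtask
                         ArrayList::add,     // fold one element into it
                         ArrayList::addAll); // merge two containers
    }

    static String concat(List<String> strings) {
        return strings.parallelStream()
                .collect(StringBuilder::new,
                         StringBuilder::append,  // append(String)
                         StringBuilder::append)  // append(CharSequence)
                .toString();
    }

    public static void main(String[] args) {
        System.out.println(concat(List.of("a", "b", "c")));
    }
}
```

Even run in parallel, collect preserves encounter order, so the concatenation is deterministic.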

R accumulate(Accumulator<T, R> reducer);

This one is a convenient form of the previous one, where instead of specifying three lambdas, we tie them together so they can be reused and/or composed.

Accumulator.OfInt TO_BIT_SET = Accumulators.make(BitSet::new, BitSet::set, BitSet::or);

BitSet bs = numbers.accumulate(TO_BIT_SET);
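This bundled abstraction is recognizable as the Collector that shipped in JDK 8; a sketch of the TO_BIT_SET example using Collector.of, with the combiner written as a lambda because BitSet.or returns void:

```java
import java.util.BitSet;
import java.util.stream.Collector;
import java.util.stream.Stream;

public class BitSetCollector {
    // Supplier, accumulator, and combiner tied together for reuse.
    static final Collector<Integer, BitSet, BitSet> TO_BIT_SET =
            Collector.of(BitSet::new,
                         BitSet::set,                        // set(int)
                         (a, b) -> { a.or(b); return a; });  // merge via or()

    public static void main(String[] args) {
        BitSet bs = Stream.of(1, 3, 5).collect(TO_BIT_SET);
        System.out.println(bs); // {1, 3, 5}
    }
}
```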

The reuse part is nice, but the composition part is even more important. With an abstraction for Accumulator, all of our aggregations like groupBy, reduceBy, partition, mapTo, etc, are just accumulations, and it's trivial to cascade them. For example:

"Transactions by (buyer, seller)"

Map<Buyer, Map<Seller, Collection>> map =
    txns.accumulate(groupBy(Txn::buyer, groupBy(Txn::seller)));

The inner groupBy returns an Accumulator<Transaction, Map<Seller, Collection>>; the outer groupBy treats this simply as a downstream reduction, and produces a new Accumulator.

"Largest transaction by (buyer, seller)"

Map<Buyer, Map<Seller, Transaction>> m =
    txns.accumulate(groupBy(Txn::buyer,
                            groupBy(Txn::seller,
                                    greaterOf(comparing(Txn::amount)))));
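The cascading shipped almost verbatim in JDK 8 as Collectors.groupingBy with a downstream collector; a sketch of the (buyer, seller) example, with Txn as a hypothetical example type:

```java
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.toList;

public class GroupByCascade {
    record Txn(String buyer, String seller, int amount) {}

    static Map<String, Map<String, List<Txn>>> byBuyerSeller(List<Txn> txns) {
        // The inner groupingBy is just a downstream reduction of the outer one.
        return txns.stream()
                .collect(groupingBy(Txn::buyer,
                         groupingBy(Txn::seller, toList())));
    }

    public static void main(String[] args) {
        List<Txn> txns = List.of(new Txn("b1", "s1", 10),
                                 new Txn("b1", "s2", 20));
        System.out.println(byBuyerSeller(txns).get("b1").keySet());
    }
}
```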

"Profitable and unprofitable transactions by salesman"

Map<Seller, Collection[]> map =
    txns.accumulate(groupBy(Txn::seller,
                            partition(t -> t.margin() > X)));

Here, partition() returns an Accumulator<Transaction, Collection[]>.
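In the shipped JDK-8 API this became Collectors.partitioningBy, which produces a Map<Boolean, List<T>> rather than a two-element array; a sketch, with Txn and the threshold X hypothetical:

```java
import java.util.List;
import java.util.Map;
import static java.util.stream.Collectors.groupingBy;
import static java.util.stream.Collectors.partitioningBy;

public class PartitionDemo {
    record Txn(String seller, double margin) {}
    static final double X = 0.1; // hypothetical profitability threshold

    static Map<String, Map<Boolean, List<Txn>>> bySeller(List<Txn> txns) {
        // partitioningBy acts as a downstream reduction of groupingBy,
        // splitting each seller's transactions into profitable (true)
        // and unprofitable (false).
        return txns.stream()
                .collect(groupingBy(Txn::seller,
                         partitioningBy(t -> t.margin() > X)));
    }

    public static void main(String[] args) {
        List<Txn> txns = List.of(new Txn("s1", 0.2), new Txn("s1", 0.05));
        System.out.println(bySeller(txns).get("s1").get(true).size()); // 1
    }
}
```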

R accumulateConcurrent(ConcurrentAccumulator<T, R> tabulator);

All the above accumulations were order-preserving, and some used mutable but not shared containers. This means that containers have to be merged, which often involves nontrivial copying cost.

If you have a concurrent container, AND you don't care about encounter order, AND your reduction functions are commutative (not just associative), you have another choice: shovel things into a concurrent data structure, and hope its contention management is less expensive than merging. Note that this shoveling is really just forEach, not any form of reduce.

The accumulateConcurrent (open to alternate names) makes it clear that you are choosing this mode that has different semantics. However, given a suitable container, all the same aggregations can be done as concurrent/mutative/order-ignoring/commutative accumulations. So:

"Transactions by (buyer, seller)"

ConcurrentMap<Buyer, Map<Seller, Collection>> map =
    txns.accumulateConcurrent(groupBy(ConcurrentHashMap::new,
                                      Txn::buyer,
                                      groupBy(Txn::seller)));
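This concurrent mode surfaced in JDK 8 as Collectors.groupingByConcurrent, which shovels into one shared ConcurrentHashMap and gives up encounter order, exactly the trade described above; a sketch, with Txn as a hypothetical example type:

```java
import java.util.List;
import java.util.concurrent.ConcurrentMap;
import static java.util.stream.Collectors.groupingByConcurrent;

public class ConcurrentGroup {
    record Txn(String buyer, int amount) {}

    static ConcurrentMap<String, List<Txn>> byBuyer(List<Txn> txns) {
        // All threads insert into one ConcurrentHashMap (forEach-style),
        // so elements within a group may appear in any order.
        return txns.parallelStream()
                .collect(groupingByConcurrent(Txn::buyer));
    }

    public static void main(String[] args) {
        List<Txn> txns = List.of(new Txn("b1", 10), new Txn("b2", 20));
        System.out.println(byBuyer(txns).keySet());
    }
}
```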

The rationale for "concurrent" in the method name is that, without it, it wouldn't be possible to tell whether a given accumulation is concurrent or not. Because the semantics are so different, I think this is a choice we shouldn't encourage brushing under the rug.

Finding another name for "accumulateConcurrent" would also be OK, maybe one that has "forEach" in it, like:

map = txns.forEachInto(ConcurrentHashMap::new, groupBy(Txn::buyer));



More information about the lambda-libs-spec-experts mailing list