total then ``combine`` is of type (total, total) -> total Defaults to ``binop`` for common case of operators like add Fold chunks up the collection into blocks of size ``chunksize`` and then feeds each of these to calls to ``reduce``. This work is distributed with a call to ``map``, gathered back and then refolded to finish the computation. In this way ``fold`` specifies only how to chunk up data but leaves the distribution of this work to an externally provided ``map`` function. This function can be sequential or rely on multithreading, multiprocessing, or even distributed solutions. If ``map`` intends to serialize functions it should be prepared to accept and serialize lambdas. Note that the standard ``pickle`` module fails here. Example ------- >>> # Provide a parallel map to accomplish a parallel sum >>> from operator import add >>> fold(add, [1, 2, 3, 4], chunksize=2, map=map) 10 """ assert chunksize > 1 if combine is None: combine = binop chunks = partition_all(chunksize, seq) # Evaluate sequence in chunks via map if default == no_default: results = map( functools.partial(_reduce, binop), chunks) else: results = map( functools.partial(_reduce, binop, initial=default), chunks) results = list(results) # TODO: Support complete laziness if len(results) == 1: # Return completed result return results[0] else: # Recurse to reaggregate intermediate results return fold(combine, results, map=map, chunksize=chunksize)">

toolz.sandbox.parallel - Toolz (original) (raw)

import functools from toolz.itertoolz import partition_all from toolz.utils import no_default

def _reduce(func, seq, initial=None): if initial is None: return functools.reduce(func, seq) else: return functools.reduce(func, seq, initial)

[docs] def fold(binop, seq, default=no_default, map=map, chunksize=128, combine=None): """ Reduce without guarantee of ordered reduction.

Parameters
----------
binops
    Associative operator. The associative property allows us to
    leverage a parallel map to perform reductions in parallel.


inputs:

``binop``     - associative operator. The associative property allows us to
                leverage a parallel map to perform reductions in parallel.

``seq``       - a sequence to be aggregated
``default``   - an identity element like 0 for ``add`` or 1 for mul

``map``       - an implementation of ``map``. This may be parallel and
                determines how work is distributed.
``chunksize`` - Number of elements of ``seq`` that should be handled
                within a single function call
``combine``   - Binary operator to combine two intermediate results.
                If ``binop`` is of type (total, item) -> total
                then ``combine`` is of type (total, total) -> total
                Defaults to ``binop`` for common case of operators like add

Fold chunks up the collection into blocks of size ``chunksize`` and then
feeds each of these to calls to ``reduce``. This work is distributed
with a call to ``map``, gathered back and then refolded to finish the
computation. In this way ``fold`` specifies only how to chunk up data but
leaves the distribution of this work to an externally provided ``map``
function. This function can be sequential or rely on multithreading,
multiprocessing, or even distributed solutions.

If ``map`` intends to serialize functions it should be prepared to accept
and serialize lambdas. Note that the standard ``pickle`` module fails
here.

Example
-------

>>> # Provide a parallel map to accomplish a parallel sum
>>> from operator import add
>>> fold(add, [1, 2, 3, 4], chunksize=2, map=map)
10
"""
assert chunksize > 1

if combine is None:
    combine = binop

chunks = partition_all(chunksize, seq)

# Evaluate sequence in chunks via map
if default == no_default:
    results = map(
        functools.partial(_reduce, binop),
        chunks)
else:
    results = map(
        functools.partial(_reduce, binop, initial=default),
        chunks)

results = list(results)  # TODO: Support complete laziness

if len(results) == 1:    # Return completed result
    return results[0]
else:                    # Recurse to reaggregate intermediate results
    return fold(combine, results, map=map, chunksize=chunksize)