API Reference¶
Dask APIs generally follow from upstream APIs:
- Arrays follow NumPy
- DataFrames follow Pandas
- Bag follows map/filter/groupby/reduce common in Spark and Python iterators
- Delayed wraps general Python code
- Futures follow concurrent.futures from the standard library for real-time computation.
Additionally, Dask has its own functions to start computations, persist data in memory, check progress, and so forth that complement the APIs above. These more general Dask functions are described below:
- compute(*args[, traverse, optimize_graph, ...]): Compute several dask collections at once.
- is_dask_collection(x): Returns True if x is a dask collection.
- optimize(*args[, traverse]): Optimize several dask collections at once.
- persist(*args[, traverse, optimize_graph, ...]): Persist multiple Dask collections into memory.
- visualize(*args[, filename, traverse, ...]): Visualize several dask graphs simultaneously.
These functions work with any scheduler. More advanced operations are available when using the newer scheduler and starting a dask.distributed.Client (which, despite its name, runs nicely on a single machine). This API provides the ability to submit, cancel, and track work asynchronously, and includes many functions for complex inter-task workflows. These are not necessary for normal operation, but can be useful for real-time or advanced operation.
This more advanced API is available in the Dask distributed documentation.
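As a brief, hedged sketch of that futures interface (assuming dask.distributed is installed and a local machine is acceptable), a Client can submit and gather work asynchronously:

>>> from dask.distributed import Client
>>> client = Client()  # start a local cluster of workers; also fine on a single machine
>>> future = client.submit(sum, [1, 2, 3])  # submit work; returns a Future immediately
>>> future.result()  # block until the result is available
6
>>> client.close()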
dask.annotate(**annotations: Any) → collections.abc.Iterator[None][source]¶
Context Manager for setting HighLevelGraph Layer annotations.
Annotations are metadata or soft constraints associated with tasks that dask schedulers may choose to respect: They signal intent without enforcing hard constraints. As such, they are primarily designed for use with the distributed scheduler.
Almost any object can serve as an annotation, but small Python objects are preferred, while large objects such as NumPy arrays are discouraged.
Callables supplied as an annotation should take a single key argument and produce the appropriate annotation. Individual task keys in the annotated collection are supplied to the callable.
Parameters
**annotations : key-value pairs
Examples
All tasks within array A should have priority 100 and be retried 3 times on failure.
>>> import dask
>>> import dask.array as da
>>> with dask.annotate(priority=100, retries=3):
...     A = da.ones((10000, 10000))
Prioritise tasks within Array A on flattened block ID.
>>> nblocks = (10, 10)
>>> with dask.annotate(priority=lambda k: k[1]*nblocks[1] + k[2]):
...     A = da.ones((1000, 1000), chunks=(100, 100))
Annotations may be nested.
>>> with dask.annotate(priority=1):
...     with dask.annotate(retries=3):
...         A = da.ones((1000, 1000))
...     B = A + 1
dask.get_annotations() → dict[str, Any][source]¶
Get current annotations.
Returns
Dict of all current annotations
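For example, a minimal sketch; the returned dict reflects whatever annotations are currently active, so additional nested annotations would also appear:

>>> import dask
>>> with dask.annotate(priority=100):
...     dask.get_annotations()
{'priority': 100}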
dask.compute(*args, traverse=True, optimize_graph=True, scheduler=None, get=None, **kwargs)[source]¶
Compute several dask collections at once.
Parameters
args : object
Any number of objects. If it is a dask object, it’s computed and the result is returned. By default, python builtin collections are also traversed to look for dask objects (for more information see the traverse keyword). Non-dask arguments are passed through unchanged.
traverse : bool, optional
By default dask traverses builtin python collections looking for dask objects passed to compute. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.
scheduler : string, optional
Which scheduler to use like “threads”, “synchronous” or “processes”. If not provided, the default is to check the global settings first, and then fall back to the collection defaults.
optimize_graph : bool, optional
If True [default], the optimizations for each collection are applied before computation. Otherwise the graph is run as is. This can be useful for debugging.
get : None
Should be left to None. The get= keyword has been removed.
kwargs
Extra keywords to forward to the scheduler function.
Examples
>>> import dask
>>> import dask.array as da
>>> a = da.arange(10, chunks=2).sum()
>>> b = da.arange(10, chunks=2).mean()
>>> dask.compute(a, b)
(np.int64(45), np.float64(4.5))
By default, dask objects inside python collections will also be computed:
>>> dask.compute({'a': a, 'b': b, 'c': 1})
({'a': np.int64(45), 'b': np.float64(4.5), 'c': 1},)
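The scheduler keyword selects a scheduler for a single call. A brief sketch, reusing a and b from above:

>>> dask.compute(a, b, scheduler="processes")  # use the multiprocessing scheduler just for this call
(np.int64(45), np.float64(4.5))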
dask.is_dask_collection(x) → bool[source]¶
Returns True if x is a dask collection.
Parameters
x : Any
Object to test.
Returns
result : bool
True if x is a Dask collection.
Notes
The DaskCollection typing.Protocol implementation defines a Dask collection as a class that returns a Mapping from the __dask_graph__ method. This helper function existed before the implementation of the protocol.
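A short illustration (assuming dask.array is installed):

>>> import dask
>>> import dask.array as da
>>> dask.is_dask_collection(da.ones(5))
True
>>> dask.is_dask_collection([1, 2, 3])  # a plain list is not a dask collection
False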
dask.optimize(*args, traverse=True, **kwargs)[source]¶
Optimize several dask collections at once.
Returns equivalent dask collections that all share the same merged and optimized underlying graph. This can be useful if converting multiple collections to delayed objects, or to manually apply the optimizations at strategic points.
Note that in most cases you shouldn’t need to call this function directly.
Warning:
This function triggers a materialization of the collections and loses any annotations attached to HLG layers.
Parameters
*args : objects
Any number of objects. If a dask object, its graph is optimized and merged with those of all other dask objects before returning an equivalent dask collection. Non-dask arguments are passed through unchanged.
traverse : bool, optional
By default dask traverses builtin python collections looking for dask objects passed to optimize. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.
optimizations : list of callables, optional
Additional optimization passes to perform.
**kwargs
Extra keyword arguments to forward to the optimization passes.
Examples
>>> import dask
>>> import dask.array as da
>>> a = da.arange(10, chunks=2).sum()
>>> b = da.arange(10, chunks=2).mean()
>>> a2, b2 = dask.optimize(a, b)

>>> a2.compute() == a.compute()
np.True_
>>> b2.compute() == b.compute()
np.True_
dask.persist(*args, traverse=True, optimize_graph=True, scheduler=None, **kwargs)[source]¶
Persist multiple Dask collections into memory
This turns lazy Dask collections into Dask collections with the same metadata, but now with their results fully computed or actively computing in the background.
For example a lazy dask.array built up from many lazy calls will now be a dask.array of the same shape, dtype, chunks, etc., but now with all of those previously lazy tasks either computed in memory as many small numpy.array
(in the single-machine case) or asynchronously running in the background on a cluster (in the distributed case).
This function operates differently if a dask.distributed.Client
exists and is connected to a distributed scheduler. In this case this function will return as soon as the task graph has been submitted to the cluster, but before the computations have completed. Computations will continue asynchronously in the background. When using this function with the single machine scheduler it blocks until the computations have finished.
When using Dask on a single machine you should ensure that the dataset fits entirely within memory.
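For example, a hedged sketch of the distributed case (assuming dask.distributed is installed): persist returns immediately, and dask.distributed.wait can block until the data is in cluster memory:

>>> from dask.distributed import Client, wait
>>> import dask.array as da
>>> client = Client()  # connect to (or start) a distributed scheduler
>>> x = da.random.random((10000, 10000), chunks=(1000, 1000))
>>> x = x.persist()  # returns immediately; computation continues in the background
>>> wait(x)  # optionally block until all chunks are in cluster memory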
Parameters
*args: Dask collections
scheduler : string, optional
Which scheduler to use like “threads”, “synchronous” or “processes”. If not provided, the default is to check the global settings first, and then fall back to the collection defaults.
traverse : bool, optional
By default dask traverses builtin python collections looking for dask objects passed to persist. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.
optimize_graph : bool, optional
If True [default], the graph is optimized before computation. Otherwise the graph is run as is. This can be useful for debugging.
**kwargs
Extra keywords to forward to the scheduler function.
Returns
New dask collections backed by in-memory data
Examples
>>> df = dd.read_csv('/path/to/*.csv')
>>> df = df[df.name == 'Alice']
>>> df['in-debt'] = df.balance < 0
>>> df = df.persist()  # triggers computation
>>> df.value().min()  # future computations are now fast
-10
>>> df.value().max()
100

>>> from dask import persist  # use persist function on multiple collections
>>> a, b = persist(a, b)
dask.visualize(*args, filename='mydask', traverse=True, optimize_graph=False, maxval=None, engine: Optional[Literal['cytoscape', 'ipycytoscape', 'graphviz']] = None, **kwargs)[source]¶
Visualize several dask graphs simultaneously.
Requires graphviz
to be installed. All options that are not the dask graph(s) should be passed as keyword arguments.
Parameters
args : object
Any number of objects. If it is a dask collection (for example, a dask DataFrame, Array, Bag, or Delayed), its associated graph will be included in the output of visualize. By default, python builtin collections are also traversed to look for dask objects (for more information see the traverse
keyword). Arguments lacking an associated graph will be ignored.
filename : str or None, optional
The name of the file to write to disk. If the provided filename doesn’t include an extension, ‘.png’ will be used by default. If filename is None, no file will be written, and we communicate with dot using only pipes.
format : {‘png’, ‘pdf’, ‘dot’, ‘svg’, ‘jpeg’, ‘jpg’}, optional
Format in which to write output file. Default is ‘png’.
traverse : bool, optional
By default, dask traverses builtin python collections looking for dask objects passed to visualize. For large collections this can be expensive. If none of the arguments contain any dask objects, set traverse=False to avoid doing this traversal.
optimize_graph : bool, optional
If True, the graph is optimized before rendering. Otherwise, the graph is displayed as is. Default is False.
color : {None, ‘order’, ‘ages’, ‘freed’, ‘memoryincreases’, ‘memorydecreases’, ‘memorypressure’}, optional
Options to color nodes. colormap:
- None, the default, no colors.
- ‘order’, colors the nodes’ border based on the order they appear in the graph.
- ‘ages’, how long the data of a node is held.
- ‘freed’, the number of dependencies released after running a node.
- ‘memoryincreases’, how many more outputs are held after the lifetime of a node. Large values may indicate nodes that should have run later.
- ‘memorydecreases’, how many fewer outputs are held after the lifetime of a node. Large values may indicate nodes that should have run sooner.
- ‘memorypressure’, the number of data held when the node is run (circle), or the data is released (rectangle).
maxval : {int, float}, optional
Maximum value for colormap to normalize from 0 to 1.0. Default is None, which will make it the max number of values.
collapse_outputs : bool, optional
Whether to collapse output boxes, which often have empty labels. Default is False.
verbose : bool, optional
Whether to label output and input boxes even if the data aren’t chunked. Beware: these labels can get very long. Default is False.
engine : {“graphviz”, “ipycytoscape”, “cytoscape”}, optional
The visualization engine to use. If not provided, this checks the dask config value “visualization.engine”. If that is not set, it tries to import graphviz and ipycytoscape, using the first one to succeed.
**kwargs
Additional keyword arguments to forward to the visualization engine.
Returns
result : IPython.display.Image, IPython.display.SVG, or None
See dask.dot.dot_graph for more information.
See also
dask.dot.dot_graph
Notes
For more information on optimization see here:
https://docs.dask.org/en/latest/optimize.html
Examples
>>> x.visualize(filename='dask.pdf')
>>> x.visualize(filename='dask.pdf', color='order')
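The module-level function can also render several collections into a single graph. A brief sketch, assuming graphviz is installed:

>>> import dask
>>> import dask.array as da
>>> a = da.ones((10, 10), chunks=(5, 5)).sum()
>>> b = da.ones((10, 10), chunks=(5, 5)).mean()
>>> dask.visualize(a, b, filename='combined.svg', optimize_graph=True)  # writes combined.svg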
Datasets¶
Dask has a few helpers for generating demo datasets
dask.datasets.make_people(npartitions=10, records_per_partition=1000, seed=None, locale='en')[source]¶
Make a dataset of random people
This makes a Dask Bag with dictionary records of randomly generated people. This requires the optional library mimesis
to generate records.
Parameters
npartitions : int
Number of partitions
records_per_partition : int
Number of records in each partition
seed : int, optional
Random seed
locale : str
Language locale, like ‘en’, ‘fr’, ‘zh’, or ‘ru’
Returns
b: Dask Bag
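A quick, hedged sketch (requires the optional mimesis library; record contents vary with the seed and locale):

>>> import dask
>>> b = dask.datasets.make_people(npartitions=2, records_per_partition=5, seed=1)
>>> b.count().compute()  # 2 partitions x 5 records each
10
>>> b.take(1)  # a tuple holding one dictionary record describing a random person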
dask.datasets.timeseries(start='2000-01-01', end='2000-01-31', freq='1s', partition_freq='1D', dtypes=None, seed=None, **kwargs)[source]¶
Create timeseries dataframe with random data
Parameters
start : datetime (or datetime-like string)
Start of time series
end : datetime (or datetime-like string)
End of time series
dtypes : dict (optional)
Mapping of column names to types. Valid types include {float, int, str, ‘category’}
freq : string
String like ‘2s’ or ‘1H’ or ‘12W’ for the time series frequency
partition_freq : string
String like ‘1M’ or ‘2Y’ to divide the dataframe into partitions
seed : int (optional)
Randomstate seed
kwargs:
Keywords to pass down to individual column creation functions. Keywords should be prefixed by the column name and then an underscore.
Examples
>>> import dask
>>> df = dask.datasets.timeseries()
>>> df.head()
                       id     name         x         y
timestamp
2000-01-01 00:00:00   967    Jerry -0.031348 -0.040633
2000-01-01 00:00:01  1066  Michael -0.262136  0.307107
2000-01-01 00:00:02   988    Wendy -0.526331  0.128641
2000-01-01 00:00:03  1016   Yvonne  0.620456  0.767270
2000-01-01 00:00:04   998   Ursula  0.684902 -0.463278
>>> df = dask.datasets.timeseries(
...     '2000', '2010',
...     freq='2h', partition_freq='1D', seed=1,  # data frequency
...     dtypes={'value': float, 'name': str, 'id': int},  # data types
...     id_lam=1000  # control number of items in id column
... )
Datasets with defined specs¶
The following helpers are still experimental:
dask.dataframe.io.demo.with_spec(spec: dask.dataframe.io.demo.DatasetSpec, seed: int | None = None)[source]¶
Generate a random dataset according to provided spec
Parameters
spec : DatasetSpec
Specify all the parameters of the dataset
seed : int, optional
Randomstate seed
Notes
This API is still experimental, and will likely change in the future
Examples
>>> from dask.dataframe.io.demo import ColumnSpec, DatasetSpec, with_spec
>>> ddf = with_spec(
...     DatasetSpec(
...         npartitions=10,
...         nrecords=10_000,
...         column_specs=[
...             ColumnSpec(dtype=int, number=2, prefix="p"),
...             ColumnSpec(dtype=int, number=2, prefix="n", method="normal"),
...             ColumnSpec(dtype=float, number=2, prefix="f"),
...             ColumnSpec(dtype=str, prefix="s", number=2, random=True, length=10),
...             ColumnSpec(dtype="category", prefix="c", choices=["Y", "N"]),
...         ],
...     ), seed=42)
>>> ddf.head(10)
     p1    p2    n1    n2        f1        f2          s1          s2 c1
0  1002   972  -811    20  0.640846 -0.176875  L#h98#}J`?  _8C607/:6e  N
1   985   982 -1663  -777  0.790257  0.792796  u:XI3,omoZ   w@ /d)'-@  N
2   947   970   799  -269  0.740869 -0.118413   O$dnwCuq\  !WtSe+(;#9  Y
3  1003   983  1133   521 -0.987459  0.278154  j+Qr_2{XG&  &XV7cy$y1T  Y
4  1017  1049   826     5 -0.875667 -0.744359    bJ3E-{:o  {+jC).?vK+  Y
5   984  1017  -492  -399  0.748181  0.293761  ~zUNHNgD"!  yuEkXeVot|  Y
6   992  1027  -856    67 -0.125132 -0.234529  j.7z;o]Gc9  g|Fi5*}Y92  Y
7  1011   974   762 -1223  0.471696  0.937935   yT?jN/-u]  JhEB[W-}^$  N
8   984   974   856    74  0.109963  0.367864  _j"&@ i&;/  OYXQ)w{hoH  N
9  1030  1001  -792  -262  0.435587 -0.647970  Pmrwl{{|.K  3UTqM$86Sg  N
The ColumnSpec class¶
class dask.dataframe.io.demo.ColumnSpec(prefix: str | None = None, dtype: str | type | None = None, number: int = 1, nunique: int | None = None, choices: list = , low: int | None = None, high: int | None = None, length: int | None = None, random: bool = False, method: str | None = None, args: tuple[typing.Any, ...] = , kwargs: dict[str, typing.Any] = )[source]¶
Bases: object
Encapsulates properties of a family of columns with the same dtype. A different method can be specified for integer dtypes (“poisson”, “uniform”, “binomial”, etc.)
Notes
This API is still experimental, and will likely change in the future
args: tuple[typing.Any, ...]¶
Args to pass into the method
choices: list¶
For a “category” or str column, list of possible values
dtype: str | type | None = None¶
Column data type. Only supports numpy dtypes
high: int | None = None¶
For an int column, high end of range
kwargs: dict[str, typing.Any]¶
Any other kwargs to pass into the method
length: int | None = None¶
For a str or “category” column with random=True, how large a string to generate
low: int | None = None¶
Start value for an int column. Optional if random=True, since randint doesn’t accept high and low.
method: str | None = None¶
For an int column, method to use when generating the value, such as “poisson”, “uniform”, “binomial”. Default “poisson”. Delegates to the same method of RandomState
number: int = 1¶
How many columns to create with these properties. Default 1. If more than one column is specified, they will be numbered: “int1”, “int2”, etc.
nunique: int | None = None¶
For a “category” column, how many unique categories to generate
prefix: str | None = None¶
Column prefix. If not specified, will default to str(dtype)
random: bool = False¶
For an int column, whether to use randint. For a string column, produces a random string of specified length
The RangeIndexSpec class¶
class dask.dataframe.io.demo.RangeIndexSpec(dtype: str | type = <class 'int'>, step: int = 1)[source]¶
Bases: object
Properties of the dataframe RangeIndex
Notes
This API is still experimental, and will likely change in the future
dtype¶
Index dtype
alias of int
step: int = 1¶
Step for a RangeIndex
The DatetimeIndexSpec class¶
class dask.dataframe.io.demo.DatetimeIndexSpec(dtype: str | type = <class 'int'>, start: str | None = None, freq: str = '1H', partition_freq: str | None = None)[source]¶
Bases: object
Properties of the dataframe DatetimeIndex
Notes
This API is still experimental, and will likely change in the future
dtype¶
Index dtype
alias of int
freq: str = '1H'¶
Frequency for the index (“1H”, “1D”, etc.)
partition_freq: str | None = None¶
Partition frequency (“1D”, “1M”, etc.)
start: str | None = None¶
First value of the index
The DatasetSpec class¶
class dask.dataframe.io.demo.DatasetSpec(npartitions: int = 1, nrecords: int = 1000, index_spec: dask.dataframe.io.demo.RangeIndexSpec | dask.dataframe.io.demo.DatetimeIndexSpec = , column_specs: list[dask.dataframe.io.demo.ColumnSpec] = )[source]¶
Bases: object
Defines a dataset with random data, such as which columns and data types to generate
Notes
This API is still experimental, and will likely change in the future
column_specs: list[dask.dataframe.io.demo.ColumnSpec]¶
List of column definitions
index_spec: dask.dataframe.io.demo.RangeIndexSpec | dask.dataframe.io.demo.DatetimeIndexSpec¶
Properties of the index
npartitions: int = 1¶
How many partitions to generate in the dataframe. If the dataframe has a DatetimeIndex, specify its partition_freq instead
nrecords: int = 1000¶
Total number of records to generate
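As a hedged sketch of how these spec classes compose (based only on the signatures above; this experimental API may change):

>>> from dask.dataframe.io.demo import (
...     ColumnSpec, DatasetSpec, DatetimeIndexSpec, with_spec)
>>> spec = DatasetSpec(
...     nrecords=1_000,
...     index_spec=DatetimeIndexSpec(
...         start="2000-01-01", freq="1h", partition_freq="1D"),
...     column_specs=[
...         ColumnSpec(dtype=float, number=2, prefix="x"),
...         ColumnSpec(dtype="category", prefix="flag", choices=["Y", "N"]),
...     ],
... )
>>> ddf = with_spec(spec, seed=0)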
Utilities¶
Dask has some public utility methods. These are primarily used for parsing configuration values.
dask.utils.apply(func, args, kwargs=None)[source]¶
Apply a function given its positional and keyword arguments.
Equivalent to func(*args, **kwargs)
Most Dask users will never need to use the apply
function. It is typically only used by people who need to inject keyword argument values into a low level Dask task graph.
Parameters
func : callable
The function you want to apply.
args : tuple
A tuple containing all the positional arguments needed for func (eg: (arg_1, arg_2, arg_3))
kwargs : dict, optional
A dictionary mapping the keyword arguments (eg: {"kwarg_1": value, "kwarg_2": value})
Examples
>>> from dask.utils import apply
>>> def add(number, second_number=5):
...     return number + second_number
...
>>> apply(add, (10,), {"second_number": 2})  # equivalent to add(*args, **kwargs)
12

>>> task = apply(add, (10,), {"second_number": 2})
>>> dsk = {'task-name': task}  # adds the task to a low level Dask task graph
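In a low level graph, apply typically appears inside a task tuple rather than being called eagerly. A hedged sketch, assuming dask.get (the synchronous scheduler entry point) to run the graph:

>>> import dask
>>> from dask.utils import apply
>>> def add(number, second_number=5):
...     return number + second_number
...
>>> dsk = {'result': (apply, add, [10], {'second_number': 3})}  # task tuple: (apply, func, args, kwargs)
>>> dask.get(dsk, 'result')  # run the graph with the synchronous scheduler
13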
dask.utils.format_bytes(n: int) → str[source]¶
Format bytes as text
>>> from dask.utils import format_bytes
>>> format_bytes(1)
'1 B'
>>> format_bytes(1234)
'1.21 kiB'
>>> format_bytes(12345678)
'11.77 MiB'
>>> format_bytes(1234567890)
'1.15 GiB'
>>> format_bytes(1234567890000)
'1.12 TiB'
>>> format_bytes(1234567890000000)
'1.10 PiB'
For all values < 2**60, the output is always <= 10 characters.
dask.utils.format_time(n: float) → str[source]¶
Format a number of seconds as text
>>> from dask.utils import format_time
>>> format_time(1)
'1.00 s'
>>> format_time(0.001234)
'1.23 ms'
>>> format_time(0.00012345)
'123.45 us'
>>> format_time(123.456)
'123.46 s'
>>> format_time(1234.567)
'20m 34s'
>>> format_time(12345.67)
'3hr 25m'
>>> format_time(123456.78)
'34hr 17m'
>>> format_time(1234567.89)
'14d 6hr'
dask.utils.parse_bytes(s: float | str) → int[source]¶
Parse byte string to numbers
>>> from dask.utils import parse_bytes
>>> parse_bytes('100')
100
>>> parse_bytes('100 MB')
100000000
>>> parse_bytes('100M')
100000000
>>> parse_bytes('5kB')
5000
>>> parse_bytes('5.4 kB')
5400
>>> parse_bytes('1kiB')
1024
>>> parse_bytes('1e6')
1000000
>>> parse_bytes('1e6 kB')
1000000000
>>> parse_bytes('MB')
1000000
>>> parse_bytes(123)
123
>>> parse_bytes('5 foos')
Traceback (most recent call last):
    ...
ValueError: Could not interpret 'foos' as a byte unit
dask.utils.parse_timedelta(s: None, default: str | Literal[False] = 'seconds') → None[source]¶
dask.utils.parse_timedelta(s: str | float | timedelta, default: str | Literal[False] = 'seconds') → float
Parse timedelta string to number of seconds
Parameters
s : str, float, timedelta, or None
default: str or False, optional
Unit of measure if s does not specify one. Defaults to seconds. Set to False to require s to explicitly specify its own unit.
Examples
>>> from datetime import timedelta
>>> from dask.utils import parse_timedelta
>>> parse_timedelta('3s')
3
>>> parse_timedelta('3.5 seconds')
3.5
>>> parse_timedelta('300ms')
0.3
>>> parse_timedelta(timedelta(seconds=3))  # also supports timedeltas
3
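A hedged example of the default keyword (assuming a unitless value falls back to the given unit):

>>> parse_timedelta('3', default='ms')  # no unit in the string, so 'ms' is assumed
0.003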