
dask.dataframe.from_map

dask.dataframe.from_map(func, *iterables, args=None, meta=<no_default>, divisions=None, label=None, enforce_metadata=False, **kwargs)

Create a DataFrame collection from a custom function map.

from_map is the preferred option when reading from data sources that are not natively supported by Dask, or when the data source requires custom handling before it is handed off to Dask DataFrames. Examples include binary files or other unstructured data that does not have an IO connector.
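For instance, a minimal sketch of reading a directory of fixed-width binary record files could look like the following (the file names and record layout here are assumptions for illustration, not a real connector):

>>> import numpy as np
>>> import pandas as pd
>>> import dask.dataframe as dd
>>> def read_records(path):
...     # Each hypothetical file stores fixed-width binary records:
...     # a little-endian int32 "id" followed by a float64 "value"
...     dtype = np.dtype([("id", "<i4"), ("value", "<f8")])
...     return pd.DataFrame(np.fromfile(path, dtype=dtype))
>>> paths = ["records-0.bin", "records-1.bin", "records-2.bin"]
>>> ddf = dd.from_map(read_records, paths)  # one output partition per file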

from_map supports column projection by the optimizer. The optimizer tries to push column selections into the from_map call if the function supports a columns argument.

Parameters:

func : callable

Function used to create each partition. Column projection will be enabled if the function has a columns keyword argument.

*iterables : Iterable objects

Iterable objects to map to each output partition. All iterables must be the same length. This length determines the number of partitions in the output collection (only one element of each iterable will be passed to func for each partition).

args : list or tuple, optional

Positional arguments to broadcast to each output partition. Note that these arguments will always be passed to func after the iterables positional arguments.

meta : pd.DataFrame, pd.Series, dict, iterable, tuple, optional

An empty pd.DataFrame or pd.Series that matches the dtypes and column names of the output. This metadata is necessary for many algorithms in dask dataframe to work. For ease of use, some alternative inputs are also available. Instead of a DataFrame, a dict of {name: dtype} or iterable of (name, dtype) can be provided (note that the order of the names should match the order of the columns). Instead of a series, a tuple of (name, dtype) can be used. If not provided, dask will try to infer the metadata. This may lead to unexpected results, so providing meta is recommended.

divisions : tuple, str, optional

Partition boundaries along the index. For a tuple, see https://docs.dask.org/en/latest/dataframe-design.html#partitions. For the string ‘sorted’, the delayed values will be computed to find the index values; this assumes that the indexes are mutually sorted. If None, index information is not used.

label : str, optional

String to use as the function-name label in the output collection-key names.

token : str, optional

String to use as the “token” in the output collection-key names.

enforce_metadata : bool, default False

Whether to enforce at runtime that the structure of the DataFrame produced by func actually matches the structure of meta. This will rename and reorder columns for each partition, and will raise an error if this doesn’t work, but it won’t raise if dtypes don’t match.

**kwargs

Keyword arguments to broadcast to each output partition. These same arguments will be passed to func for every output partition; a short sketch combining args, meta, and keyword broadcasting follows this parameter list.
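As a quick illustration of how these parameters fit together, here is a minimal sketch (the file names, separator, and column layout are assumptions for illustration):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> def load(path, sep, nrows=None):
...     # ``sep`` is broadcast via ``args``; ``nrows`` via ``**kwargs``
...     return pd.read_csv(path, sep=sep, nrows=nrows)
>>> meta = pd.DataFrame({"name": pd.Series(dtype="object"),
...                      "value": pd.Series(dtype="float64")})
>>> ddf = dd.from_map(
...     load,
...     ["0.tsv", "1.tsv"],   # one element per output partition
...     args=["\t"],          # positional arguments passed after each element
...     meta=meta,            # expected structure of every partition
...     nrows=1000,           # keyword argument broadcast to every call
... )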

Examples

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> func = lambda x, size=0: pd.Series([x] * size)
>>> inputs = ["A", "B"]
>>> dd.from_map(func, inputs, size=2).compute()
0    A
1    A
0    B
1    B
dtype: string

The optimizer will identify a column selection that happens after from_map and push the columns argument into the actual map call to drop unnecessary columns as early as possible.

>>> def map_function(x, columns=None):
...     df = pd.DataFrame({"a": [1, 2], "b": x})
...     if columns is not None:
...         df = df[columns]
...     return df
>>> dd.from_map(map_function, [1, 2])["b"].compute()
0    1
1    1
0    2
1    2
Name: b, dtype: int64

This API can also be used as an alternative to other file-based IO functions, like read_csv (which are already just from_map wrapper functions):

>>> import pandas as pd
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> dd.from_map(pd.read_csv, paths).head()
                       name
timestamp
2000-01-01 00:00:00   Laura
2000-01-01 00:00:01  Oliver
2000-01-01 00:00:02   Alice
2000-01-01 00:00:03  Victor
2000-01-01 00:00:04     Bob

Since from_map allows you to map an arbitrary function to any number of iterable objects, it can be a very convenient means of implementing functionality that may be missing from other DataFrame-creation methods. For example, if you happen to have a priori knowledge about the number of rows in each of the files in a dataset, you can generate a DataFrame collection with a global RangeIndex:

>>> import pandas as pd
>>> import numpy as np
>>> import dask.dataframe as dd
>>> paths = ["0.csv", "1.csv", "2.csv"]
>>> file_sizes = [86400, 86400, 86400]
>>> def func(path, row_offset):
...     # Read a CSV file and set a RangeIndex starting at the given offset
...     df = pd.read_csv(path)
...     return df.set_index(
...         pd.RangeIndex(row_offset, row_offset + len(df))
...     )
>>> def get_ddf(paths, file_sizes):
...     offsets = [0] + list(np.cumsum(file_sizes))
...     return dd.from_map(
...         func, paths, offsets[:-1], divisions=offsets
...     )
>>> ddf = get_ddf(paths, file_sizes)
>>> ddf.index
Dask Index Structure:
npartitions=3
0         int64
86400       ...
172800      ...
259200      ...
dtype: int64
Dask Name: myfunc, 6 tasks