Python API (advanced) — Dask documentation

Dynamic distributed task scheduler

The scheduler tracks the current state of workers, data, and computations. The scheduler listens for events and responds by controlling workers appropriately. It continuously tries to use the workers to execute an ever-growing dask graph.

All events are handled quickly, in linear time with respect to their input (which is often of constant size) and generally within a millisecond. To accomplish this the scheduler tracks a lot of state. Every operation maintains the consistency of this state.

The scheduler communicates with the outside world through Comm objects. It maintains a consistent and valid view of the world even when listening to several clients at once.

A Scheduler is typically started either with the dask scheduler executable:

$ dask scheduler
Scheduler started at 127.0.0.1:8786

Or implicitly, within a LocalCluster, when a Client starts up without connection information:

>>> c = Client()
>>> c.cluster.scheduler
Scheduler(...)

Users typically do not interact with the scheduler directly but rather with the client object Client.

The contact_address parameter allows the scheduler to advertise a specific address to the workers for communication, different from the address it binds to. This is useful when the scheduler listens on a private address that the workers cannot use to contact it.

State

The scheduler's state is tracked and manipulated through the methods below. Each is listed with its signature and a brief description.

adaptive_target(target_duration: float | None = None) → int[source]

Desired number of workers based on the current workload

This looks at the current running tasks and memory use, and returns a number of desired workers. This is often used by adaptive scheduling.

Parameters

target_duration : str

A desired duration of time for computations to take. This affects how rapidly the scheduler will ask to scale.

async add_client(comm: distributed.comm.core.Comm, client: str, versions: dict[str, Any]) → None[source]

Add client to network

We listen to all future messages from this Comm.

add_keys(worker: str, keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] = (), stimulus_id: str | None = None) → Literal['OK', 'not found'][source]

Learn that a worker has certain keys

This should not be used in practice and is mostly here for legacy reasons. However, it is sent by workers from time to time.

add_plugin(plugin: distributed.diagnostics.plugin.SchedulerPlugin, *, idempotent: bool = False, name: str | None = None, **kwargs: Any) → None[source]

Add external plugin to scheduler.

See https://distributed.readthedocs.io/en/latest/plugins.html

Parameters

plugin : SchedulerPlugin

SchedulerPlugin instance to add

idempotent : bool

If true, the plugin is assumed to already exist and no action is taken.

name : str

A name for the plugin. If None, the name attribute is checked on the Plugin instance and generated if not found.
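To illustrate the plugin hook machinery, here is a minimal sketch of a plugin that counts completed tasks. The real implementation would subclass distributed.diagnostics.plugin.SchedulerPlugin; the duck-typed class and its name here are hypothetical, kept dependency-free for illustration.

```python
class TaskCounterPlugin:
    """Hypothetical plugin counting task transitions into 'memory'.

    A real plugin subclasses distributed.diagnostics.plugin.SchedulerPlugin
    and is registered with scheduler.add_plugin(TaskCounterPlugin()).
    """

    name = "task-counter"  # picked up by add_plugin when name=None

    def __init__(self):
        self.completed = 0

    def transition(self, key, start, finish, *args, **kwargs):
        # The scheduler calls this hook on every task state transition.
        if finish == "memory":
            self.completed += 1
```

The name class attribute matters for the idempotent path above: a second add_plugin call with the same name and idempotent=True is a no-op.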

async add_worker(comm: distributed.comm.core.Comm, *, address: str, status: str, server_id: str, nthreads: int, name: str, resolve_address: bool = True, now: float, resources: dict[str, float], host_info: None = None, memory_limit: int | None, metrics: dict[str, Any], pid: int = 0, services: dict[str, int], local_directory: str, versions: dict[str, Any], nanny: str, extra: dict, stimulus_id: str) → None[source]

Add a new worker to the cluster

async benchmark_hardware() → dict[str, dict[str, float]][source]

Run a benchmark on the workers for memory, disk, and network bandwidths

Returns

result: dict

A dictionary mapping the names “disk”, “memory”, and “network” to dictionaries mapping sizes to bandwidths. These bandwidths are averaged over many workers running computations across the cluster.
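The averaging step can be sketched as follows. This is a hypothetical helper, not the scheduler's actual code: it merges per-worker {size: bandwidth} measurements into the size-to-average-bandwidth mapping described above.

```python
from collections import defaultdict


def average_bandwidths(per_worker):
    """Average per-worker {size: bandwidth} dicts into one dict.

    Hypothetical sketch of how per-worker benchmark results could be
    aggregated into the {size: average bandwidth} mapping returned
    for each of "disk", "memory", and "network".
    """
    sums = defaultdict(lambda: [0.0, 0])
    for measurements in per_worker.values():
        for size, bandwidth in measurements.items():
            sums[size][0] += bandwidth
            sums[size][1] += 1
    return {size: total / count for size, (total, count) in sums.items()}
```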

async broadcast(*, msg: dict, workers: collections.abc.Collection[str] | None = None, hosts: collections.abc.Collection[str] | None = None, nanny: bool = False, serializers: Any = None, on_error: Literal['raise', 'return', 'return_pickle', 'ignore'] = 'raise') → dict[str, Any][source]

Broadcast message to workers, return all results

client_heartbeat(client: str) → None[source]

Handle heartbeats from Client

client_releases_keys(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, stimulus_id: str | None = None) → None[source]

Remove keys from client desired list

client_send(client: str, msg: dict) → None[source]

Send message to client

async close(timeout: float | None = None, reason: str = 'unknown') → None[source]

Send cleanup signal to all coroutines then wait until finished

See also

Scheduler.cleanup

close_worker(worker: str) → None[source]

Ask a worker to shut itself down. Do not wait for it to take effect. Note that there is no guarantee that the worker will actually accept the command.

Note that remove_worker() sends the same command internally if close=True.

coerce_address(addr: str | tuple, resolve: bool = True) → str[source]

Coerce possible input addresses to canonical form. resolve can be disabled for testing with fake hostnames.

Handles strings, tuples, or aliases.
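A simplified sketch of this coercion logic, assuming the three input shapes named above. The function name, the aliases dict, and the defaulting to tcp are illustrative assumptions; the real method also resolves hostnames and normalizes comm protocols.

```python
def coerce_address_sketch(addr, aliases=None, default_protocol="tcp"):
    """Hypothetical sketch: coerce strings, tuples, or aliases to
    a canonical 'protocol://host:port' address string."""
    aliases = aliases or {}
    if addr in aliases:            # an alias registered earlier, e.g. a name
        return aliases[addr]
    if isinstance(addr, tuple):    # ('127.0.0.1', 8786) -> 'tcp://127.0.0.1:8786'
        host, port = addr
        return f"{default_protocol}://{host}:{port}"
    if "://" not in addr:          # bare 'host:port' gets a protocol prefix
        return f"{default_protocol}://{addr}"
    return addr                    # already canonical
```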

async delete_worker_data(worker_address: str, keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], stimulus_id: str) → None[source]

Delete data from a worker and update the corresponding worker/task states

Parameters

worker_address: str

Worker address to delete keys from

keys: list[Key]

List of keys to delete on the specified worker

async dump_cluster_state_to_url(url: str, exclude: collections.abc.Collection[str], format: Literal['msgpack', 'yaml'], **storage_options: dict[str, Any]) → None[source]

Write a cluster state dump to an fsspec-compatible URL.

async feed(comm: distributed.comm.core.Comm, function: bytes | None = None, setup: bytes | None = None, teardown: bytes | None = None, interval: str | float = '1s', **kwargs: Any) → None[source]

Provides a data Comm to external requester

Caution: this runs arbitrary Python code on the scheduler. This should eventually be phased out. It is mostly used by diagnostics.

async gather(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], serializers: list[str] | None = None) → dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], object][source]

Collect data from workers to the scheduler

async gather_on_worker(worker_address: str, who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]]) → set[source]

Peer-to-peer copy of keys from multiple workers to a single worker

Parameters

worker_address: str

Recipient worker address to copy keys to

who_has: dict[Key, list[str]]

{key: [sender address, sender address, …], key: …}

Returns

set of keys that failed to be copied

async get_cluster_state(exclude: collections.abc.Collection[str]) → dict[source]

Produce the state dict used in a cluster state dump

async get_story(keys_or_stimuli: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]]) → list[distributed.scheduler.Transition][source]

RPC hook for SchedulerState.story().

Note that the msgpack serialization/deserialization round-trip will transform the Transition namedtuples into regular tuples.

get_worker_service_addr(worker: str, service_name: str, protocol: bool = False) → tuple[str, int] | str | None[source]

Get the (host, port) address of the named service on the worker. Returns None if the service doesn’t exist.

Parameters

worker : address

service_name : str

Common services include ‘bokeh’ and ‘nanny’

protocol : boolean

Whether or not to include a full address with protocol (True) or just a (host, port) pair

handle_long_running(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, run_id: int, compute_duration: float | None, stimulus_id: str) → None[source]

A task has seceded from the thread pool

We stop the task from being stolen in the future, and change task duration accounting as if the task has stopped.

handle_request_refresh_who_has(keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], worker: str, stimulus_id: str) → None[source]

Request from a Worker to refresh the who_has for some keys. Not to be confused with scheduler.who_has, which is a dedicated comm RPC request from a Client.

async handle_worker(comm: distributed.comm.core.Comm, worker: str) → None[source]

Listen to responses from a single worker

This is the main loop for scheduler-worker interaction

See also

Scheduler.handle_client

Equivalent coroutine for clients

identity(n_workers: int = -1) → dict[str, Any][source]

Basic information about ourselves and our cluster

log_event(topic: str | collections.abc.Collection[str], msg: Any) → None[source]

Log an event under a given topic

Parameters

topic : str, list[str]

Name of the topic under which to log an event. To log the same event under multiple topics, pass a list of topic names.

msg

Event message to log. Note this must be msgpack serializable.
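The single-topic vs. multi-topic behaviour can be sketched as below. This is an illustrative model, not the scheduler's implementation: the class name and the bounded per-topic deque are assumptions.

```python
from collections import defaultdict, deque
import time


class EventLog:
    """Hypothetical sketch of log_event's topic fan-out: a string logs
    under one topic, a collection logs the same event under each topic."""

    def __init__(self, maxlen=10000):
        # one bounded event buffer per topic
        self.events = defaultdict(lambda: deque(maxlen=maxlen))

    def log_event(self, topic, msg):
        topics = [topic] if isinstance(topic, str) else list(topic)
        stamped = (time.time(), msg)  # same timestamped event everywhere
        for t in topics:
            self.events[t].append(stamped)
```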

async proxy(msg: dict, worker: str, serializers: Any = None) → Any[source]

Proxy a communication through the scheduler to some other worker

async rebalance(keys: collections.abc.Iterable[Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]] | None = None, workers: collections.abc.Iterable[str] | None = None, stimulus_id: str | None = None) → dict[source]

Rebalance keys so that each worker ends up with roughly the same process memory (managed+unmanaged).

Warning

This operation is generally not well tested against normal operation of the scheduler. It is not recommended to use it while waiting on computations.

Algorithm

  1. Find the mean occupancy of the cluster, defined as data managed by dask + unmanaged process memory that has been there for at least 30 seconds (distributed.worker.memory.recent-to-old-time). This lets us ignore temporary spikes caused by task heap usage.
    Alternatively, you may change how memory is measured both for the individual workers as well as to calculate the mean through distributed.worker.memory.rebalance.measure. Namely, this can be useful to disregard inaccurate OS memory measurements.
  2. Discard workers whose occupancy is within 5% of the mean cluster occupancy (distributed.worker.memory.rebalance.sender-recipient-gap / 2). This helps avoid data from bouncing around the cluster repeatedly.
  3. Workers above the mean are senders; those below are recipients.
  4. Discard senders whose absolute occupancy is below 30% (distributed.worker.memory.rebalance.sender-min). In other words, no data is moved regardless of imbalancing as long as all workers are below 30%.
  5. Discard recipients whose absolute occupancy is above 60% (distributed.worker.memory.rebalance.recipient-max). Note that this threshold by default is the same as distributed.worker.memory.target to prevent workers from accepting data and immediately spilling it out to disk.
  6. Iteratively pick the sender and recipient that are farthest from the mean and move the least recently inserted key between the two, until either all senders or all recipients fall within 5% of the mean.
    A recipient will be skipped if it already has a copy of the data. In other words, this method does not degrade replication. A key will be skipped if there are no recipients available with enough memory to accept the key and that don’t already hold a copy.

The least recently inserted (LRI) policy is a greedy choice with the advantage of being O(1), trivial to implement (it relies on Python dict insertion ordering), and hopefully good enough in most cases. Alternative policies were considered and discarded.
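Steps 1–5 above can be sketched as a pure-Python classification pass. The function name is hypothetical and occupancy is simplified to a single fraction of memory limit per worker; the threshold defaults mirror the configuration values quoted above.

```python
def classify_workers(occupancy, gap=0.05, sender_min=0.30, recipient_max=0.60):
    """Hypothetical sketch of rebalance steps 1-5: split workers into
    senders and recipients around the mean occupancy.

    occupancy maps worker address -> fraction of memory limit in use.
    """
    mean = sum(occupancy.values()) / len(occupancy)
    senders, recipients = [], []
    for addr, occ in occupancy.items():
        if abs(occ - mean) < gap:               # step 2: within 5% of the mean
            continue
        if occ > mean and occ >= sender_min:    # steps 3-4: busy enough to send
            senders.append(addr)
        elif occ < mean and occ <= recipient_max:  # steps 3 and 5: room to receive
            recipients.append(addr)
    return senders, recipients
```

Step 6 would then repeatedly pair the extreme sender and recipient from these lists and move keys until both lists drain.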

Parameters

keys: optional

allowlist of dask keys that should be considered for moving. All other keys will be ignored. Note that this offers no guarantee that a key will actually be moved (e.g. because it is unnecessary or because there are no viable recipient workers for it).

workers: optional

allowlist of workers addresses to be considered as senders or recipients. All other workers will be ignored. The mean cluster occupancy will be calculated only using the allowed workers.

async register_nanny_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) → dict[str, distributed.core.OKMessage][source]

Registers a nanny plugin on all running and future nannies

async register_scheduler_plugin(plugin: bytes | distributed.diagnostics.plugin.SchedulerPlugin, name: str | None = None, idempotent: bool | None = None) → None[source]

Register a plugin on the scheduler.

async register_worker_plugin(comm: None, plugin: bytes, name: str, idempotent: bool | None = None) → dict[str, distributed.core.OKMessage][source]

Registers a worker plugin on all running and future workers

remove_client(client: str, stimulus_id: str | None = None) → None[source]

Remove client from network

remove_plugin(name: str | None = None) → None[source]

Remove external plugin from scheduler

Parameters

namestr

Name of the plugin to remove

remove_worker(address: str, *, stimulus_id: str, expected: bool = False, close: bool = True) → Literal['OK', 'already-removed'][source]

Remove worker from cluster.

We do this when a worker reports that it plans to leave or when it appears to be unresponsive. This may send its tasks back to a released state.

async replicate(keys: list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], n: int | None = None, workers: collections.abc.Iterable | None = None, branching_factor: int = 2, delete: bool = True, stimulus_id: str | None = None) → dict | None[source]

Replicate data throughout cluster

This performs a tree copy of the data throughout the network individually on each piece of data.

Parameters

keys: Iterable

list of keys to replicate

n: int

Number of replications we expect to see within the cluster

branching_factor: int, optional

The number of workers that can copy data in each generation. The larger the branching factor, the more data we copy in a single step, but the more a given worker risks being swamped by data requests.
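The exponential fan-out of the tree copy can be sketched as follows. This is a back-of-the-envelope model, assuming each current holder seeds branching_factor new copies per generation, so the replica count grows by a factor of (1 + branching_factor) per step; the function name is hypothetical.

```python
def copy_generations(n_replicas, branching_factor=2):
    """Hypothetical sketch: generations of tree copying needed to reach
    n_replicas, assuming each holder seeds branching_factor new copies
    per generation."""
    count, generations = 1, 0
    while count < n_replicas:
        count *= 1 + branching_factor  # every holder fans out in parallel
        generations += 1
    return generations
```

This is why a larger branching factor finishes in fewer rounds at the cost of more simultaneous transfer load per worker.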

report(msg: dict, ts: distributed.scheduler.TaskState | None = None, client: str | None = None) → None[source]

Publish updates to all listening Queues and Comms

If the message contains a key then we only send the message to those comms that care about the key.
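The routing rule can be sketched like this. The helper and the who_wants mapping (key to the set of interested client ids) are illustrative assumptions, not the scheduler's actual data structures.

```python
def comms_for_message(msg, client_comms, who_wants):
    """Hypothetical sketch of report's routing: a message carrying a key
    goes only to comms whose client wants that key; keyless messages go
    to every listening comm."""
    if "key" in msg:
        interested = who_wants.get(msg["key"], set())
        return [comm for client, comm in client_comms.items() if client in interested]
    return list(client_comms.values())
```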

request_acquire_replicas(addr: str, keys: collections.abc.Iterable[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) → None[source]

Asynchronously ask a worker to acquire a replica of the listed keys from other workers. This is a fire-and-forget operation which offers no feedback for success or failure, and is intended for housekeeping and not for computation.

request_remove_replicas(addr: str, keys: list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], *, stimulus_id: str) → None[source]

Asynchronously ask a worker to discard its replica of the listed keys. This must never be used to destroy the last replica of a key. This is a fire-and-forget operation, intended for housekeeping and not for computation.

The replica disappears immediately from TaskState.who_has on the Scheduler side; if the worker refuses to delete, e.g. because the task is a dependency of another task running on it, it will (also asynchronously) inform the scheduler to re-add itself to who_has. If the worker agrees to discard the task, there is no feedback.

async restart(*, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, stimulus_id: str) → None[source]

Forget all tasks and call restart_workers on all workers.

Parameters

timeout:

See restart_workers

wait_for_workers:

See restart_workers

async restart_workers(workers: list[str] | None = None, *, client: str | None = None, timeout: float = 30, wait_for_workers: bool = True, on_error: Literal['raise', 'return'] = 'raise', stimulus_id: str) → dict[str, Literal['OK', 'removed', 'timed out']][source]

Restart selected workers. Optionally wait for workers to return.

Workers without nannies are shut down, hoping an external deployment system will restart them. Therefore, if not using nannies and your deployment system does not automatically restart workers, restart will just shut down all workers, then time out!

After restart, all connected workers are new, regardless of whether TimeoutError was raised. Any workers that failed to shut down in time are removed, and may or may not shut down on their own in the future.

Parameters

workers:

List of worker addresses to restart. If omitted, restart all workers.

timeout:

How long to wait for workers to shut down and come back, if wait_for_workers is True, otherwise just how long to wait for workers to shut down. Raises asyncio.TimeoutError if this is exceeded.

wait_for_workers:

Whether to wait for all workers to reconnect, or just for them to shut down (default True). Use restart(wait_for_workers=False) combined with Client.wait_for_workers() for granular control over how many workers to wait for.

on_error:

If ‘raise’ (the default), raise if any nanny times out while restarting the worker. If ‘return’, return error messages.

Returns

{worker address: “OK”, “no nanny”, “timed out”, or error message}

async retire_workers(workers: list[str], *, close_workers: bool = False, remove: bool = True, stimulus_id: str | None = None) → dict[str, Any][source]

async retire_workers(*, names: list, close_workers: bool = False, remove: bool = True, stimulus_id: str | None = None) → dict[str, Any]

async retire_workers(*, close_workers: bool = False, remove: bool = True, stimulus_id: str | None = None, memory_ratio: int | float | None = None, n: int | None = None, key: Callable[[WorkerState], Hashable] | bytes | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') → dict[str, Any]

Gracefully retire workers from cluster. Any key that is in memory exclusively on the retired workers is replicated somewhere else.

Parameters

workers: list[str] (optional)

List of worker addresses to retire.

names: list (optional)

List of worker names to retire. Mutually exclusive with workers. If neither workers nor names are provided, we call workers_to_close which finds a good set.

close_workers: bool (defaults to False)

Whether to actually close the worker explicitly from here. Otherwise, we expect some external job scheduler to finish off the worker.

remove: bool (defaults to True)

Whether to remove the worker metadata immediately or else wait for the worker to contact us.

If close_workers=False and remove=False, this method just flushes the tasks in memory out of the workers and then returns. If close_workers=True and remove=False, this method will return while the workers are still in the cluster, although they won’t accept new tasks. If close_workers=False or for whatever reason a worker doesn’t accept the close command, it will be left permanently unable to accept new tasks and it is expected to be closed in some other way.

**kwargs: dict

Extra options to pass to workers_to_close to determine which workers we should drop. Only accepted if workers and names are omitted.

Returns

Dictionary mapping worker ID/address to dictionary of information about that worker, for each retired worker.

If there are keys that exist in memory only on the workers being retired and it was impossible to replicate them somewhere else (e.g. because there aren’t any other running workers), the workers holding such keys won’t be retired and won’t appear in the returned dict.

run_function(comm: distributed.comm.core.Comm, function: collections.abc.Callable, args: tuple = (), kwargs: dict | None = None, wait: bool = True) → Any[source]

Run a function within this process

async scatter(data: dict, workers: collections.abc.Iterable | None, client: str, broadcast: bool = False, timeout: float = 2) → list[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]][source]

Send data out to workers

send_all(client_msgs: dict[str, list[dict[str, Any]]], worker_msgs: dict[str, list[dict[str, Any]]]) → None[source]

Send messages to client and workers

send_task_to_worker(worker: str, ts: distributed.scheduler.TaskState) → None[source]

Send a single computational task to a worker

async start_unsafe() → Self[source]

Clear out old state and restart all running coroutines

stimulus_cancel(keys: collections.abc.Collection[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]]], client: str, force: bool, reason: str, msg: str) → None[source]

Stop execution on a list of keys

stimulus_queue_slots_maybe_opened(*, stimulus_id: str) → None[source]

Respond to an event which may have opened spots on worker threadpools

Selects the appropriate number of tasks from the front of the queue according to the total number of task slots available on workers (potentially 0), and transitions them to processing.

Notes

Other transitions related to this stimulus should be fully processed beforehand, so any tasks that became runnable are already in processing. Otherwise, overproduction can occur if queued tasks get scheduled before downstream tasks.

Must be called after check_idle_saturated; i.e. idle_task_count must be up to date.
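The slot-filling step can be sketched as a simple bounded pop from the queue. The function name is hypothetical; the real method also runs the popped tasks through the transition machinery.

```python
from collections import deque


def schedule_from_queue(queued, open_slots):
    """Hypothetical sketch: pop at most open_slots tasks from the front
    of the queue, to be transitioned to processing."""
    to_process = []
    while queued and len(to_process) < open_slots:
        to_process.append(queued.popleft())
    return to_process
```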

stimulus_task_erred(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, exception: Any, stimulus_id: str, traceback: Any, run_id: str, **kwargs: Any) → tuple[dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]][source]

Mark that a task has erred on a particular worker

stimulus_task_finished(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], worker: str, stimulus_id: str, run_id: int, **kwargs: Any) → tuple[dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], dict[str, list[dict[str, typing.Any]]], dict[str, list[dict[str, typing.Any]]]][source]

Mark that a task has finished execution on a particular worker

transition(key: Union[str, int, float, tuple[ForwardRef('Key'), ...]], finish: Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten'], stimulus_id: str, **kwargs: Any) → dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']][source]

Transition a key from its current state to the finish state

Returns

Dictionary of recommendations for future transitions

See also

Scheduler.transitions

transitive version of this function

Examples

>>> self.transition('x', 'waiting')
{'x': 'processing'}

transitions(recommendations: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], typing.Literal['released', 'waiting', 'no-worker', 'queued', 'processing', 'memory', 'erred', 'forgotten']], stimulus_id: str) → None[source]

Process transitions until none are left

This includes feedback from previous transitions and continues until we reach a steady state
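The feedback loop can be sketched as a worklist iteration. This is a simplified model, not the scheduler's code: transition here is any callable returning a dict of further recommendations.

```python
def run_transitions(recommendations, transition):
    """Hypothetical sketch of the fixed-point loop: apply each recommended
    transition; feed any new recommendations it returns back into the
    worklist until none remain.

    transition(key, finish) returns a dict of further recommendations.
    """
    while recommendations:
        key, finish = recommendations.popitem()
        new = transition(key, finish)
        # a later recommendation for the same key overrides the earlier one
        recommendations.update(new)
```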

async unregister_nanny_plugin(comm: None, name: str) → dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][source]

Unregisters a nanny plugin

async unregister_scheduler_plugin(name: str) → None[source]

Unregister a plugin on the scheduler.

async unregister_worker_plugin(comm: None, name: str) → dict[str, distributed.core.ErrorMessage | distributed.core.OKMessage][source]

Unregisters a worker plugin

update_data(*, who_has: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], list[str]], nbytes: dict[typing.Union[str, int, float, tuple[typing.Union[str, int, float, tuple[ForwardRef('Key'), ...]], ...]], int], client: str | None = None) → None[source]

Learn that new data has entered the network from an external source

worker_send(worker: str, msg: dict[str, Any]) → None[source]

Send message to worker

This also handles connection failures by adding a callback to remove the worker on the next cycle.

workers_list(workers: collections.abc.Iterable[str] | None) → list[str][source]

List of qualifying workers

Takes a list of worker addresses or hostnames. Returns a list of all worker addresses that match
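A rough sketch of this matching, assuming an entry is either a full address or a hostname fragment matched against every known address. The substring match is a simplification; the real method parses host and port out of each address.

```python
def workers_list_sketch(workers, known_addresses):
    """Hypothetical sketch: None means all workers; otherwise each entry
    matches either a full address or, as a hostname, every address on
    that host."""
    if workers is None:
        return list(known_addresses)
    out = []
    for w in workers:
        if w in known_addresses:
            out.append(w)  # exact address match
        else:
            # treat it as a hostname and collect every matching address
            out.extend(a for a in known_addresses if w in a)
    return out
```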

workers_to_close(memory_ratio: int | float | None = None, n: int | None = None, key: collections.abc.Callable[[distributed.scheduler.WorkerState], collections.abc.Hashable] | bytes | None = None, minimum: int | None = None, target: int | None = None, attribute: str = 'address') → list[str][source]

Find workers that we can close with low cost

This returns a list of workers that are good candidates to retire. These workers are not running anything and are storing relatively little data relative to their peers. If all workers are idle then we still maintain enough workers to have enough RAM to store our data, with a comfortable buffer.

This is for use with systems like distributed.deploy.adaptive.

Parameters

memory_ratio : Number

Amount of extra space we want to have for our stored data. Defaults to 2, or that we want to have twice as much memory as we currently have data.

n : int

Number of workers to close

minimum : int

Minimum number of workers to keep around

key : Callable(WorkerState)

An optional callable mapping a WorkerState object to a group affiliation. Groups will be closed together. This is useful when closing workers must be done collectively, such as by hostname.

target : int

Target number of workers to have after we close

attribute : str

The attribute of the WorkerState object to return, like “address” or “name”. Defaults to “address”.

Returns

to_close: list of worker addresses that are OK to close

Examples

>>> scheduler.workers_to_close()
['tcp://192.168.0.1:1234', 'tcp://192.168.0.2:1234']

Group workers by hostname prior to closing

>>> scheduler.workers_to_close(key=lambda ws: ws.host)
['tcp://192.168.0.1:1234', 'tcp://192.168.0.1:4567']

Remove two workers

>>> scheduler.workers_to_close(n=2)

Keep enough workers to have twice as much memory as we need.

>>> scheduler.workers_to_close(memory_ratio=2)