Customize Initialization — Dask documentation


Customize Initialization

Often we want to run custom code when we start up or tear down a scheduler or worker. We might do this manually with functions like Client.run or Client.run_on_scheduler, but this is error-prone and difficult to automate.

To resolve this, Dask includes a few mechanisms to run arbitrary code around the lifecycle of a Scheduler, Worker, Nanny, or Client.

Preload Scripts

Both dask-scheduler and dask-worker support a --preload option that allows custom initialization of each scheduler/worker respectively. A module or Python file passed as a --preload value is guaranteed to be imported before establishing any connection. A dask_setup(service) function is called if found, with a Scheduler, Worker, Nanny, or Client instance as the argument. As the service stops, dask_teardown(service) is called if present.
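As a sketch, a minimal preload file might look like the following (the filename and print statements here are illustrative, not from the Dask codebase):

worker-setup.py

def dask_setup(worker):
    # Runs after the module is imported, before the worker
    # accepts any connections
    print("Starting:", worker)


def dask_teardown(worker):
    # Runs as the worker shuts down
    print("Stopping:", worker)

We could then launch a worker with dask-worker <scheduler-address> --preload worker-setup.py.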

To support additional configuration, a single --preload module may register additional command-line arguments by exposing dask_setup as a Click command. This command will be used to parse additional arguments provided to dask-worker or dask-scheduler, and will be called before service initialization.

Example

As an example, consider the following file, which creates a scheduler plugin and registers it with the scheduler:

scheduler-setup.py

import click

from distributed.diagnostics.plugin import SchedulerPlugin

class MyPlugin(SchedulerPlugin):
    def __init__(self, print_count):
        self.print_count = print_count
        super().__init__()

    def add_worker(self, scheduler=None, worker=None, **kwargs):
        print("Added a new worker at:", worker)
        if self.print_count and scheduler is not None:
            print("Total workers:", len(scheduler.workers))


@click.command()
@click.option("--print-count/--no-print-count", default=False)
def dask_setup(scheduler, print_count):
    plugin = MyPlugin(print_count)
    scheduler.add_plugin(plugin)

We can then run this preload script by referring to its filename (or module name if it is on the path) when we start the scheduler:

dask-scheduler --preload scheduler-setup.py --print-count

Types

Preloads can be specified as any of the following forms:

- a path to a Python script, like /path/to/myfile.py
- the name of a module that is importable on the path, like my_module
- Python code as text, like "import os; os.environ['A'] = 'b'"
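For example, each of these forms can be passed to the same --preload flag:

dask-scheduler --preload /path/to/myfile.py
dask-scheduler --preload my_module
dask-scheduler --preload "import os; os.environ['A'] = 'b'"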

Configuration

Preloads can also be registered with configuration at the following values:

distributed:
  scheduler:
    preload:
      - "import os; os.environ['A'] = 'b'"   # use Python text
      - /path/to/myfile.py                   # or a filename
      - my_module                            # or a module name
    preload-argv:
      - []                                   # Pass optional keywords
      - ["--option", "value"]
      - []
  worker:
    preload: []
    preload-argv: []
  nanny:
    preload: []
    preload-argv: []
  client:
    preload: []
    preload-argv: []
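These values can also be set programmatically before a cluster starts. A minimal sketch using dask.config.set (the module name my_module is illustrative):

import dask

# Equivalent to setting distributed.worker.preload in the YAML above;
# this must run before the workers are created
dask.config.set({"distributed.worker.preload": ["my_module"]})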

Note

Because the dask-worker command needs to accept keywords for both the Worker and the Nanny (if a nanny is used), it has both a --preload and a --preload-nanny keyword. All extra keywords (like --print-count above) will be sent to the workers rather than the nanny. There is no way to specify extra keywords for the nanny preload scripts on the command line. We recommend using the more flexible configuration system if this is necessary.
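For instance, a nanny preload with extra arguments could be configured like this (my_nanny_module and --option are illustrative placeholders):

distributed:
  nanny:
    preload:
      - my_nanny_module
    preload-argv:
      - ["--option", "value"]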

Worker Lifecycle Plugins

You can also create a class with setup, teardown, and transition methods, and register that class with the scheduler to hand to every worker using the Client.register_worker_plugin method.


Client.register_worker_plugin(plugin: distributed.diagnostics.plugin.NannyPlugin | distributed.diagnostics.plugin.WorkerPlugin, name: str | None = None, nanny: bool | None = None)

Registers a lifecycle worker plugin for all current and future workers.

This registers a new object to handle setup, task state transitions and teardown for workers in this cluster. The plugin will instantiate itself on all currently connected workers. It will also be run on any worker that connects in the future.

The plugin may include methods setup, teardown, transition, and release_key. See the dask.distributed.WorkerPlugin class or the examples below for the interface and docstrings. It must be serializable with the pickle or cloudpickle modules.

If the plugin has a name attribute, or if the name= keyword is used, then that will control idempotency. If a plugin with that name has already been registered, then it will be removed and replaced by the new one.

For alternatives to plugins, you may also wish to look into preload scripts.

Parameters

plugin : WorkerPlugin or NannyPlugin

WorkerPlugin or NannyPlugin instance to register.

name : str, optional

A name for the plugin. Registering a plugin with the same name will have no effect. If plugin has no name attribute a random name is used.

nanny : bool, optional

Whether to register the plugin with workers or nannies.

Examples

>>> class MyPlugin(WorkerPlugin):
...     def __init__(self, *args, **kwargs):
...         pass  # the constructor is up to you
...     def setup(self, worker: dask.distributed.Worker):
...         pass
...     def teardown(self, worker: dask.distributed.Worker):
...         pass
...     def transition(self, key: str, start: str, finish: str, **kwargs):
...         pass
...     def release_key(self, key: str, state: str, cause: str | None, reason: None, report: bool):
...         pass

>>> plugin = MyPlugin(1, 2, 3)
>>> client.register_plugin(plugin)

You can access the plugin from within a task with the get_worker function:

>>> client.register_plugin(other_plugin, name='my-plugin')
>>> def f():
...     worker = get_worker()
...     plugin = worker.plugins['my-plugin']
...     return plugin.my_state

>>> future = client.run(f)
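The nanny= keyword above routes a plugin to nannies rather than workers. A minimal sketch, assuming a NannyPlugin subclass (the class name and print statement are illustrative):

from distributed.diagnostics.plugin import NannyPlugin

class MyNannyPlugin(NannyPlugin):
    def setup(self, nanny):
        # Runs in each nanny process, not in the workers it supervises
        print("Nanny plugin active on", nanny)

plugin = MyNannyPlugin()
client.register_worker_plugin(plugin, nanny=True)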