Python API
You can create a dask.distributed scheduler by importing and creating a Client with no arguments. This overrides whatever default was previously set.
from dask.distributed import Client
client = Client()
You can navigate to http://localhost:8787/status
to see the diagnostic dashboard if you have Bokeh installed.
Client
You can trivially set up a local cluster on your machine by instantiating a Dask Client with no arguments:
from dask.distributed import Client
client = Client()
This sets up a scheduler in your local process, along with a set of workers. The number of workers and the number of threads per worker are chosen based on the number of cores on your machine.
If you want to run workers in the same process, you can pass the processes=False keyword argument.
client = Client(processes=False)
This is sometimes preferable if you want to avoid inter-worker communication and your computations release the GIL. This is common when primarily using NumPy or Dask Array.
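As a minimal sketch of the in-process setup described above, the following runs a few tasks on a thread-based client. The function names square and run_in_process are hypothetical, and dashboard_address=None is passed only to keep the example free of a dashboard port; it assumes that extra keyword arguments to Client are forwarded to the LocalCluster it creates.

```python
from dask.distributed import Client


def square(x):
    """Toy task used only for this illustration."""
    return x * x


def run_in_process():
    # processes=False keeps the scheduler and workers in this process (threads only).
    # dashboard_address=None is forwarded to LocalCluster and disables the dashboard.
    with Client(processes=False, dashboard_address=None) as client:
        futures = client.map(square, range(5))  # one future per input
        return client.gather(futures)           # block until all results arrive


if __name__ == "__main__":
    print(run_in_process())
```

Because everything lives in one process, no data is serialized between workers, which is where the benefit for GIL-releasing workloads comes from.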
LocalCluster
The Client()
call described above is shorthand for creating a LocalCluster and then passing that to your client.
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
This is equivalent, but somewhat more explicit.
You may want to look at the keyword arguments available on LocalCluster
to understand the options available to you on handling the mixture of threads and processes, like specifying explicit ports, and so on.
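For instance, a sketch of choosing the mixture of workers and threads explicitly might look like the following. The helper name describe_cluster is hypothetical, and the specific values (two workers, two threads each, in-process, no dashboard) are arbitrary choices for illustration.

```python
from dask.distributed import Client, LocalCluster


def describe_cluster():
    # Two thread-based workers with two threads each; the dashboard is disabled
    # so the example does not depend on a free port.
    with LocalCluster(
        n_workers=2,
        threads_per_worker=2,
        processes=False,
        dashboard_address=None,
    ) as cluster:
        with Client(cluster) as client:
            client.wait_for_workers(2)    # make sure both workers have registered
            nthreads = client.nthreads()  # mapping of worker address -> thread count
            return {"workers": len(nthreads), "threads": sum(nthreads.values())}


if __name__ == "__main__":
    print(describe_cluster())
```

Using the cluster and client as context managers closes both when the block exits.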
To create a local cluster with all workers running in dedicated subprocesses, dask.distributed also offers the experimental SubprocessCluster.
Cluster manager features
Instantiating a cluster manager class like LocalCluster and then passing it to the Client is a common pattern. Cluster managers also provide useful utilities to help you understand what is going on.
For example, you can retrieve the dashboard URL.
cluster.dashboard_link
'http://127.0.0.1:8787/status'
You can retrieve logs from cluster components.
cluster.get_logs()
{'Cluster': '', 'Scheduler': "distributed.scheduler - INFO - Clear task state\ndistributed.scheduler - INFO - S...
If you are using a cluster manager that supports scaling, you can modify the number of workers manually or automatically based on workload.
cluster.scale(10)  # Sets the number of workers to 10
cluster.adapt(minimum=1, maximum=10)  # Lets the cluster scale automatically between 1 and 10 workers based on load
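Scaling requests return before the new workers have necessarily started, so a common follow-up is to wait for them. The sketch below assumes a thread-based LocalCluster with the dashboard disabled; the helper name scale_to and the 30-second timeout are hypothetical choices.

```python
from dask.distributed import Client, LocalCluster


def scale_to(n):
    """Scale a small local cluster to n workers and report how many registered."""
    with LocalCluster(
        n_workers=1,
        threads_per_worker=1,
        processes=False,
        dashboard_address=None,
    ) as cluster:
        with Client(cluster) as client:
            cluster.scale(n)                         # request n workers
            client.wait_for_workers(n, timeout=30)   # block until they are connected
            return len(client.nthreads())            # one entry per connected worker


if __name__ == "__main__":
    print(scale_to(3))
```

With cluster.adapt(minimum=1, maximum=10) the same cluster would instead add and remove workers on its own as the workload changes.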
Reference
class distributed.deploy.local.LocalCluster(name=None, n_workers=None, threads_per_worker=None, processes=None, loop=None, start=None, host=None, ip=None, scheduler_port=0, silence_logs=30, dashboard_address=':8787', worker_dashboard_address=None, diagnostics_port=None, services=None, worker_services=None, service_kwargs=None, asynchronous=False, security=None, protocol=None, blocked_handlers=None, interface=None, worker_class=None, scheduler_kwargs=None, scheduler_sync_interval=1, **worker_kwargs)
Create local Scheduler and Workers
This creates a “cluster” of a scheduler and workers running on the local machine.
Parameters
n_workers: int
Number of workers to start
memory_limit: str, float, int, or None, default “auto”
Sets the memory limit per worker.
Notes regarding argument data type:
- If None or 0, no limit is applied.
- If “auto”, the total system memory is split evenly between the workers.
- If a float, that fraction of the system memory is used per worker.
- If a string giving a number of bytes (like "1GiB"), that amount is used per worker.
- If an int, that number of bytes is used per worker.
Note that the limit will only be enforced when processes=True, and only on a best-effort basis; it’s still possible for workers to exceed this limit.
processes: bool
Whether to use processes (True) or threads (False). Defaults to True, unless worker_class=Worker, in which case it defaults to False.
threads_per_worker: int
Number of threads per worker
scheduler_port: int
Port of the scheduler. Use 0 to choose a random port (default). 8786 is a common choice.
silence_logs: logging level
Level of logs to print out to stdout. logging.WARN by default. Use a falsey value like False or None for no change.
host: string
Host address on which the scheduler will listen. Defaults to localhost only.
ip: string
Deprecated. See host above.
dashboard_address: str
Address on which to listen for the Bokeh diagnostics server, like ‘localhost:8787’ or ‘0.0.0.0:8787’. Defaults to ‘:8787’. Set to None to disable the dashboard. Use ‘:0’ for a random port. When specifying only a port like ‘:8787’, the dashboard will bind to the given interface from the host parameter. If host is empty, binding will occur on all interfaces (‘0.0.0.0’). To avoid firewall issues when deploying locally, set host to ‘localhost’.
worker_dashboard_address: str
Address on which to listen for the Bokeh worker diagnostics server like ‘localhost:8787’ or ‘0.0.0.0:8787’. Defaults to None which disables the dashboard. Use ‘:0’ for a random port.
diagnostics_port: int
Deprecated. See dashboard_address.
asynchronous: bool (False by default)
Set to True if using this cluster within async/await functions or within Tornado gen.coroutines. This should remain False for normal use.
blocked_handlers: List[str]
A list of strings specifying a blocklist of handlers to disallow on the Scheduler, like ['feed', 'run_function']
service_kwargs: Dict[str, Dict]
Extra keywords to hand to the running services
security: Security or bool, optional
Configures communication security in this cluster. Can be a security object, or True. If True, temporary self-signed credentials will be created automatically.
protocol: str (optional)
Protocol to use, like tcp://, tls://, or inproc://. This defaults to a sensible choice given other keyword arguments like processes and security.
interface: str (optional)
Network interface to use. Defaults to lo/localhost
worker_class: Worker
Worker class used to instantiate workers from. Defaults to Worker if processes=False and Nanny if processes=True or omitted.
**worker_kwargs:
Extra worker arguments. Any additional keyword arguments will be passed to the Worker class constructor.
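The memory_limit rules listed above can be summarized in a small standalone sketch. This is not Dask's implementation: per_worker_limit and its unit table are hypothetical, the total memory is passed in explicitly rather than detected, and only a handful of unit suffixes are recognized.

```python
# Hypothetical unit table for this sketch; longer suffixes come first so that
# "GiB" is matched before the bare "B".
_UNITS = {
    "GiB": 2**30, "MiB": 2**20, "KiB": 2**10,
    "GB": 10**9, "MB": 10**6, "kB": 10**3,
    "B": 1,
}


def per_worker_limit(memory_limit, total_memory, n_workers):
    """Return the per-worker limit in bytes, or None when no limit applies."""
    if memory_limit is None or memory_limit == 0:
        return None                              # no limit
    if memory_limit == "auto":
        return total_memory // n_workers         # split system memory evenly
    if isinstance(memory_limit, float):
        return int(memory_limit * total_memory)  # fraction of system memory
    if isinstance(memory_limit, str):
        for unit, factor in _UNITS.items():
            if memory_limit.endswith(unit):
                return int(float(memory_limit[: -len(unit)]) * factor)
        raise ValueError(f"unrecognized memory string: {memory_limit!r}")
    return int(memory_limit)                     # plain number of bytes


if __name__ == "__main__":
    total = 16 * 2**30  # pretend the machine has 16 GiB
    for value in (None, "auto", 0.25, "1GiB", 2_000_000):
        print(repr(value), "->", per_worker_limit(value, total, n_workers=4))
```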
Examples
cluster = LocalCluster() # Create a local cluster
cluster
LocalCluster("127.0.0.1:8786", workers=8, threads=8)
c = Client(cluster) # connect to local cluster
Scale the cluster to three workers
cluster.scale(3)
Pass extra keyword arguments to Bokeh
LocalCluster(service_kwargs={'dashboard': {'prefix': '/foo'}})
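The asynchronous parameter described in the reference above can be illustrated with a short sketch that creates the cluster inside a coroutine. It assumes a single thread-based worker with the dashboard disabled; the coroutine name main is arbitrary.

```python
import asyncio

from dask.distributed import Client, LocalCluster


async def main():
    # asynchronous=True makes the cluster and client usable with async/await.
    async with LocalCluster(
        n_workers=1,
        processes=False,
        dashboard_address=None,
        asynchronous=True,
    ) as cluster:
        async with Client(cluster, asynchronous=True) as client:
            future = client.submit(sum, [1, 2, 3])  # schedule the task
            return await future                     # await its result


if __name__ == "__main__":
    print(asyncio.run(main()))
```

For ordinary scripts and notebooks, leaving asynchronous=False keeps the blocking API shown in the earlier examples.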