Rendezvous — PyTorch 2.7 documentation

In the context of Torch Distributed Elastic we use the term rendezvous to refer to a particular functionality that combines a distributed synchronization primitive with peer discovery.

It is used by Torch Distributed Elastic to gather participants of a training job (i.e. nodes) such that they all agree on the same list of participants and everyone’s roles, as well as make a consistent collective decision on when training can begin/resume.

Torch Distributed Elastic rendezvous provides the following critical functionalities:

Barrier:

Nodes performing rendezvous will all block until the rendezvous is considered complete - this happens when at least min total number of nodes have joined the rendezvous barrier (for the same job). This also implies the barrier is not necessarily of fixed size.

There’s an additional small waiting time after reaching min number of nodes - this is used to ensure the rendezvous is not completed “too quickly” (which could potentially exclude additional nodes attempting to join at approximately the same time).

If max number of nodes is gathered at the barrier, the rendezvous is completed immediately.

There’s also an overall timeout which causes the rendezvous to fail if min number of nodes is never reached - this is meant to be a simple fail-safe to help release partially allocated job resources, in case there’s a problem with the resource manager, and is meant to be interpreted as non-retryable.
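The completion rules above can be summarized as a small decision function. This is only an illustrative sketch: the function name `barrier_status`, its arguments, and the returned labels are hypothetical and are not part of the Torch Distributed Elastic API.

```python
from typing import Optional


def barrier_status(
    num_joined: int,
    min_nodes: int,
    max_nodes: int,
    elapsed: float,
    since_min_reached: Optional[float],
    join_timeout: float = 600.0,
    last_call_timeout: float = 30.0,
) -> str:
    """Classify the state of a hypothetical rendezvous barrier.

    elapsed: seconds since the rendezvous started.
    since_min_reached: seconds since min_nodes was first reached, or None.
    """
    # Reaching max_nodes completes the rendezvous immediately.
    if num_joined >= max_nodes:
        return "complete"
    # Once min_nodes is reached, wait a little longer for late joiners.
    if num_joined >= min_nodes:
        if since_min_reached is not None and since_min_reached >= last_call_timeout:
            return "complete"
        return "last_call"
    # min_nodes was never reached within the overall timeout.
    if elapsed >= join_timeout:
        return "failed"
    return "waiting"
```

The order of the checks mirrors the text: the max-size shortcut first, then the short extra wait, then the overall fail-safe timeout.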

Exclusivity:

A simple distributed barrier would not be sufficient, as we also need to ensure that only one group of nodes exists at any given time (for a given job). In other words, new nodes (i.e. joining late) should not be able to form a parallel independent group of workers for the same job.

Torch Distributed Elastic rendezvous ensures that if a group of nodes has already completed a rendezvous (and hence might already be training), then additional “late” nodes attempting to rendezvous will only announce themselves as waiting, and will have to wait until the (previously completed) existing rendezvous is destroyed first.
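A toy model may help picture the exclusivity rule. The class below is a hypothetical in-memory sketch (the name `ExclusiveRendezvous` and its methods are invented for illustration); it does not reflect how the real handler stores its state.

```python
from typing import Set


class ExclusiveRendezvous:
    """Toy model: at most one active group per job; late nodes wait."""

    def __init__(self) -> None:
        self.participants: Set[str] = set()
        self.waiting: Set[str] = set()
        self.complete = False

    def join(self, node: str) -> str:
        if self.complete:
            # A group already exists, so the node only announces itself.
            self.waiting.add(node)
            return "waiting"
        self.waiting.discard(node)
        self.participants.add(node)
        return "joined"

    def mark_complete(self) -> None:
        self.complete = True

    def destroy(self) -> None:
        """Tear down the current group so a new rendezvous can form."""
        self.participants.clear()
        self.complete = False
```

In this sketch a late node can only become a participant after `destroy()` has been called on the completed rendezvous.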

Consistency:

When a rendezvous is completed, all its members will agree on the job membership and everyone’s role in it. This role is represented using an integer, called rank, that is in the range from 0 to world size - 1.

Note that ranks are not stable, in the sense that the same node can be assigned a different rank in the next (re-)rendezvous.
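The following sketch shows why a rank should not be treated as a stable node identity. It assumes, purely for illustration, that ranks are handed out by sorting participant names; the actual assignment policy is not described here, and `assign_ranks` is a hypothetical helper.

```python
def assign_ranks(participants):
    """Map each participant to a rank in [0, world_size) by sorted order."""
    return {node: rank for rank, node in enumerate(sorted(participants))}


# First rendezvous: two nodes.
first = assign_ranks(["node-b", "node-c"])
# Re-rendezvous after a third node joins: existing nodes get new ranks.
second = assign_ranks(["node-a", "node-b", "node-c"])
```

Here `node-b` holds rank 0 in the first round and rank 1 in the second, even though the node itself did not change.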

Fault-tolerance:

Torch Distributed Elastic rendezvous is designed to tolerate node failures during the rendezvous process. Should a process crash (or lose network connectivity, etc), between joining the rendezvous and it being completed, then a re-rendezvous with remaining healthy nodes will happen automatically.

A node can also fail after it has completed (or has been observed by other nodes to have completed) the rendezvous - this scenario will be handled by the Torch Distributed Elastic train_loop instead (where it will also trigger a re-rendezvous).

Shared key-value store:

When the rendezvous is completed, a shared key-value store is created and returned. This store implements a torch.distributed.Store API (see distributed communication docs).

This store is only shared by the members of the completed rendezvous. It is intended to be used by Torch Distributed Elastic to exchange information necessary to initialize job control and data-planes.

Waiting workers and rendezvous closing:

The Torch Distributed Elastic rendezvous handler object provides additional functionality that is technically not part of the rendezvous process:

  1. Querying how many workers arrived late at the barrier and can participate in the next rendezvous.
  2. Setting the rendezvous closed to signal all nodes not to participate in the next rendezvous.

DynamicRendezvousHandler:

Torch Distributed Elastic comes with the DynamicRendezvousHandler class that implements the rendezvous mechanism described above. It is a backend-agnostic type that expects a particular RendezvousBackend instance to be specified during construction.

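Torch distributed users can either implement their own backend type or use one of the following implementations that come with PyTorch:

  1. C10dRendezvousBackend: a rendezvous backend backed by a C10d store (by default a TCPStore).
  2. EtcdRendezvousBackend: an etcd-based rendezvous backend.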

Below is a state diagram describing how rendezvous works.

[Figure: rendezvous state diagram (etcd_rdzv_diagram.png)]

Registry

class torch.distributed.elastic.rendezvous.RendezvousParameters(backend, endpoint, run_id, min_nodes, max_nodes, local_addr=None, **kwargs)

Hold the parameters to construct a RendezvousHandler.

Parameters

get(key, default=None)

Return the value for key if key exists, else default.

Return type

Any

get_as_bool(key, default=None)

Return the value for key as a bool.

Return type

Optional[bool]

get_as_int(key, default=None)

Return the value for key as an int.

Return type

Optional[int]
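The typed accessors can be pictured as thin wrappers that parse string-valued configuration. The class below is a hypothetical stand-in (`ParamsSketch` is not the real class), and the accepted spellings for boolean values are an assumption made for this sketch.

```python
from typing import Any, Dict, Optional

# Assumed spellings; the real class may accept a different set.
_TRUE = {"1", "true", "t", "yes", "y"}
_FALSE = {"0", "false", "f", "no", "n"}


class ParamsSketch:
    """Hypothetical holder for extra key/value rendezvous settings."""

    def __init__(self, **kwargs: Any) -> None:
        self.config: Dict[str, Any] = dict(kwargs)

    def get(self, key: str, default: Any = None) -> Any:
        return self.config.get(key, default)

    def get_as_bool(self, key: str, default: Optional[bool] = None) -> Optional[bool]:
        value = self.get(key, default)
        if value is None or isinstance(value, bool):
            return value
        text = str(value).strip().lower()
        if text in _TRUE:
            return True
        if text in _FALSE:
            return False
        raise ValueError(f"'{key}' does not hold a valid boolean value: {value!r}")

    def get_as_int(self, key: str, default: Optional[int] = None) -> Optional[int]:
        value = self.get(key, default)
        if value is None:
            return None
        return int(value)  # raises ValueError for non-numeric strings
```

Values passed on a command line usually arrive as strings, which is why accessors of this kind are convenient.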

class torch.distributed.elastic.rendezvous.RendezvousHandlerRegistry

Represent a registry of RendezvousHandler backends.

Handler

class torch.distributed.elastic.rendezvous.RendezvousHandler

Main rendezvous interface.

Note

Distributed Torch users normally do not need to implement their own RendezvousHandler. An implementation based on C10d Store is already provided, and is recommended for most users.

abstract get_backend()

Return the name of the rendezvous backend.

Return type

str

abstract get_run_id()

Return the run id of the rendezvous.

The run id is a user-defined id that uniquely identifies an instance of a distributed application. It typically maps to a job id and is used to allow nodes to join the correct distributed application.

Return type

str

abstract is_closed()

Check whether the rendezvous has been closed.

A closed rendezvous means all future attempts to re-rendezvous within the same job will fail.

is_closed() and set_closed() have semantics of eventual propagation and should not be used for synchronization. The intention is that if at least one node decides the job is finished, it will close the rendezvous, and other nodes will soon observe this and stop running as well.

Return type

bool

abstract next_rendezvous()

Main entry-point into the rendezvous barrier.

Blocks until the rendezvous is complete and the current process is included in the formed worker group, or a timeout occurs, or the rendezvous was marked closed.

Returns

Instance of RendezvousInfo.

Raises

Return type

RendezvousInfo

abstract num_nodes_waiting()

Return the number of nodes who arrived late at the rendezvous barrier, hence were not included in the current worker group.

Callers should periodically call this method to check whether new nodes are waiting to join the job and if so admit them by calling next_rendezvous() (re-rendezvous).

Return type

int
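The polling pattern described above might look like the following sketch. `FakeHandler` is a hypothetical stand-in that only mimics the two method names used here, so the loop can run without a real rendezvous; it is not the actual handler class.

```python
class FakeHandler:
    """Hypothetical stand-in for a rendezvous handler."""

    def __init__(self, waiting_schedule):
        # Number of waiting nodes to report on each successive poll.
        self._schedule = list(waiting_schedule)
        self.rendezvous_count = 0

    def num_nodes_waiting(self) -> int:
        return self._schedule.pop(0) if self._schedule else 0

    def next_rendezvous(self) -> None:
        self.rendezvous_count += 1


def monitor(handler, polls: int) -> int:
    """Poll the handler and re-rendezvous whenever nodes are waiting."""
    for _ in range(polls):
        if handler.num_nodes_waiting() > 0:
            # Admit the late nodes by forming a new worker group.
            handler.next_rendezvous()
    return handler.rendezvous_count
```

A real agent would sleep between polls and restart its workers around the re-rendezvous; both are left out to keep the sketch short.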

abstract set_closed()

Mark the rendezvous as closed.

abstract shutdown()

Close all resources that were open for the rendezvous.

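Example:

    rdzv_handler = ...
    try:
        # next_rendezvous() returns a RendezvousInfo instance
        rdzv_info = rdzv_handler.next_rendezvous()
        store, rank, world_size = rdzv_info.store, rdzv_info.rank, rdzv_info.world_size
    finally:
        rdzv_handler.shutdown()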

Return type

bool

property use_agent_store: bool

Indicates that the store reference returned by next_rendezvous() can be shared with user applications and will be available during the application lifecycle.

The rendezvous handler implementation shares the store details as an instance of RendezvousStoreInfo. By convention, applications use the MASTER_ADDR/MASTER_PORT environment variables to look up the store.

Dataclasses

class torch.distributed.elastic.rendezvous.RendezvousInfo(store, rank, world_size, bootstrap_store_info)

Holds the information about the rendezvous.

class torch.distributed.elastic.rendezvous.api.RendezvousStoreInfo(master_addr, master_port)

Store address and port that can be used to bootstrap trainer distributed comms.

static build(rank, store)

Factory method that finds an unused port on the rank 0 host and shares the addr/port info with all ranks.

If master_addr/master_port is already known (useful when sharing an existing TCP store server), use the constructor instead.

Parameters

Return type

RendezvousStoreInfo
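Finding an unused port and sharing it can be sketched as follows. This is not the library's implementation: `find_free_port` and `build_store_info` are hypothetical helpers, a plain dict stands in for the shared store, and the key names are assumptions.

```python
import socket


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]


def build_store_info(rank: int, store: dict) -> tuple:
    """Rank 0 publishes its address and port; every rank reads them back."""
    if rank == 0:
        store["MASTER_ADDR"] = "127.0.0.1"
        store["MASTER_PORT"] = str(find_free_port())
    # With a real shared store, other ranks would block here until
    # rank 0 has published the values.
    return store["MASTER_ADDR"], int(store["MASTER_PORT"])
```

Note that a port found this way can in principle be taken by another process before it is reused, so callers should be prepared for a bind failure.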

Exceptions

class torch.distributed.elastic.rendezvous.api.RendezvousError

Represents the base type for rendezvous errors.

class torch.distributed.elastic.rendezvous.api.RendezvousClosedError

Raised when a rendezvous is closed.

class torch.distributed.elastic.rendezvous.api.RendezvousTimeoutError

Raised when a rendezvous did not complete on time.

class torch.distributed.elastic.rendezvous.api.RendezvousConnectionError

Raised when the connection to a rendezvous backend has failed.

class torch.distributed.elastic.rendezvous.api.RendezvousStateError

Raised when the state of a rendezvous is corrupt.

class torch.distributed.elastic.rendezvous.api.RendezvousGracefulExitError

Raised when a node was not included in the rendezvous and gracefully exits.

The exception is a mechanism to exit the stack; it does not indicate a failure.

Implementations

Dynamic Rendezvous

torch.distributed.elastic.rendezvous.dynamic_rendezvous.create_handler(store, backend, params)

Create a new DynamicRendezvousHandler from the specified parameters.

Parameters

Return type

DynamicRendezvousHandler

| Parameter | Description |
| --- | --- |
| join_timeout | The total time, in seconds, within which the rendezvous is expected to complete. Defaults to 600 seconds. |
| last_call_timeout | An additional wait amount, in seconds, before completing the rendezvous once the minimum number of nodes has been reached. Defaults to 30 seconds. |
| close_timeout | The time, in seconds, within which the rendezvous is expected to close after a call to RendezvousHandler.set_closed() or RendezvousHandler.shutdown(). Defaults to 30 seconds. |
| heartbeat | The time, in seconds, within which a keep-alive heartbeat is expected to complete. |

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.DynamicRendezvousHandler

Represent a handler that sets up a rendezvous among a set of nodes.

classmethod from_backend(run_id, store, backend, min_nodes, max_nodes, local_addr=None, timeout=None, keep_alive_interval=5, keep_alive_max_attempt=3)

Create a new DynamicRendezvousHandler.

Parameters

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousBackend

Represent a backend that holds the rendezvous state.

abstract get_state()

Get the rendezvous state.

Returns

A tuple of the encoded rendezvous state and its fencing token, or None if no state is found in the backend.

Raises

Return type

Optional[tuple[bytes, Any]]

abstract property name: str

Get the name of the backend.

abstract set_state(state, token=None)

Set the rendezvous state.

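The new rendezvous state is set conditionally:

  1. If the specified token matches the fencing token stored in the backend, the state will be updated. The new state will be returned to the caller along with its fencing token.
  2. If the specified token does not match the fencing token stored in the backend, the state won’t be updated; instead the existing state along with its fencing token will be returned to the caller.
  3. If the specified token is None, the new state will be set only if there is no existing state in the backend. Either the new state or the existing state along with its fencing token will be returned to the caller.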

Parameters

Returns

A tuple of the serialized rendezvous state, its fencing token, and a boolean value indicating whether our set attempt succeeded.

Raises

Return type

Optional[tuple[bytes, Any, bool]]
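One way to read the conditional set is as a compare-and-set keyed on the fencing token. The sketch below assumes that a write succeeds only when the caller's token matches the stored one, or when no token is given and no state exists yet. `InMemoryBackend` is a hypothetical single-process toy, not one of the shipped backends, and it uses a simple counter as the token.

```python
from typing import Any, Optional, Tuple


class InMemoryBackend:
    """Toy backend holding one state blob guarded by a fencing token."""

    def __init__(self) -> None:
        self._state: Optional[bytes] = None
        self._token = 0  # incremented on every successful write

    def get_state(self) -> Optional[Tuple[bytes, Any]]:
        if self._state is None:
            return None
        return self._state, self._token

    def set_state(
        self, state: bytes, token: Optional[Any] = None
    ) -> Optional[Tuple[bytes, Any, bool]]:
        if token is None:
            # No token: only allowed to create the very first state.
            can_write = self._state is None
        else:
            # Token given: must match what is currently stored.
            can_write = self._state is not None and token == self._token
        if not can_write:
            if self._state is None:
                return None
            # Hand back the existing state so the caller can retry.
            return self._state, self._token, False
        self._state = state
        self._token += 1
        return self._state, self._token, True
```

A caller that loses the race receives the current state and token, can recompute its update on top of them, and then try again. The toy is not thread-safe; a real backend would rely on an atomic operation of the underlying store.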

class torch.distributed.elastic.rendezvous.dynamic_rendezvous.RendezvousTimeout(join=None, last_call=None, close=None, heartbeat=None)

Hold the timeout configuration of a rendezvous.

Parameters

property close: timedelta

Get the close timeout.

property heartbeat: timedelta

Get the keep-alive heartbeat timeout.

property join: timedelta

Get the join timeout.

property last_call: timedelta

Get the last call timeout.
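A timeout configuration of this shape can be modelled with `timedelta` fields. The dataclass below is a hypothetical sketch (`TimeoutsSketch` is not the real class); the join, last call, and close defaults follow the values documented for create_handler, while the heartbeat default of 5 seconds is an assumption.

```python
from dataclasses import dataclass
from datetime import timedelta


@dataclass(frozen=True)
class TimeoutsSketch:
    """Hypothetical container for rendezvous timeouts."""

    join: timedelta = timedelta(seconds=600)
    last_call: timedelta = timedelta(seconds=30)
    close: timedelta = timedelta(seconds=30)
    heartbeat: timedelta = timedelta(seconds=5)  # assumed default

    def __post_init__(self) -> None:
        # Reject zero or negative durations early.
        for name in ("join", "last_call", "close", "heartbeat"):
            if getattr(self, name) <= timedelta(0):
                raise ValueError(f"The {name} timeout must be positive.")


# Override only the join timeout; the other fields keep their defaults.
timeouts = TimeoutsSketch(join=timedelta(minutes=5))
```

Using `timedelta` instead of bare numbers keeps the unit explicit at every call site.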

C10d Backend

torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.create_backend(params)

Create a new C10dRendezvousBackend from the specified parameters.

| Parameter | Description |
| --- | --- |
| store_type | The type of the C10d store. The currently supported types are “tcp” and “file”, which correspond to torch.distributed.TCPStore and torch.distributed.FileStore, respectively. Defaults to “tcp”. |
| read_timeout | The read timeout, in seconds, for store operations. Defaults to 60 seconds. Note this only applies to torch.distributed.TCPStore. It is not relevant to torch.distributed.FileStore, which does not take a timeout as a parameter. |
| is_host | A boolean value indicating whether this backend instance will host the C10d store. If not specified, it will be inferred heuristically by matching the hostname or the IP address of this machine against the specified rendezvous endpoint. Defaults to None. Note that this configuration option only applies to torch.distributed.TCPStore. In normal circumstances you can safely skip it; the only time when it is needed is if its value cannot be correctly determined (e.g. the rendezvous endpoint has a CNAME as the hostname or does not match the FQDN of the machine). |

Return type

tuple[torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend, torch.distributed.distributed_c10d.Store]

class torch.distributed.elastic.rendezvous.c10d_rendezvous_backend.C10dRendezvousBackend(store, run_id)

Represents a C10d-backed rendezvous backend.

Parameters

get_state()

See base class.

Return type

Optional[tuple[bytes, Any]]

property name: str

See base class.

set_state(state, token=None)

See base class.

Return type

Optional[tuple[bytes, Any, bool]]

Etcd Backend

torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.create_backend(params)

Create a new EtcdRendezvousBackend from the specified parameters.

| Parameter | Description |
| --- | --- |
| read_timeout | The read timeout, in seconds, for etcd operations. Defaults to 60 seconds. |
| protocol | The protocol to use to communicate with etcd. Valid values are “http” and “https”. Defaults to “http”. |
| ssl_cert | The path to the SSL client certificate to use along with HTTPS. Defaults to None. |
| ssl_cert_key | The path to the private key of the SSL client certificate to use along with HTTPS. Defaults to None. |
| ca_cert | The path to the root SSL authority certificate. Defaults to None. |

Return type

tuple[torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend, torch.distributed.distributed_c10d.Store]

class torch.distributed.elastic.rendezvous.etcd_rendezvous_backend.EtcdRendezvousBackend(client, run_id, key_prefix=None, ttl=None)

Represents an etcd-based rendezvous backend.

Parameters

get_state()

See base class.

Return type

Optional[tuple[bytes, Any]]

property name: str

See base class.

set_state(state, token=None)

See base class.

Return type

Optional[tuple[bytes, Any, bool]]

Etcd Rendezvous (Legacy)

Warning

The DynamicRendezvousHandler class supersedes the EtcdRendezvousHandler class, and is recommended for most users. EtcdRendezvousHandler is in maintenance mode and will be deprecated in the future.

class torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvousHandler(rdzv_impl, local_addr)

Implements a torch.distributed.elastic.rendezvous.RendezvousHandler interface backed by torch.distributed.elastic.rendezvous.etcd_rendezvous.EtcdRendezvous. EtcdRendezvousHandler uses a URL to configure the type of rendezvous to use and to pass implementation-specific configurations to the rendezvous module. The basic etcd rendezvous configuration URL looks like the following:

etcd://<etcd_address>:<port>/<job_id>?min_workers=<min_workers>&max_workers=<max_workers>

-- example --

etcd://localhost:2379/1234?min_workers=1&max_workers=3

The URL above is interpreted as follows:

  1. Use the rendezvous handler that is registered with the etcd scheme.
  2. The etcd endpoint to use is localhost:2379.
  3. job_id == 1234 is used as the prefix in etcd (this allows one to share a common etcd server for multiple jobs so long as the job_ids are guaranteed to be unique). Note that the job id can be any string (e.g. does not need to be a number) as long as it is unique.
  4. min_workers=1 and max_workers=3 specify a range for membership size - Torch Distributed Elastic starts running the job as long as the cluster size is greater than or equal to min_workers and admits up to max_workers into the cluster.
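The interpretation above can be reproduced with the standard library's URL parser. This is a minimal sketch of the parsing step only; `parse_etcd_url` is a hypothetical helper, not a function exposed by the rendezvous module.

```python
from urllib.parse import parse_qs, urlparse


def parse_etcd_url(url: str) -> dict:
    """Split an etcd rendezvous URL into its documented components."""
    parsed = urlparse(url)
    if parsed.scheme != "etcd":
        raise ValueError(f"unsupported scheme: {parsed.scheme!r}")
    query = parse_qs(parsed.query)
    return {
        "endpoint": parsed.netloc,           # e.g. localhost:2379
        "job_id": parsed.path.lstrip("/"),   # any unique string
        "min_workers": int(query["min_workers"][0]),
        "max_workers": int(query["max_workers"][0]),
    }


info = parse_etcd_url("etcd://localhost:2379/1234?min_workers=1&max_workers=3")
```

The job id is kept as a string on purpose, since it does not need to be a number.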

Below is the full list of the parameters that can be passed to etcd rendezvous:

| Parameter | Description |
| --- | --- |
| min_workers | minimum number of workers for the rendezvous to be valid |
| max_workers | maximum number of workers to admit |
| timeout | total timeout within which next_rendezvous is expected to succeed (defaults to 600s) |
| last_call_timeout | additional wait amount (“last call”) after min number of workers has been reached (defaults to 30s) |
| etcd_prefix | path prefix (from etcd root), inside which all etcd nodes will be created (defaults to /torchelastic/p2p) |

Etcd Store

The EtcdStore is the C10d Store instance type returned by next_rendezvous() when etcd is used as the rendezvous backend.

class torch.distributed.elastic.rendezvous.etcd_store.EtcdStore(etcd_client, etcd_store_prefix, timeout=None)

Implement a C10d Store interface by piggybacking on the rendezvous etcd instance.

This is the store object returned by EtcdRendezvous.

add(key, num)

Atomically increment a value by an integer amount.

The integer is represented as a string using base 10. If key is not present, a default value of 0 will be assumed.

Returns

the new (incremented) value

Return type

int
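The increment semantics (base-10 string storage, a missing key treated as 0) can be illustrated with an in-memory stand-in. `CounterStore` is hypothetical; here a lock plays the role of whatever atomic operation the backing store provides.

```python
import threading


class CounterStore:
    """Toy store whose counters are kept as base-10 encoded bytes."""

    def __init__(self) -> None:
        self._data = {}
        self._lock = threading.Lock()

    def add(self, key: str, num: int) -> int:
        with self._lock:
            # A missing key is treated as the value 0.
            current = int(self._data.get(key, b"0"))
            new_value = current + num
            self._data[key] = str(new_value).encode()
            return new_value
```

Storing the counter as text means any client can read it back with an ordinary get and parse it as an integer.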

check(keys)

Check if all of the keys are immediately present (without waiting).

Return type

bool

get(key)

Get a value by key, possibly doing a blocking wait.

If key is not immediately present, will do a blocking wait for at most timeout duration or until the key is published.

Returns

value (bytes)

Raises

LookupError - If key still not published after timeout

Return type

bytes

set(key, value)

Write a key/value pair into EtcdStore.

Both key and value may be either Python str or bytes.

wait(keys, override_timeout=None)

Wait until all of the keys are published, or until timeout.

Raises

LookupError - if timeout occurs
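The blocking behaviour of get, check, and wait can be sketched with a condition variable. `BlockingStore` is a hypothetical single-process stand-in that only mirrors the method names and the LookupError-on-timeout behaviour described above; it does not talk to etcd.

```python
import threading


class BlockingStore:
    """Toy key-value store with blocking reads and a timeout."""

    def __init__(self, timeout: float = 1.0) -> None:
        self._data = {}
        self._cond = threading.Condition()
        self._timeout = timeout

    def set(self, key, value) -> None:
        with self._cond:
            self._data[key] = value
            self._cond.notify_all()  # wake up any blocked readers

    def check(self, keys) -> bool:
        """Return immediately, without waiting."""
        with self._cond:
            return all(k in self._data for k in keys)

    def wait(self, keys, override_timeout=None) -> None:
        timeout = self._timeout if override_timeout is None else override_timeout
        with self._cond:
            ready = self._cond.wait_for(
                lambda: all(k in self._data for k in keys), timeout
            )
            if not ready:
                raise LookupError(f"timed out waiting for keys: {list(keys)}")

    def get(self, key):
        self.wait([key])
        with self._cond:
            return self._data[key]
```

A writer thread calling `set` releases every reader blocked in `wait` or `get` on that key.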

Etcd Server

The EtcdServer is a convenience class that makes it easy for you to start and stop an etcd server in a subprocess. This is useful for testing or single-node (multi-worker) deployments where manually setting up an etcd server on the side is cumbersome.

Warning

For production and multi-node deployments please consider properly deploying a highly available etcd server as this is the single point of failure for your distributed jobs.

class torch.distributed.elastic.rendezvous.etcd_server.EtcdServer(data_dir=None)

Note

Tested on etcd server v3.4.3.

Starts and stops a local standalone etcd server on a random free port. Useful for single-node, multi-worker launches or testing, where a sidecar etcd server is more convenient than having to separately set up an etcd server.

This class registers a termination handler to shut down the etcd subprocess on exit. This termination handler is NOT a substitute for calling the stop() method.

The following fallback mechanism is used to find the etcd binary:

  1. Uses env var TORCHELASTIC_ETCD_BINARY_PATH
  2. Uses <this file root>/bin/etcd if one exists
  3. Uses etcd from PATH
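The fallback order above could be implemented along these lines. This is a sketch under assumptions: `find_etcd_binary` and its `module_dir` argument are hypothetical, with `module_dir` standing in for the directory referred to as `<this file root>`.

```python
import os
import shutil
from typing import Optional


def find_etcd_binary(module_dir: str) -> Optional[str]:
    """Resolve the etcd binary using the three documented fallbacks."""
    # 1. An explicit override through the environment variable.
    env_path = os.environ.get("TORCHELASTIC_ETCD_BINARY_PATH")
    if env_path:
        return env_path
    # 2. A binary bundled next to the module, if one exists.
    bundled = os.path.join(module_dir, "bin", "etcd")
    if os.path.isfile(bundled):
        return bundled
    # 3. Whatever "etcd" resolves to on PATH (None if not installed).
    return shutil.which("etcd")
```

Checking the environment variable first lets a test pin a specific etcd version without touching the system installation.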

Usage

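    from torch.distributed.elastic.rendezvous.etcd_server import EtcdServer

    # The constructor only takes data_dir; the binary is located via the fallback above.
    server = EtcdServer(data_dir="/tmp/default.etcd")
    server.start()
    client = server.get_client()
    # use client
    server.stop()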

Parameters

etcd_binary_path – path of etcd server binary (see above for fallback path)