Technical Documentation
Worker API
Base API and schedulers
The main worker functions are wrapped in an R6 class with the name of QSys. This provides a standardized API to the lower-level messages that are sent via ZeroMQ.
The base class itself is derived in scheduler classes that add the required functions for submitting and cleaning up jobs:
+ QSys
  |- Multicore
  |- LSF
  + SGE
    |- PBS
    |- Torque
    |- etc.
The user-visible object is a worker Pool that wraps this, and will eventually allow managing different workers.
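Which scheduler class is used is typically selected via the clustermq.scheduler option; a minimal sketch, assuming the option is set before the package is loaded and that the local multicore scheduler is wanted:
options(clustermq.scheduler = "multicore")  # select the Multicore QSys class
library(clustermq)
w = workers(n_jobs = 3)                     # a Pool wrapping that scheduler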
Workers
Creating a worker pool
A pool of workers can be created using the [workers()](../reference/workers.html) function, which instantiates a Pool object of the corresponding QSys-derived scheduler class. See [?workers](../reference/workers.html) for details.
# start up a pool of three workers using the default scheduler
w = workers(n_jobs=3)
# if we make an unclean exit for whatever reason, clean up the jobs
on.exit(w$cleanup())
Worker startup
For workers that are started up via a scheduler, we do not know which machine they will run on. This is why we start up every worker with a TCP/IP address of the master socket that will distribute work.
This is achieved by the call to R common to all schedulers:
R --no-save --no-restore -e 'clustermq:::worker("{{ master }}")'
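With the {{ master }} field filled in from the template, the scheduler ends up executing something along these lines (hostname and port are illustrative):
R --no-save --no-restore -e 'clustermq:::worker("tcp://myhost.example.com:6124")'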
Worker communication
On the master’s side, we wait until a worker connects:
msg = w$recv() # this will block until a worker is ready
We can then send any expression to be evaluated on the worker using the send method:
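# a sketch of the call shape: the expression is sent to and evaluated on the worker
w$send(expression, ...)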
After the expression (in ...), any variables that should be passed along with the call can be added. For the batch processing that clustermq usually does, this command is work_chunk, where the chunk data is added:
w$send(clustermq:::work_chunk(chunk, fun, const, rettype, common_seed),
       chunk = chunk(iter, submit_index))
Worker environment
We can add any number of objects to a worker environment using the env method:
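# a sketch of the call shape: each name = value pair is exported to the workers
w$env(object = value, ...)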
This will also invisibly return a data.frame with all objects currently in the environment. If a user wants to inspect the environment without changing it, they can call w$env() without arguments. The environment will be propagated to all workers automatically in a greedy fashion.
Main event loop
Putting the above together in an event loop, we get what is essentially implemented in master. w$send invisibly returns an identifier to track which call was submitted, and w$current() matches the same to w$recv().
w = workers(3)
on.exit(w$cleanup())
w$env(...)

while (we have new work to send || jobs pending) {
    res = w$recv() # the result of the call, or NULL for a new worker
    w$current()$call_ref # matches answer to request, -1 otherwise
    # handle result

    if (more work)
        call_ref = w$send(expression, ...) # call_ref tracks request identity
    else
        w$send_shutdown()
}
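A more concrete sketch of the same loop, using a toy computation (the variable names and the computation are illustrative, not part of clustermq):
w = workers(n_jobs = 3)
on.exit(w$cleanup())
w$env(offset = 100)                     # make `offset` available on every worker

inputs = as.list(1:10)                  # calls we still want to submit
results = list()                        # answers collected so far
pending = 0                             # calls sent but not yet answered

while (length(inputs) > 0 || pending > 0) {
    res = w$recv()                      # NULL for a newly connected worker
    if (w$current()$call_ref != -1) {   # a real answer, not a new worker
        results[[length(results) + 1]] = res
        pending = pending - 1
    }

    if (length(inputs) > 0) {
        w$send(x + offset, x = inputs[[1]])  # evaluate `x + offset` on the worker
        inputs[[1]] = NULL
        pending = pending + 1
    } else
        w$send_shutdown()
}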
A loop of a similar structure can be used to extend clustermq. As an example, this was done by the targets package.
ZeroMQ message specification
Communication between the master (main event loop) and workers (QSys base class) is organised in _messages_. These are chunks of serialized data sent via _ZeroMQ_’s protocol (ZMTP). The parts of each message are called frames.
Master - Worker communication
The master requests an evaluation in a message with X frames (direct) or Y if proxied. This is all handled by _clustermq_ internally.
- The worker identity frame or routing identifier
- A delimiter frame
- Worker status (wlife_t)
- The call to be evaluated
- N repetitions of:
  - The variable name of an environment object that is not yet present on the worker
  - The variable value
If using a proxy, this will be followed by a SEXP that contains variable names the proxy should add before forwarding to the worker.
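As an illustrative layout, a direct (non-proxied) request that adds two variables would therefore consist of the frames:
[worker identity] [delimiter] [wlife_t] [call] [name 1] [value 1] [name 2] [value 2]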
Worker evaluation
A worker evaluates the call using the R C API:
R_tryEvalSilent(cmd, env, &err);
If an error occurs in this evaluation, it will be returned as a structure with class worker_error. If a developer wants to catch errors and warnings in a more fine-grained manner, it is recommended to add their own callingHandlers to cmd (as _clustermq_ does in its work_chunk).
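A minimal sketch of such a wrapper around the sent expression (slow_fun and the handler logic are illustrative, not part of clustermq):
w$send(
    withCallingHandlers(
        tryCatch(slow_fun(x),
                 error = function(e) list(error = conditionMessage(e))),
        warning = function(cond) {
            message("warning in worker call: ", conditionMessage(cond))
            invokeRestart("muffleWarning")
        }
    ),
    x = 10
)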
Worker - Master communication
The result of this evaluation is then returned in a message with four (direct) or five (proxied) frames:
- Worker identity frame (handled internally by _ZeroMQ_’s ZMQ_REQ socket)
- Empty frame (handled internally by _ZeroMQ_’s ZMQ_REQ socket)
- Worker status (wlife_t) that is handled internally by _clustermq_
- The result of the call (SEXP), visible to the user
If using a worker via SSH, these frames will be preceded by a routing identity frame that is handled internally by ZeroMQ and added or peeled off by the proxy.
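Schematically, a direct reply and an SSH-proxied reply are laid out as:
[worker identity] [empty] [wlife_t] [result]
[proxy routing id] [worker identity] [empty] [wlife_t] [result]   (via SSH proxy)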