Simple Network of Workstations for R (original) (raw)
Luke Tierney
Department of Statistics and Actuarial Science
University of Iowa
Thesnow(Simple Network of Workstations) package implements a simple mechanism for using a collection of workstations or a Beowulf cluster for ``embarrassingly parallel'' computations in R. The interface, which is based in part on the Python CoW (Cluster of Workstations) package, is intended to be quite simple, and is designed so that it can be implemented on top of several different lower level communication mechanisms. Three low level interfaces have been implemented, one using PVM via therpvmpackage by Li and Rossini, one using MPI via theRmpi package by Hao Yu, and one using raw sockets that may be useful if PVM and MPI are not available. This note describes how to start up a cluster, the basic functions for computing with a cluster, and an example of using the cluster for parallel bootstrapping.
Two notes of caution:
- Interrupting computations with
^Cis likely to cause problems so avoid this when runningsnow. - Before quitting R remember to call
stopClusterfor your cluster. This helps in cleanly shutting down your framework.
Starting and Stopping a Cluster
Starting a workstation cluster is the only step in using a clusterthat depends explicitly on the underlying communication mechanism. Acluster is started by calling the makeCluster function, but the details of the call currently vary slightly depending on the type of cluster. PVM and MPI clusters may also need some preliminary preparations to start the PVM or MPI systems.
Starting a PVM Cluster
To start a PVM (Parallel Virtual Machine) cluster you first need to start PVM. I prefer to do this using the PVM console pvm:
= luke@itasca cluster% pvm pvm> add itasca add itasca 0 successful HOST DTID itasca Duplicate host pvm> add owasso add owasso 1 successful HOST DTID owasso 80000 pvm> conf conf 2 hosts, 2 data formats HOST DTID ARCH SPEED DSIG itasca 40000 LINUX 1000 0x00408841 owasso 80000 DARWIN 1000 0x0658eb59 pvm>
The parallel virtual machine configuration shown by the confcommand consists of two hosts, itasca running i386 Linux andowasso running Mac OS X.
As an alternative to the command line console pvm you can also use the graphical console xpvm, if it is installed, or you can start PVM from R using functions provided by the rpvm package.
Once PVM is running, to use a cluster of two worker nodes you would start R on the master, load the snow package, and call makeClusterwith arguments 2, the number of worker nodes, and type = "PVM":
= cl <- makeCluster(2, type = "PVM")
The returned value is a list of references to these two processes.
Currently the default cluster type is "PVM" if the rpvmpackage is available and Rmpi is not already loaded, so thetype = "PVM" argument can be omitted.
Once you are done with PVM remember to shut it down, for example with the halt command in the pvm console.
Starting an MPI Cluster
If you are using an MPI system such asLAM that supports the MPI 2 process spawning API then you can create MPI clusters along the same lines as socket or PVM clusters. A different approach is needed if your MPI system does not support process spawning. You may need tostart MPI first, for example using lamboot; the details depend on your MPI system.
Once you are done with MPI remember to shut it down if yur MPI requires this; for LAM-MPI, for example, use lamhalt.
Spawned MPI Clusters
MPI clusters for MPI systems that support process spawning can be started with
= cl <- makeCluster(2, type = "MPI")
Currently if rpvm is not available but Rmpi is available then the default cluster type is "MPI" and the type = "MPI"argument can be omitted.
MPI Clusters without Spawning
If your MPI system does not support process spawning, or if you prefer to run MPI jobs with mpirun, you will need to install Rmpiaccordingly, and you need to install the RMPISNOW shell script from the installed snow package or inst directory of the package sources in an appropriate location, preferably on your path. If RMPISNOW is in your path you can then start R with
= mpirun -np 3 RMPISNOW
This starts a master and two worker processes. A cluster is created inthe master process. You can obtain a reference to the cluster with
With recent versins of snow You can also obtain a reference to the running cluster with
+= [<-D->] cl <- makeCluster()
or, for a cluster with two workers, with
Starting a Socket Cluster
To start a socket cluster you use makeCluster and specifytype="SOCK" and a list of machines to use. For example,
= cl <- makeCluster(c("itasca", "owasso"), type = "SOCK")
creates a cluster consisting of two R processes, one running on a machine named itasca and the other on owasso. The returned value is a list of references to these two processes.
Additional Cluster Creation Arguments
The cluster creation function makeCluster accepts a number of additional named arguments for specifying options for the node creation processes. The most useful arguments are type for specifying the cluster type, homogeneous for specifying whether the startup process should assume that worker hosts have the same architecture and file system layout as the master (see Section[->] for more on heterogeneous clusters), and outfile for specifying where output from worker processes is to be sent. By default outfile is /dev/null. For debugging purposes, especially during initial configuration, it can be useful to redirect output to a proper file.
An alternative to specifying additional cluster options with eachcluster creation call is to use setDefaultClusterOptions to change the defaults. For example, to specify that the default should be an inhomogeneous MPI cluster use
setDefaultClusterOptions(type="MPI", homogeneous = FALSE)
Stopping a Cluster
To stop a cluster you should use
= stopCluster(cl)
Socket clusters should stop automatically when the process that created them terminates; however, it is still a good idea to callstopCluster. If you do not call stopCluster to terminate a PVM cluster before the creating process exits you can use the haltcommand in the PVM console. Your MPI system may provide a similar facility; for LAM you can use lamhalt or lamwipe.
Inhomogeneous Systems
[*]The default option settings for cluster creation are intended for homogeneous systems where the R shell script and the snowlibrary live at the same absolute paths on all nodes. snow need not be installed in the system library, but, for the defaults to work, it has to be in one of the libraries specified in the R_LIBSenvironment variable for the master.
For inhomogeneous systems, such as systems with differing file system layouts or systems where different architectures share a common file system, some additional configuration is needed:
- On each machine used to run worker processes copy the script
RunSnowNodefrom the installedsnowto a location on the search path for that machine and make sure it is executable. - On the master machine and on each machine used to run worker processes set the environment variable
R_SNOW_LIBto the name of the directory that contains thesnowpackage and supporting packages such asrpvm,Rmpi, orrlecuyerfor the appropriate architecture. - Make sure the appropriate version of
Ris available on the search path on each machine. Currently setting theR_SNOW_LIBvariable to a nonempty value on the master causes the default setting of thehomogeneouscluster option to beFALSE; otherwise, it isTRUE.
Using the Cluster
Some Basic Functions
The basic functions for using a cluster are clusterCall andclusterApply. clusterCall calls a specified function with identical arguments on each member of the cluster and returns a list of the results. The calls are executed in parallel. For example, we can ask the nodes for their names and machine types:
clusterCall(cl, function() Sys.info()[c("nodename","machine")]) [[1]] nodename machine "owasso.stat.uiowa.edu" "Power Macintosh"
[[2]] nodename machine "itasca" "i686"
A useful variation of clusterCall is clusterEvalQ, defined as
= clusterEvalQ<-function(cl, expr) clusterCall(cl, eval, substitute(expr), env=.GlobalEnv)
This can be used, for example, to load a package on each cluster node with
= clusterEvalQ(cl, library(boot))
clusterApply is a version of lapply that evaluates each call on a different member of the cluster. It requires that the number of nodes in the cluster be at least as large as the number of elements in the list argument. A simple example:
clusterApply(cl, 1:2, get("+"), 3) [[1]] [1] 4
[[2]] [1] 5
Random Number Generation
The default random number generators are likely to be quite correlated:
clusterCall(cl, runif, 3) [[1]] [1] 0.08496597 0.35232298 0.60300751
[[2]] [1] 0.08496597 0.35232298 0.60300751
A quick and (very) dirty way of addressing this is random seeding,using something like
= clusterApply(cl, runif(length(cl),max=10000000), set.seed)
A better approach is to use a parallel random number genaratorpackage. Several are available. By default snow uses therlecuyer package, which provides an interface to a parallel random number generator package ofL'Ecuyer, Simard, Chen, and Kelton. The functionclusterSetupRNG handles the initialization. The default call with no additional arguments uses a random seed; named arguments described in the help page can be used for more control:
clusterSetupRNG(cl) clusterCall(cl, runif, 3) [[1]] [1] 0.749391854 0.007316102 0.152742874
[[2]] [1] 0.8424790 0.8896625 0.2256776
A Bootstrap Example
The boot package includes an example using the data nuclear. The setup code, given in the boot help page, is
= library(boot)
In this example we show the use of boot in a prediction from
regression based on the nuclear data. This example is taken
from Example 6.8 of Davison and Hinkley (1997). Notice also
that two extra arguments to statistic are passed through boot.
data(nuclear) nuke <- nuclear[,c(1,2,5,7,8,10,11)] nuke.lm <- glm(log(cost)~date+log(cap)+ne+ ct+log(cum.n)+pt, data=nuke) nuke.diag <- glm.diag(nuke.lm) nuke.res <- nuke.diag$res*nuke.diag$sd nuke.res <- nuke.res-mean(nuke.res)
We set up a new dataframe with the data, the standardized
residuals and the fitted values for use in the bootstrap.
nuke.data <- data.frame(nuke,resid=nuke.res,fit=fitted(nuke.lm))
Now we want a prediction of plant number 32 but at date 73.00
new.data <- data.frame(cost=1, date=73.00, cap=886, ne=0, ct=0, cum.n=11, pt=1) new.fit <- predict(nuke.lm, new.data)
nuke.fun <- function(dat, inds, i.pred, fit.pred, x.pred) { assign(".inds", inds, envir=.GlobalEnv) lm.b <- glm(fit+resid[.inds] ~date+log(cap)+ne+ct+ log(cum.n)+pt, data=dat) pred.b <- predict(lm.b,x.pred) remove(".inds", envir=.GlobalEnv) c(coef(lm.b), pred.b-(fit.pred+dat$resid[i.pred])) }
On a single workstation the actual bootstrap example takesapproximately half a minute:
system.time(nuke.boot <-
boot(nuke.data, nuke.fun, R=999, m=1,fit.pred=new.fit, x.pred=new.data))
[1] 26.32 0.71 27.02 0.00 0.00
Using a two-machine cluster (identical i686 nodes this time) isapproximately twice as fast:
clusterEvalQ(cl, library(boot)) system.time(cl.nuke.boot <-
clusterCall(cl,boot,nuke.data, nuke.fun, R=500, m=1,fit.pred=new.fit, x.pred=new.data))
[1] 0.01 0.00 14.27 0.00 0.00
Using a five node cluster takes about 6 seconds:
cl<-makeCluster(5) clusterEvalQ(cl,library(boot)) system.time(cl.nuke.boot <-
clusterCall(cl,boot,nuke.data, nuke.fun, R=200, m=1,fit.pred=new.fit, x.pred=new.data))
[1] 0.03 0.00 5.58 0.00 0.00
The value to use comparing speeds is the third one, the elapsed time.
Some Higher Level Functions
Higher level functions defined in terms of clusterApply includeparLapply, parSapply, and parApply. These are parallel versions of lapply, sapply, and apply. Some examples:
= [D->] sum(parApply(cl, matrix(1:100,10), 1, sum)) sum(parApply(cl, matrix(1:10000,100), 1, sum))
+= [<-D->] x<-1:100/101 system.time(qtukey(x, 2, df=2)) system.time(unlist(parLapply(cl, x, qtukey, 2, df=2)))
A very simple version of a parallel matrix multiply is also provides as anillustration.
+= [<-D] A<-matrix(rnorm(10000),100) system.time(A %*% A) system.time(parMM(cl,A , A))
Comments
Some points to keep in mind:
- Communication is much slower than computation.
- Communication is serialized in this simple design.
- Some R functions produce large results--reduce on the compute node before returning.
- The PVM and MPI versions are more robust in terms of making sure no stray jobs are left:
- Using the PVM console or
lamhaltandlamwipeyou can kill all R processes by halting the PVM or LAM-MPI universe. - The socket implementation should not leave any nodes running when the master exits, but it could in some odd circumstances.
- Occasionally things can go wrong and manual cleanup is needed.
- Using the PVM console or
Open Issues
Error Handling
Error handling is currently very primitive. All evaluations on worker nodes are carried out using try. Results returned can therefore either be legitimate results or try errors. In principle user code should check for this.
The right way to handle situations where some nodes signal errors and some do not is not obvious and can depend on the application. Once R obtains a more sophisticated error handling mechanism it will be possible to build on that mechanism to provide flexible user control over the handling of errors in snow.
Interrupt Handling
Ideally, an interrupt on the master should be propagated to the worker nodes and interrupt any calculations currently in progress. This does not happen. In fact, an interrupt on the master during a snowcomputation will most likely leave the communication infrastructure in a confused state that makes further use of the cluster within the current R session impossible. The temporary solution: don't interrupt a snow computation. This is clearly not a satisfactory solution. Once R obtains a mechanism for controlling the handling of interrupts it sill be possible to do somewhat better. Whether it sill be possible to actually propagate interrupts to worker nodes is not completely clear and may depend on operating system considerations.
*