
seqParallel: Apply Functions in Parallel


View source: R/Utilities.R

Applies a user-defined function in parallel.

seqParallel(cl=seqGetParallel(), gdsfile, FUN, split=c("by.variant", "by.sample", "none"),
    .combine="unlist", .selection.flag=FALSE, .initialize=NULL, .finalize=NULL,
    .initparam=NULL, .balancing=FALSE, .bl_size=10000L, .bl_progress=FALSE, ...)

seqParApply(cl=seqGetParallel(), x, FUN, load.balancing=TRUE, ...)
cl NULL or FALSE: serial processing; TRUE: multicore processing (the maximum number of cores minus one); a numeric value: the number of cores to be used; a cluster object for parallel processing, created by the functions in the package parallel, such as makeCluster; or a BiocParallelParam object from the BiocParallel package. See Details.
gdsfile a SeqVarGDSClass object, or NULL
FUN the function to be applied, with a signature like FUN(gdsfile, ...) or FUN(...)
split how to divide the dataset among the processes: "by.variant" (the default) splits by variant, "by.sample" splits by sample, and "none" performs no split
.combine how to combine the results from different processes: "unlist" (the default) produces a vector containing all the atomic components, via unlist(..., recursive=FALSE); "list" returns a list of the results created by the child processes; "none" returns nothing; alternatively, a function with one or two arguments, such as "+"
.selection.flag if TRUE, a logical vector of the current selection is passed as the second argument of FUN, i.e. FUN(gdsfile, selection, ...)
.initialize a user-defined function for initializing workers, with two arguments (process_id, param)
.finalize a user-defined function for finalizing workers, with two arguments (process_id, param)
.initparam parameters passed to .initialize and .finalize
.balancing if TRUE, use load balancing
.bl_size chunk size, i.e. the increment used for load balancing; 10000 variants by default
.bl_progress if TRUE and .balancing=TRUE, show progress information
x a vector (atomic or list), passed to FUN
load.balancing if TRUE, call clusterApplyLB instead of clusterApply
... optional arguments passed to FUN
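To make the split and .combine options concrete, here is a minimal serial emulation of the split/apply/combine flow described above. It is a hypothetical sketch, not SeqArray's implementation: the helper emulate_seq_parallel is invented for illustration, it assumes variants are split into contiguous, nearly equal blocks (using parallel::splitIndices), and it only handles two-argument combining functions such as "+".

```r
library(parallel)

# Hypothetical serial emulation of split="by.variant" plus .combine;
# NOT SeqArray's actual code. Each "process" receives one block of indices.
emulate_seq_parallel <- function(n_variant, n_proc, FUN, .combine = "unlist") {
  # Assumption: contiguous, nearly equal blocks, one per process
  blocks <- splitIndices(n_variant, n_proc)
  # Apply FUN to each block (serially here, standing in for worker processes)
  res <- lapply(blocks, FUN)
  if (identical(.combine, "unlist")) {
    unlist(res, recursive = FALSE)      # one vector of all atomic components
  } else if (identical(.combine, "list")) {
    res                                 # one list element per process
  } else if (identical(.combine, "none")) {
    invisible(NULL)                     # no return value
  } else {
    # A two-argument function (e.g. "+") is folded over the per-process results
    Reduce(match.fun(.combine), res)
  }
}

# Number of variants handled by each of 2 "processes"
emulate_seq_parallel(1348, 2, length)        # 674 674
# Total over processes with a two-argument combiner
emulate_seq_parallel(1348, 2, length, "+")   # 1348
```

The first call mirrors the "674 674" variant-split check in the examples; whether SeqArray breaks blocks at exactly the same boundaries for other sizes is an assumption of this sketch.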

When cl is TRUE or a numeric value, forking is used to create each child process as a copy of the current R process; see ?parallel::mcfork. Forking is not available on Windows, so there makeCluster is called instead to create a cluster, which is deallocated after FUN has been called.
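The fork-or-cluster strategy in the paragraph above can be sketched with plain functions from the parallel package. The helper run_in_parallel is hypothetical and only illustrates the idea; it is not how SeqArray dispatches work internally.

```r
library(parallel)

# Hypothetical helper: fork on Unix-alikes, otherwise build a temporary
# cluster that is shut down as soon as the work is finished.
run_in_parallel <- function(x, FUN, ncores = 2L) {
  if (.Platform$OS.type != "windows") {
    # Forked children are copies of the current R process
    mclapply(x, FUN, mc.cores = ncores)
  } else {
    cl <- makeCluster(ncores)
    on.exit(stopCluster(cl))   # deallocate the cluster after FUN has run
    parLapply(cl, x, FUN)
  }
}

res <- run_in_parallel(1:4, function(i) i^2)
unlist(res)   # 1 4 9 16
```

Creating and destroying a cluster on every call, as the Windows branch does, is exactly the overhead that a persistent setup avoids.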

It is strongly suggested to use seqParallel together with seqParallelSetup. seqParallelSetup works around the lack of forking on Windows without allocating clusters repeatedly.
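The benefit of a persistent setup can be illustrated with plain parallel calls: create the cluster once, reuse it for several jobs, and stop it at the end. This is a sketch of the general idea, not of seqParallelSetup's internals; the helper par_apply is hypothetical and merely mirrors the documented load.balancing switch of seqParApply (clusterApplyLB versus clusterApply).

```r
library(parallel)

# Create the cluster ONCE and reuse it, instead of allocating one per call
cl <- makeCluster(2L)

# Hypothetical helper mirroring seqParApply's documented load.balancing switch
par_apply <- function(cl, x, FUN, load.balancing = TRUE, ...) {
  if (load.balancing) clusterApplyLB(cl, x, FUN, ...)
  else clusterApply(cl, x, FUN, ...)
}

# Two jobs share the same workers; extra arguments (k) are passed on to FUN
a <- par_apply(cl, 1:6, function(i) i * 10)
b <- par_apply(cl, 1:6, function(i, k) i + k, load.balancing = FALSE, k = 100)

stopCluster(cl)   # release the workers once all jobs are done
```

Both clusterApply and clusterApplyLB return results in the order of x; load balancing only changes which worker picks up which element.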

The user-defined function can use two predefined variables, SeqArray:::process_count and SeqArray:::process_index, to determine the total number of cluster nodes and which node it is running on.
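One common use of these two variables is to let each worker pick its own share of a task list. The sketch below assumes process_index runs from 1 to process_count; the helper my_share is hypothetical and takes both values as ordinary arguments, whereas inside a real worker they would come from SeqArray:::process_index and SeqArray:::process_count.

```r
# Hypothetical worker-side helper: take every process_count-th item,
# starting at position process_index (assumed to be 1-based).
my_share <- function(items, process_index, process_count) {
  pos <- seq_along(items)
  items[(pos - 1L) %% process_count == process_index - 1L]
}

items <- letters[1:7]
my_share(items, 1, 3)   # "a" "d" "g"
my_share(items, 2, 3)   # "b" "e"
my_share(items, 3, 3)   # "c" "f"
```

The modulo test, rather than seq(process_index, ..., by = process_count), keeps the helper safe when there are fewer items than workers: surplus workers simply get an empty share.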

seqParallel(, gdsfile=NULL, FUN=..., split="none") can be used to set up multiple streams of pseudo-random numbers, one per process; see nextRNGStream or nextRNGSubStream in the package parallel.
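A minimal sketch of independent random-number streams with parallel::nextRNGStream is shown below. The helpers make_streams and draw are hypothetical; in a seqParallel call with split="none", each worker would presumably install the stream matching its SeqArray:::process_index, which is an assumption rather than documented behavior.

```r
library(parallel)

# Build one L'Ecuyer-CMRG stream per worker from a single master seed.
# Side effect: set.seed(kind = ...) switches the session's RNG kind.
make_streams <- function(n, seed) {
  set.seed(seed, kind = "L'Ecuyer-CMRG")
  s <- get(".Random.seed", envir = globalenv())
  streams <- vector("list", n)
  for (i in seq_len(n)) {
    streams[[i]] <- s
    s <- nextRNGStream(s)   # advance to the next independent stream
  }
  streams
}

# Hypothetical worker body: install its own stream, then draw numbers
draw <- function(stream, k) {
  assign(".Random.seed", stream, envir = globalenv())
  runif(k)
}

streams <- make_streams(2, seed = 123)
x1 <- draw(streams[[1]], 3)
x2 <- draw(streams[[2]], 3)
identical(x1, draw(streams[[1]], 3))   # TRUE: each stream is reproducible
```

Because every stream is derived from the master seed, rerunning with the same seed reproduces each worker's numbers regardless of how the work is scheduled.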

A vector or list of values, as determined by .combine (nothing is returned when .combine="none").

Xiuwen Zheng

[seqSetFilter](/bioc/SeqArray/man/seqSetFilter.html), [seqGetData](/bioc/SeqArray/man/seqGetData.html), [seqApply](/bioc/SeqArray/man/seqApply.html), [seqParallelSetup](/bioc/SeqArray/man/seqParallelSetup.html), [seqGetParallel](/bioc/SeqArray/man/seqGetParallel.html)

library(SeqArray)
library(parallel)

# choose an appropriate cluster size or number of cores
seqParallelSetup(2)

# the GDS file
(gds.fn <- seqExampleFileName("gds"))

# display
(gdsfile <- seqOpen(gds.fn))

# the uniprocessor version (cl=FALSE: serial processing)
afreq1 <- seqParallel(FALSE, gdsfile, FUN = function(f) {
        seqApply(f, "genotype", as.is="double",
            FUN=function(x) mean(x==0, na.rm=TRUE))
    }, split="by.variant")

length(afreq1)
summary(afreq1)

# run in parallel (cl defaults to seqGetParallel(), i.e. the setup above)
afreq2 <- seqParallel(, gdsfile, FUN = function(f) {
        seqApply(f, "genotype", as.is="double",
            FUN=function(x) mean(x==0, na.rm=TRUE))
    }, split="by.variant")

length(afreq2)
summary(afreq2)

# check
length(afreq1)  # 1348
all(afreq1 == afreq2)

################################################################
# check -- variant splits

seqParallel(, gdsfile, FUN = function(f) {
        v <- seqGetFilter(f)
        sum(v$variant.sel)
    }, split="by.variant")
# [1] 674 674

################################################################

seqParallel(, NULL, FUN = function() {
        paste(SeqArray:::process_index, SeqArray:::process_count, sep=" / ")
    }, split="none")

seqParallel(, NULL, FUN = function() {
        SeqArray:::process_index
    }, split="none", .combine=function(i) print(i))

seqParallel(, NULL, FUN = function() {
        SeqArray:::process_index
    }, split="none", .combine="+")

################################################################

# close the GDS file
seqClose(gdsfile)

# clear the parallel cluster
seqParallelSetup(FALSE)

SeqArray documentation built on Nov. 8, 2020, 5:08 p.m.