seqParallel: Apply Functions in Parallel
Applies a user-defined function in parallel.
seqParallel(cl=seqGetParallel(), gdsfile, FUN, split=c("by.variant", "by.sample", "none"), .combine="unlist", .selection.flag=FALSE, .initialize=NULL, .finalize=NULL, .initparam=NULL, .balancing=FALSE, .bl_size=10000L, .bl_progress=FALSE, ...) |
---|
seqParApply(cl=seqGetParallel(), x, FUN, load.balancing=TRUE, ...) |
cl | NULL or FALSE: serial processing; TRUE: multicore processing (the maximum number of cores minus one); a numeric value: the number of cores to be used; a cluster object for parallel processing, created by the functions in the package parallel, like makeCluster; or a BiocParallelParam object from the BiocParallel package. See Details. |
---|---|
gdsfile | a SeqVarGDSClass object, or NULL |
FUN | the function to be applied; it should have the form FUN(gdsfile, ...) or FUN(...) |
split | split the dataset by variant or by sample across the processes, or "none" for no split; the default is split="by.variant" |
.combine | define a function for combining results from different processes; by default, "unlist" is used to produce a vector which contains all the atomic components, via unlist(..., recursive=FALSE); "list", return a list of results created by child processes; "none", no return; or a function with one or two arguments, like "+" |
.selection.flag | if TRUE, a logical vector of selection is passed as the second argument of FUN, i.e. FUN(gdsfile, selection, ...) |
.initialize | a user-defined function for initializing workers, should have two arguments (process_id, param) |
.finalize | a user-defined function for finalizing workers, should have two arguments (process_id, param) |
.initparam | parameters passed to .initialize and .finalize |
.balancing | load balancing if TRUE |
.bl_size | chunk size, the increment for load balancing, 10000 for variants |
.bl_progress | if TRUE and .balancing=TRUE, show progress information |
x | a vector (atomic or list), passed to FUN |
load.balancing | if TRUE, call clusterApplyLB instead of clusterApply |
... | optional arguments to FUN |
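The `.combine` and `load.balancing` arguments can be pictured with base R alone. The sketch below does not call SeqArray; it uses the `parallel` package directly and only mimics the combining rules described in the table, so it should be read as an illustration rather than as the package's internal code. The cluster size, the chunking and the worker function are arbitrary choices made for this example.

```r
library(parallel)

# two local worker processes (an arbitrary size chosen for this sketch)
cl <- makeCluster(2)

# three chunks of indices: 1:2, 3:4, 5:6
chunks <- splitIndices(6, 3)

# load balancing: each worker takes the next chunk as soon as it is free;
# the results still come back in the order of 'chunks'
partial <- clusterApplyLB(cl, chunks, function(idx) idx^2)

stopCluster(cl)

# what .combine="unlist" is described as doing
combined_unlist <- unlist(partial, recursive = FALSE)

# what .combine="list" is described as doing: keep one element per process
combined_list <- partial

# a two-argument function such as "+", folded over the per-chunk sums
combined_sum <- Reduce(`+`, lapply(partial, sum))
```

Replacing `clusterApplyLB` with `clusterApply` gives the same values with a fixed assignment of chunks to workers, which is the difference that `load.balancing` selects in `seqParApply`.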
When cl is TRUE or a numeric value, forking is used to create child processes as copies of the current R process; see ?parallel::mcfork. However, forking is not available on Windows, and makeCluster is called to make a cluster, which is deallocated after calling FUN.
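The platform difference can be sketched with the `parallel` package on its own. This is not SeqArray's implementation, only a minimal picture of the two strategies under the assumption that two workers are wanted: forked workers where forking exists, and a temporary cluster that is released after use on Windows. The function `f` is a placeholder.

```r
library(parallel)

f <- function(i) i * 2L   # placeholder task

if (.Platform$OS.type == "windows") {
  # no forking on Windows: create a cluster, use it, then release it
  cl <- makeCluster(2)
  res <- parLapply(cl, 1:4, f)
  stopCluster(cl)
} else {
  # forked child processes start as copies of the current R process
  res <- mclapply(1:4, f, mc.cores = 2)
}
```

Creating and stopping a cluster on every call is comparatively expensive, which is why reusing one set of workers across calls is attractive.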
It is strongly suggested to use seqParallel together with seqParallelSetup. seqParallelSetup can work around the problem of forking on Windows without allocating clusters frequently.
The user-defined function can use two predefined variables, SeqArray:::process_count and SeqArray:::process_index, to obtain the total number of cluster nodes and the index of the cluster node being used.
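One way a worker might use these two values is to pick its own share of a task list. The helper below is hypothetical and not part of SeqArray; its arguments `process_index` and `process_count` are plain stand-ins for the package variables, and it assumes the index is 1-based. How SeqArray itself divides variants or samples may differ.

```r
library(parallel)

# Hypothetical helper: the items that worker 'process_index' out of
# 'process_count' would handle, using parallel::splitIndices()
my_share <- function(n_items, process_index, process_count) {
  splitIndices(n_items, process_count)[[process_index]]
}

share1 <- my_share(10, 1, 2)   # first of two workers
share2 <- my_share(10, 2, 2)   # second of two workers
```

Inside a function passed to seqParallel, the same helper could be called with SeqArray:::process_index and SeqArray:::process_count in place of the literal values.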
seqParallel(, gdsfile=NULL, FUN=..., split="none") could be used to set up multiple streams of pseudo-random numbers; see nextRNGStream or nextRNGSubStream in the package parallel.
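As a rough picture of what independent streams look like, the sketch below derives three L'Ecuyer-CMRG seeds with `nextRNGStream` and draws from each one in turn. It runs serially and does not call seqParallel; the seed value and the number of streams are arbitrary. In a parallel run, each worker would install the seed matching its own index, which is an assumption about usage rather than something the package prescribes.

```r
library(parallel)

# nextRNGStream() requires the L'Ecuyer-CMRG generator
RNGkind("L'Ecuyer-CMRG")
set.seed(1000)

# one seed per (hypothetical) worker
n_streams <- 3
seeds <- vector("list", n_streams)
s <- get(".Random.seed", envir = .GlobalEnv)
for (i in seq_len(n_streams)) {
  seeds[[i]] <- s
  s <- nextRNGStream(s)
}

# what each worker would do: install its seed, then draw random numbers
draws <- lapply(seeds, function(seed) {
  assign(".Random.seed", seed, envir = .GlobalEnv)
  runif(2)
})
```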
A vector or list of values.
Xiuwen Zheng
[seqSetFilter](/bioc/SeqArray/man/seqSetFilter.html), [seqGetData](/bioc/SeqArray/man/seqGetData.html), [seqApply](/bioc/SeqArray/man/seqApply.html), [seqParallelSetup](/bioc/SeqArray/man/seqParallelSetup.html), [seqGetParallel](/bioc/SeqArray/man/seqGetParallel.html)
    library(parallel)

    # choose an appropriate cluster size or number of cores
    seqParallelSetup(2)

    # the GDS file
    (gds.fn <- seqExampleFileName("gds"))

    # display
    (gdsfile <- seqOpen(gds.fn))

    # the uniprocessor version
    afreq1 <- seqParallel(, gdsfile, FUN = function(f) {
            seqApply(f, "genotype", as.is="double",
                FUN=function(x) mean(x==0, na.rm=TRUE))
        }, split="by.variant")

    length(afreq1)
    summary(afreq1)

    # run in parallel
    afreq2 <- seqParallel(, gdsfile, FUN = function(f) {
            seqApply(f, "genotype", as.is="double",
                FUN=function(x) mean(x==0, na.rm=TRUE))
        }, split="by.variant")

    length(afreq2)
    summary(afreq2)

    # check
    length(afreq1)  # 1348
    all(afreq1 == afreq2)

    ################################################################
    # check -- variant splits

    seqParallel(, gdsfile, FUN = function(f) {
            v <- seqGetFilter(f)
            sum(v$variant.sel)
        }, split="by.variant")
    # [1] 674 674

    ################################################################

    seqParallel(, NULL, FUN = function() {
            paste(SeqArray:::process_index, SeqArray:::process_count, sep=" / ")
        }, split="none")

    seqParallel(, NULL, FUN = function() {
            SeqArray:::process_index
        }, split="none", .combine=function(i) print(i))

    seqParallel(, NULL, FUN = function() {
            SeqArray:::process_index
        }, split="none", .combine="+")

    ################################################################

    # close the GDS file
    seqClose(gdsfile)

    # clear the parallel cluster
    seqParallelSetup(FALSE)
SeqArray documentation built on Nov. 8, 2020, 5:08 p.m.