R的多进程使用与改进
在R中需要使用多进程时,常见方案是使用foreach和doParallel的组合。
foreach
foreach包中最重要的是foreach函数,该函数创建一个foreach对象,随后串行或并行的执行表达式。
library(foreach)?foreachout:
foreach( ..., .combine, .init, .final = NULL, .inorder = TRUE, .multicombine = FALSE, .maxcombine = if (.multicombine) 100 else 2, .errorhandling = c("stop", "remove", "pass"), .packages = NULL, .export = NULL, .noexport = NULL, .verbose = FALSE)e1 %:% e2when(cond)obj %do% exobj %dopar% extimes(n)foreach函数在创建时常用的几个参数为:
...: 表达式中使用的变量。
.packages: 表达式依赖的包,字符向量。
.export: 表达式依赖的变量,字符向量。
.combine: 运算后结果的组合方式,默认为列表,可选 'c' 'cbind' 'rbind' '+' '*'等。
.errorhandling: 当运行中出现错误时的处理方式。
使用时,对象后接%do%为串行,%dopar%为并行。
foreach(i=1:3) %do% sqrt(i)out:
[[1]][1] 1[[2]][1] 1.414214[[3]][1] 1.732051当使用%do%执行时,程序将会自动忽略.packages与.export变量。
如果需要使用多进程,不只需要更换成%dopar%,你还需要注册集群,执行,结束集群。
library(doParallel)cl = makeCluster(4) #注册4个进程的集群registerDoParallel(cl)foreach(i=1:3) %dopar% sqrt(i)stopCluster(cl) # 记得结束集群包装
对多进程进行包装,形成runParallel函数。
library(foreach)library(doParallel)runParallel = function(FUN,PARAMS,packages = NULL,export = NULL){ cl = makeCluster(4) registerDoParallel(cl) N = length(PARAMS) R = foreach( i = 1:N, .packages = packages, .export = export, .errorhandling = 'stop' ) %dopar% { r = do.call(FUN, PARAMS[[i]]) r } stopCluster(cl) R}程序中的do.call能够使用提供的参数运行FUN函数。
runParallel函数传入FUN与并行参数的列表集合PARAMS,就可以使用FUN对每个值进行处理,然后返回全部值。
问题
在实际使用中遇到这样一个问题,在这里把问题稍微简化一下。
有两个文件,do_some_thing.R和do_some_other_thing.R,里面各自编写了一个函数。
do_some_thing.R
do_some_thing = function(x){ do_some_other_thing(x**2)}do_some_other_thing.R
do_some_other_thing = function(x){ x / 2}很明显,do_some_thing.R中引用了do_some_other_thing.R中的函数。
现在我source这两个文件并暴露这两个函数,编写一个函数调用do_some_thing。
some_thing = new.env()source('do_some_thing.R',local = some_thing)some_other_thing = new.env()source('do_some_other_thing.R',local = some_other_thing)attach(some_thing)attach(some_other_thing)fun = function(x){do_some_thing(x+1)}然后进行多进程调用。
params = lapply(1:10, list)runParallel(fun,params)得到错误。
Error in { : task 1 failed - "没有"do_some_thing"这个函数"找不到do_some_thing函数,然而当我们加上所有可能的.export变量后。
runParallel(fun,params,export=c('do_some_thing','do_some_other_thing','some_thing','some_other_thing'))仍然失败。
Error in { : task 1 failed - "没有"do_some_other_thing"这个函数" 有趣的是找不到的函数变成了do_some_other_thing,然而我明明export这个变量了啊。
在搜索中,回答普遍认为doParallel存在设计缺陷,它不能顺利的找到函数内调用的其他自定义函数。
在不停的搜索过后,我终于找到了一个解决方案,使用doFuture包[1]。
doFuture包会自动解决这类依赖问题,无需提供.packages和.export参数,同时无需手动结束集群。
一个更改后的版本是这样的。
runParallel = function(FUN,PARAMS){ registerDoFuture() plan(multisession, workers = 4) #在这里指定进程数 N = length(PARAMS) R = foreach( i = 1:N, .errorhandling = 'stop' ) %dopar% { r = do.call(FUN, PARAMS[[i]]) r } R}runParallel(fun,params)out:
[[1]][1] 2[[2]][1] 4.5······成功。
十分推荐doFuture包!
我
我是 SSSimon Yang,关注我,用code解读世界

References
[1] doFuture包: https://github.com/HenrikBengtsson/doFuture