很长时间以来,我一直在许多并行r脚本中使用sfLapply。但是,最近,随着我对并行计算的更多研究,我一直在使用sfClusterApplyLB,如果各个实例的运行时间不相同,则可以节省大量时间。因为sfLapply将在加载新批次之前等待批次的每个实例完成(这可能导致空闲实例),而完成任务的sfClusterApplyLB实例将立即分配给列表中的其余元素,从而潜在地节省了很多时间实例花费的时间不完全相同的时间。这使我提出疑问,为什么在使用降雪时我们不希望不对运行进行负载平衡?到目前为止,我唯一发现的是,当并行脚本中出现错误时,在给出错误之前,sfClusterApplyLB仍将循环遍历整个列表,而在尝试第一批操作后,sfLapply将停止。我还想念什么?负载平衡还有其他成本/缺点吗?下面是一个示例代码,显示了两者之间的区别
rm(list = ls()) #remove all past worksheet variables
working_dir="D:/temp/"
setwd(working_dir)
n_spp=16
spp_nmS=paste0("sp_",c(1:n_spp))
spp_nm=spp_nmS[1]
sp_parallel_run=function(sp_nm){
sink(file(paste0(working_dir,sp_nm,"_log.txt"), open="wt"))#######NEW
cat('\n', 'Started on ', date(), '\n')
ptm0 <- proc.time()
jnk=round(runif(1)*8000000) #this is just a redundant script that takes an arbitrary amount of time to run
jnk1=runif(jnk)
for (i in 1:length(jnk1)){
jnk1[i]=jnk[i]*runif(1)
}
ptm1=proc.time() - ptm0
jnk=as.numeric(ptm1[3])
cat('\n','It took ', jnk, "seconds to model", sp_nm)
#stop sinks
sink.reset <- function(){
for(i in seq_len(sink.number())){
sink(NULL)
}
}
sink.reset()
}
require(snowfall)
cpucores=as.integer(Sys.getenv('NUMBER_OF_PROCESSORS'))
sfInit( parallel=T, cpus=cpucores) #
sfExportAll()
system.time((sfLapply(spp_nmS,fun=sp_parallel_run)))
sfRemoveAll()
sfStop()
sfInit( parallel=T, cpus=cpucores) #
sfExportAll()
system.time(sfClusterApplyLB(spp_nmS,fun=sp_parallel_run))
sfRemoveAll()
sfStop()
该sfLapply
函数很有用,因为它将每个可用工作程序的输入值分成一组任务,这就是mclapply
函数所谓的prescheduling。sfClusterApplyLB
与任务用时不长的情况相比,这可以提供更好的性能。
这是一个极端的例子,演示了预先安排的好处:
> system.time(sfLapply(1:100000, sqrt))
user system elapsed
0.148 0.004 0.170
> system.time(sfClusterApplyLB(1:100000, sqrt))
user system elapsed
19.317 1.852 21.222
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句