在Python中,我正在使用一个名为的库futures
,该库使我能够以简洁明了的方式对N个工作进程池进行处理:
schedulerQ = []
for ... in ...:
workParam = ... # arguments for call to processingFunction(workParam)
schedulerQ.append(workParam)
with futures.ProcessPoolExecutor(max_workers=5) as executor: # 5 CPUs
for retValue in executor.map(processingFunction, schedulerQ):
print "Received result", retValue
(这processingFunction
是受CPU限制的,因此这里没有异步机制的意义-这是关于简单的旧算术计算的)
我现在正在寻找在Scala中执行相同操作的最接近的方法。请注意,在Python中,为避免GIL问题,我使用了进程(因此使用了ProcessPoolExecutor
代替ThreadPoolExecutor
),并且库自动将workParam
要执行的每个进程实例的参数编组processingFunction(workParam)
在一起,然后将结果编组回主进程,对于执行程序的map
循环消耗。
这适用于Scala和JVM吗?原则上,我的processingFunction也可以从线程执行(根本没有全局状态)-但是我很想看到多处理和多线程的解决方案。
问题的关键部分是JVM的世界中是否存在与futures
您上面看到的Python一样清晰的API ...我认为这是我见过的最好的SMP API之一-准备一个列表使用所有调用的函数参数,然后只有两行:创建poolExecutor和map
处理函数,在工作人员产生结果后立即取回它们。第一次调用return时,结果就开始出现,processingFunction
一直持续到结果完成为止-此时for循环结束。
与在Scala中使用并行集合相比,您拥有的样板更少。
myParameters.par.map(x => f(x))
如果您想要默认的线程数(与内核数相同),将可以解决问题。
如果您坚持要设置工人数,则可以这样:
import scala.collection.parallel._
import scala.concurrent.forkjoin._
val temp = myParameters.par
temp.tasksupport = new ForkJoinTaskSupport(new ForkJoinPool(5))
temp.map(x => f(x))
返回时间的确切细节有所不同,但是您可以根据需要放入任意数量的设备f(x)
(例如,对结果进行计算并执行某些操作),因此这可以满足您的需求。
通常,仅使结果显示为完成是不够的。然后,您需要对其进行处理,也许是对它们进行分叉,收集它们,等等。如果您通常希望这样做,那么Akka Streams(从此处开始的后续链接)将接近1.0,这将有助于生成复杂的并行处理图。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句