我正在执行以下代码,并且可以正常工作,但它不会产生于不同的进程,相反,有时所有进程都在同一进程中运行,有时在一个进程中运行2。我正在使用4 cpu机。此代码有什么问题?
def f(values):
print(multiprocessing.current_process())
return values
def main():
p = Pool(4) #number of processes = number of CPUs
keys, values= zip(*data.items()) #ordered keys and values
processed_values= p.map( f, values )
result= dict( zip(keys, processed_values ) )
p.close() # no more tasks
p.join() # wrap up current tasks
结果是
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
有时候像这样
<SpawnProcess(SpawnPoolWorker-3, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-3, started daemon)>
有时,
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-4, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
我的问题是,它将职能分配给工人的依据是什么?我编写代码的方式是根据字典中键的数量来决定进程的数量(考虑到我的数据将始终比我的CPU少一些键)。我的代码将开始-主代码使用单个进程读取文件并制作出字典,然后将其分支到并发进程数,并等待它们处理数据(我正在使用pool.map进行处理),然后一旦获得子进程的结果,它将开始处理它们。我如何才能实现此父级等待子级处理步骤?
您的代码没有错。您的工作项非常快-如此之快,以至于同一工作进程可以运行该函数,返回结果,然后赢得竞争,以消耗内部队列中multiprocessing.Pool
用于分配工作的下一个任务。当您打电话时map
,工作项被分为几批并放入一个Queue
。这是该实现的一部分,将pool.map
可迭代的项目分块并将其放入队列中:
task_batches = Pool._get_tasks(func, iterable, chunksize)
result = MapResult(self._cache, chunksize, len(iterable), callback)
self._taskqueue.put((((result._job, i, mapstar, (x,), {})
for i, x in enumerate(task_batches)), None))
每个工作进程运行一个函数,该函数具有一个无限的while循环,该循环消耗该队列中的项目*:
while maxtasks is None or (maxtasks and completed < maxtasks):
try:
task = get() # Pulls an item off the taskqueue
except (EOFError, IOError):
debug('worker got EOFError or IOError -- exiting')
break
if task is None:
debug('worker got sentinel -- exiting')
break
job, i, func, args, kwds = task
try:
result = (True, func(*args, **kwds)) # Runs the function you passed to map
except Exception, e:
result = (False, e)
try:
put((job, i, result)) # Sends the result back to the parent
except Exception as e:
wrapped = MaybeEncodingError(e, result[1])
debug("Possible encoding error while sending result: %s" % (
wrapped))
可能是同一工作人员偶然能够消费一个项目,运行func
,然后消费下一个项目。这是有些奇怪-我不能重现它在我的机器上运行相同的代码,你的榜样-但具有相同的工人抢两个从队列中四个项目是非常正常的。
如果您通过添加对以下项的调用来使工作器函数花费更长的时间,则应该始终看到均匀的分布time.sleep
:
def f(values):
print(multiprocessing.current_process())
time.sleep(1)
return values
*实际上并非完全如此-在主进程中运行有一个线程,该线程从中消费taskqueue
,然后将拉出的线程粘到另一个线程中Queue
,这就是子进程从中消费的线程)
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句