Python Multiprocessing-进程数

ds_user

我正在执行以下代码,并且可以正常工作,但它不会产生于不同的进程,相反,有时所有进程都在同一进程中运行,有时在一个进程中运行2。我正在使用4 cpu机。此代码有什么问题?

def f(values):
    print(multiprocessing.current_process())
    return values

def main():
    p = Pool(4) #number of processes = number of CPUs
    keys, values= zip(*data.items()) #ordered keys and values
    processed_values= p.map( f, values )
    result= dict( zip(keys, processed_values ) ) 
    p.close() # no more tasks
    p.join()  # wrap up current tasks

结果是

<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>

有时候像这样

<SpawnProcess(SpawnPoolWorker-3, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-3, started daemon)>

有时,

<SpawnProcess(SpawnPoolWorker-1, started daemon)>
<SpawnProcess(SpawnPoolWorker-4, started daemon)>
<SpawnProcess(SpawnPoolWorker-2, started daemon)>
<SpawnProcess(SpawnPoolWorker-1, started daemon)>

我的问题是,它将职能分配给工人的依据是什么?我编写代码的方式是根据字典中键的数量来决定进程的数量(考虑到我的数据将始终比我的CPU少一些键)。我的代码将开始-主代码使用单个进程读取文件并制作出字典,然后将其分支到并发进程数,并等待它们处理数据(我正在使用pool.map进行处理),然后一旦获得子进程的结果,它将开始处理它们。我如何才能实现此父级等待子级处理步骤?

损害

您的代码没有错。您的工作项非常快-如此之快,以至于同一工作进程可以运行该函数,返回结果,然后赢得竞争,以消耗内部队列中multiprocessing.Pool用于分配工作的下一个任务当您打电话时map,工作项被分为几批并放入一个Queue这是该实现的一部分,将pool.map可迭代的项目分块并将其放入队列中:

    task_batches = Pool._get_tasks(func, iterable, chunksize)
    result = MapResult(self._cache, chunksize, len(iterable), callback)
    self._taskqueue.put((((result._job, i, mapstar, (x,), {}) 
                          for i, x in enumerate(task_batches)), None))

每个工作进程运行一个函数,该函数具有一个无限的while循环,该循环消耗该队列中的项目*:

while maxtasks is None or (maxtasks and completed < maxtasks):
    try:
        task = get()  # Pulls an item off the taskqueue
    except (EOFError, IOError):
        debug('worker got EOFError or IOError -- exiting')
        break

    if task is None:
        debug('worker got sentinel -- exiting')
        break

    job, i, func, args, kwds = task
    try:
        result = (True, func(*args, **kwds))  # Runs the function you passed to map
    except Exception, e:
        result = (False, e)
    try:
        put((job, i, result))  # Sends the result back to the parent
    except Exception as e:
        wrapped = MaybeEncodingError(e, result[1])
        debug("Possible encoding error while sending result: %s" % (
            wrapped))

可能是同一工作人员偶然能够消费一个项目,运行func,然后消费下一个项目。有些奇怪-我不能重现它在我的机器上运行相同的代码,你的榜样-但具有相同的工人抢两个从队列中四个项目是非常正常的。

如果您通过添加对以下项的调用来使工作器函数花费更长的时间,则应该始终看到均匀的分布time.sleep

def f(values):
    print(multiprocessing.current_process())
    time.sleep(1)
    return values

*实际上并非完全如此-在主进程中运行有一个线程,该线程从中消费taskqueue,然后将拉出的线程粘到另一个线程中Queue就是子进程从中消费的线程)

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

使用Manager的python multiprocessing进程表现异常

来自分类Dev

python multiprocessing动态创建的进程和管道

来自分类Dev

列表长于进程数时的Python multiprocessing.Pool.map行为

来自分类Dev

python multiprocessing-子进程阻止父进程

来自分类Dev

python multiprocessing:为什么终止后进程消失了?

来自分类Dev

python multiprocessing-进程在大队列的联接上挂起

来自分类Dev

为进程指定特定的CPU-python multiprocessing

来自分类Dev

如果创建multiprocessing.Pool,则Python子进程wait()失败

来自分类Dev

从父进程访问Python Multiprocessing.Process子类的状态

来自分类Dev

奇怪的进程克隆与python multiprocessing一起出现

来自分类Dev

为进程指定特定的CPU-python multiprocessing

来自分类Dev

python multiprocessing子进程触发父事件或方法

来自分类Dev

python multiprocessing,为每个进程创建实例并重用它

来自分类Dev

Python Multiprocessing Process.start() 等待进程启动

来自分类Dev

python multiprocessing-在正在运行的进程上进行类似选择,以查看已完成的进程

来自分类Dev

如何使进程休眠而不使其他进程在python multiprocessing中休眠

来自分类Dev

使用python multiprocessing子进程如何终止另一个子进程?

来自分类Dev

Python Multiprocessing:当父进程终止时终止守护进程的更好实现是什么?

来自分类Dev

监视python中子进程产生的进程数

来自分类Dev

python multiprocessing map对最后一个进程的错误处理

来自分类Dev

为什么我的Python Multiprocessing worker进程不使用多个内核?

来自分类Dev

python,multiprocessing和dmtcp:在Pool中检查一个进程?

来自分类Dev

为什么我的Python Multiprocessing worker进程不使用多个内核?

来自分类Dev

python multiprocessing.pool.map,将参数传递给生成的进程

来自分类Dev

python 3.4 multiprocessing - 为不同的进程更改 ld_library_path

来自分类Dev

Python Multiprocessing Early Termination

来自分类Dev

Multiprocessing Queues with python

来自分类Dev

Python multiprocessing stdin input

来自分类Dev

Memory Error with Multiprocessing in Python

Related 相关文章

  1. 1

    使用Manager的python multiprocessing进程表现异常

  2. 2

    python multiprocessing动态创建的进程和管道

  3. 3

    列表长于进程数时的Python multiprocessing.Pool.map行为

  4. 4

    python multiprocessing-子进程阻止父进程

  5. 5

    python multiprocessing:为什么终止后进程消失了?

  6. 6

    python multiprocessing-进程在大队列的联接上挂起

  7. 7

    为进程指定特定的CPU-python multiprocessing

  8. 8

    如果创建multiprocessing.Pool,则Python子进程wait()失败

  9. 9

    从父进程访问Python Multiprocessing.Process子类的状态

  10. 10

    奇怪的进程克隆与python multiprocessing一起出现

  11. 11

    为进程指定特定的CPU-python multiprocessing

  12. 12

    python multiprocessing子进程触发父事件或方法

  13. 13

    python multiprocessing,为每个进程创建实例并重用它

  14. 14

    Python Multiprocessing Process.start() 等待进程启动

  15. 15

    python multiprocessing-在正在运行的进程上进行类似选择,以查看已完成的进程

  16. 16

    如何使进程休眠而不使其他进程在python multiprocessing中休眠

  17. 17

    使用python multiprocessing子进程如何终止另一个子进程?

  18. 18

    Python Multiprocessing:当父进程终止时终止守护进程的更好实现是什么?

  19. 19

    监视python中子进程产生的进程数

  20. 20

    python multiprocessing map对最后一个进程的错误处理

  21. 21

    为什么我的Python Multiprocessing worker进程不使用多个内核?

  22. 22

    python,multiprocessing和dmtcp:在Pool中检查一个进程?

  23. 23

    为什么我的Python Multiprocessing worker进程不使用多个内核?

  24. 24

    python multiprocessing.pool.map,将参数传递给生成的进程

  25. 25

    python 3.4 multiprocessing - 为不同的进程更改 ld_library_path

  26. 26

    Python Multiprocessing Early Termination

  27. 27

    Multiprocessing Queues with python

  28. 28

    Python multiprocessing stdin input

  29. 29

    Memory Error with Multiprocessing in Python

热门标签

归档