Python multiprocessing.Pool.map dies silently

jeffery_the_wind

I am trying to parallelize a for loop to speed up some code. Consider this:

from multiprocessing import Pool

results = []

def do_stuff(str):
    print str
    results.append(str)

p = Pool(4)
p.map(do_stuff, ['str1','str2','str3',...]) # many strings here ~ 2000
p.close()

print results

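I print some debugging messages in do_stuff to keep track of how far the program gets before it dies. It seems to die at a different point each time. For example, it will print 'str297' and then just stop running: I see all the CPUs stop working and the program just sits there. Some error must be occurring, but no error message is shown. Does anyone know how to debug this problem?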

Update

I tried rewriting the code. Instead of using the map function, I tried the apply_async function, like this:

        pool = Pool(5)
        results = pool.map(do_sym, underlyings[0::10])
        results = []
        for sym in underlyings[0::10]:
            r = pool.apply_async(do_sym, [sym])
            results.append(r)

        pool.close()
        pool.join()

        for result in results:
            print result.get(timeout=1000)

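This works just as well as the map function, but it ultimately hangs in the same way. It never reaches the for loop where it prints the results.

After working on this a little more, and trying some debug logging as suggested in unutbu's answer, I can give some more information here. The problem is very strange. It seems that the pool just hangs there, unable to close and let the program continue. I use the PyDev environment to test my programs, but I thought I would try just running python in the console. In the console I get the same behavior, but when I press Control+C to kill the program, I get some output that might explain where the problem is: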

> KeyboardInterrupt ^CProcess PoolWorker-47: Traceback (most recent call
> last):   File "/usr/lib/python2.7/multiprocessing/process.py", line
> 258, in _bootstrap Process PoolWorker-48: Traceback (most recent call
> last):   File "/usr/lib/python2.7/multiprocessing/process.py", line
> 258, in _bootstrap Process PoolWorker-45: Process PoolWorker-46:
> Process PoolWorker-44:
>     self.run()   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
>     self._target(*self._args, **self._kwargs)   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> Traceback (most recent call last): Traceback (most recent call last):
> Traceback (most recent call last):   File
> "/usr/lib/python2.7/multiprocessing/process.py", line 258, in
> _bootstrap   File "/usr/lib/python2.7/multiprocessing/process.py", line 258, in _bootstrap   File
> "/usr/lib/python2.7/multiprocessing/process.py", line 258, in
> _bootstrap
>     task = get()   File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
>     self.run()   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
>     racquire()
>     self._target(*self._args, **self._kwargs)   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
> KeyboardInterrupt
>     task = get()   File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
>     self.run()
>     self.run()   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
>     self.run()   File "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run   File
> "/usr/lib/python2.7/multiprocessing/process.py", line 114, in run
>     self._target(*self._args, **self._kwargs)   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
>     self._target(*self._args, **self._kwargs)
>     self._target(*self._args, **self._kwargs)   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker
>     racquire()   File "/usr/lib/python2.7/multiprocessing/pool.py", line 102, in worker KeyboardInterrupt
>     task = get()   File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
>     task = get()
>     task = get()   File "/usr/lib/python2.7/multiprocessing/queues.py", line 376, in get  
> File "/usr/lib/python2.7/multiprocessing/queues.py", line 374, in get
>     racquire()
>     return recv()
>     racquire() KeyboardInterrupt KeyboardInterrupt KeyboardInterrupt

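Then the program never actually dies. I end up having to close the terminal window to kill it.

Update 2

I narrowed the problem down to the function that runs in the pool: a MySQL database transaction was causing the problem. I was using the MySQLdb package before. I switched the transaction to a pandas.read_sql call, and it is working now.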

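unutbu

pool.map returns its results in a list. So instead of calling results.append in the concurrent processes (which will not work, since each process has its own independent copy of results), assign results to the value returned by pool.map in the main process: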

import multiprocessing as mp

def do_stuff(text):
    return text

if __name__ == '__main__':
    p = mp.Pool(4)
    tasks = ['str{}'.format(i) for i in range(2000)]
    results = p.map(do_stuff, tasks)
    p.close()

    print(results)

yields

['str0', 'str1', 'str2', 'str3', ...]
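
To see why the append in the question never reaches the main process, here is a minimal sketch. The names append_in_worker and run_demo are made up for illustration, and the pool size of 2 is arbitrary. Each worker appends to its own copy of results, so the list in the parent stays empty and only the value returned by pool.map carries the output.

```python
import multiprocessing as mp

results = []  # module-level list; every worker process gets its own copy


def append_in_worker(text):
    # This append only changes the copy of `results` inside the worker.
    results.append(text)
    return text


def run_demo(tasks):
    with mp.Pool(2) as pool:
        returned = pool.map(append_in_worker, tasks)
    # `results` in the parent is untouched; `returned` holds the real output.
    return list(results), returned


if __name__ == "__main__":
    parent_copy, returned = run_demo(["str0", "str1", "str2"])
    print(parent_copy)  # []
    print(returned)     # ['str0', 'str1', 'str2']
```

If state really has to be shared between processes, that calls for something other than a plain list. For this problem, returning the value from the worker is enough.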

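One way to debug scripts that use multiprocessing is to add logging statements. The multiprocessing module provides a helper function, mp.log_to_stderr, for this purpose. For example,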

import multiprocessing as mp
import logging

logger = mp.log_to_stderr(logging.DEBUG)

def do_stuff(text):
    logger.info('Received {}'.format(text))
    return text

if __name__ == '__main__':
    p = mp.Pool(4)
    tasks = ['str{}'.format(i) for i in range(2000)]
    results = p.map(do_stuff, tasks)
    p.close()

    logger.info(results)

yields logging output like this:

[DEBUG/MainProcess] created semlock with handle 139824443588608
[DEBUG/MainProcess] created semlock with handle 139824443584512
[DEBUG/MainProcess] created semlock with handle 139824443580416
[DEBUG/MainProcess] created semlock with handle 139824443576320
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-1] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-2] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-3] child process calling self.run()
[DEBUG/MainProcess] added worker
[INFO/PoolWorker-4] child process calling self.run()
[INFO/PoolWorker-1] Received str0
[INFO/PoolWorker-2] Received str125
[INFO/PoolWorker-3] Received str250
[INFO/PoolWorker-4] Received str375
[INFO/PoolWorker-3] Received str251
...
[INFO/PoolWorker-4] Received str1997
[INFO/PoolWorker-4] Received str1998
[INFO/PoolWorker-4] Received str1999
[DEBUG/MainProcess] closing pool
[INFO/MainProcess] ['str0', 'str1', 'str2', 'str3', ...]
[DEBUG/MainProcess] worker handler exiting
[DEBUG/MainProcess] task handler got sentinel
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] task handler sending sentinel to result handler
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] finalizing pool
[DEBUG/MainProcess] task handler sending sentinel to workers
[DEBUG/MainProcess] helping task handler/workers to finish
[DEBUG/MainProcess] result handler got sentinel
[DEBUG/PoolWorker-3] worker got sentinel -- exiting
[DEBUG/MainProcess] removing tasks from inqueue until task handler finished
[DEBUG/MainProcess] ensuring that outqueue is not full
[DEBUG/MainProcess] task handler exiting
[DEBUG/PoolWorker-3] worker exiting after 2 tasks
[INFO/PoolWorker-3] process shutting down
[DEBUG/MainProcess] result handler exiting: len(cache)=0, thread._state=0
[DEBUG/PoolWorker-3] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] joining worker handler
[DEBUG/MainProcess] terminating workers
[DEBUG/PoolWorker-3] running the remaining "atexit" finalizers
[DEBUG/MainProcess] joining task handler
[DEBUG/MainProcess] joining result handler
[DEBUG/MainProcess] joining pool workers
[DEBUG/MainProcess] cleaning up worker 4811
[DEBUG/MainProcess] running the remaining "atexit" finalizers

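Notice that each line indicates which process emitted the logging record. So the output, to some extent, serializes the order of events across your concurrent processes.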
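
The process tag in those lines comes from the processName attribute of each log record. As a rough sketch, the same tagging can be reproduced with the plain logging module. The names handled_by and run_demo are hypothetical. The worker names differ between Python versions and start methods, so none are shown here.

```python
import logging
import multiprocessing as mp

# Level, process name, message: the same layout as the log lines above.
LOG_FORMAT = "[%(levelname)s/%(processName)s] %(message)s"


def handled_by(text):
    # Runs in a worker: report which process picked up this task.
    return mp.current_process().name, text


def run_demo(tasks):
    with mp.Pool(2) as pool:
        return pool.map(handled_by, tasks)


if __name__ == "__main__":
    logging.basicConfig(format=LOG_FORMAT, level=logging.INFO)
    for worker_name, text in run_demo(["str0", "str1", "str2", "str3"]):
        logging.info("%s was handled by %s", text, worker_name)
```

Here the log lines are emitted by the main process, which reports the worker name it got back with each result. Logging from inside the worker would put the worker's own name in the tag instead.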

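By judicious placement of logging.info calls you should be able to narrow down where, and maybe why, your script is "silently dying" (or at least it won't be so silent as it dies).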
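
As one possible way to place those calls, the sketch below wraps the real work in a function that logs when a task starts, finishes, or fails. The function risky is a hypothetical stand-in for the actual work (the database call, say), and logged and run_demo are illustrative names. Returning None on failure is an assumption made here so the rest of the batch completes. Re-raising instead would let pool.map surface the exception in the main process.

```python
import logging
import multiprocessing as mp

logger = mp.log_to_stderr(logging.INFO)


def risky(text):
    # Hypothetical stand-in for the real work; fails on one input.
    if text == "str2":
        raise ValueError("cannot handle {}".format(text))
    return text.upper()


def logged(text):
    """Run risky() and log where it starts, finishes, or fails."""
    logger.info("start %s", text)
    try:
        value = risky(text)
    except Exception:
        # logger.exception records the traceback from inside the worker.
        logger.exception("failed on %s", text)
        return None
    logger.info("done %s", text)
    return value


def run_demo(tasks):
    with mp.Pool(2) as pool:
        return pool.map(logged, tasks)


if __name__ == "__main__":
    print(run_demo(["str0", "str1", "str2", "str3"]))
    # ['STR0', 'STR1', None, 'STR3']
```

A task that logs "start" but never logs "done" or "failed" is a reasonable place to begin looking for a hang.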
