如Rugnar的回答所述,我的繁殖是错误的。我大部分时间都保持原样,因为我不确定这在澄清和更改含义之间。
我有成千上万的工作需要运行,并且希望有任何错误立即停止执行。我将任务包装在try
/ except
…中,raise
以便可以记录错误(没有所有的多处理/线程噪音),然后重新引发。这并没有杀死的主要过程。
发生了什么事,如何才能找到想要的提早退房?sys.exit(1)
在子死锁中,将try
/ except
…raise
函数包装在另一个函数中也不起作用。
$ python3 mp_reraise.py
(0,)
(1,)
(2,)
(3,)
(4,)
(5,)
(6,)
(7,)
(8,)
(9,)
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "mp_reraise.py", line 5, in f_reraise
raise Exception(args)
Exception: (0,)
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "mp_reraise.py", line 14, in <module>
test_reraise()
File "mp_reraise.py", line 12, in test_reraise
p.map(f_reraise, range(10))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
Exception: (0,)
mp_reraise.py
import multiprocessing
def f_reraise(*args):
try:
raise Exception(args)
except Exception as e:
print(e)
raise
def test_reraise():
with multiprocessing.Pool() as p:
p.map(f_reraise, range(10))
test_reraise()
如果我不追赶和加注,则执行会按预期提前终止:[根据Rugnar的回答,这实际上并没有停止]
$ python3 mp_raise.py
multiprocessing.pool.RemoteTraceback:
"""
Traceback (most recent call last):
File "/usr/lib/python3.6/multiprocessing/pool.py", line 119, in worker
result = (True, func(*args, **kwds))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 44, in mapstar
return list(map(*args))
File "mp_raise.py", line 4, in f_raise
raise Exception(args)
Exception: (0,)
"""
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "mp_raise.py", line 10, in <module>
test_raise()
File "mp_raise.py", line 8, in test_raise
p.map(f_raise, range(10))
File "/usr/lib/python3.6/multiprocessing/pool.py", line 266, in map
return self._map_async(func, iterable, mapstar, chunksize).get()
File "/usr/lib/python3.6/multiprocessing/pool.py", line 644, in get
raise self._value
Exception: (0,)
mp_raise.py
import multiprocessing
def f_raise(*args):
# missing print, which would demonstrate that
# this actually does not stop early
raise Exception(args)
def test_raise():
with multiprocessing.Pool() as p:
p.map(f_raise, range(10))
test_raise()
在您中,mp_raise.py
您什么都不打印,所以您看不到完成了多少工作。我添加了打印内容,发现只有在工作迭代器用尽时,池才能看到该子项的一部分。因此,它永远不会停止。
如果您需要在异常发生后提前停止,请尝试此操作
import time
import multiprocessing as mp
def f_reraise(i):
if abort.is_set(): # cancel job if abort happened
return
time.sleep(i / 1000) # add sleep so jobs are not instant, like in real life
if abort.is_set(): # probably we need stop job in the middle of execution if abort happened
return
print(i)
try:
raise Exception(i)
except Exception as e:
abort.set()
print('error:', e)
raise
def init(a):
global abort
abort = a
def test_reraise():
_abort = mp.Event()
# jobs should stop being fed to the pool when abort happened
# so we wrap jobs iterator this way
def pool_args():
for i in range(100):
if not _abort.is_set():
yield i
# initializer and init is a way to share event between processes
# thanks to https://stackoverflow.com/questions/25557686/python-sharing-a-lock-between-processes
with mp.Pool(8, initializer=init, initargs=(_abort,)) as p:
p.map(f_reraise, pool_args())
if __name__ == '__main__':
test_reraise()
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句