我有一个concurrent.futures.ThreadPoolExecutor
和一个清单。并使用以下代码将期货添加到ThreadPoolExecutor:
for id in id_list:
future = self._thread_pool.submit(self.myfunc, id)
self._futures.append(future)
然后我等待列表:
concurrent.futures.wait(self._futures)
但是,self.myfunc
某些网络I / O也会执行,因此会有一些网络异常。当发生错误时,self.myfunc
将self.myfunc
与之相同的新id
线程提交到相同的线程池,并向同一个列表添加一个新的future,就像上面的那样:
try:
do_stuff(id)
except:
future = self._thread_pool.submit(self.myfunc, id)
self._futures.append(future)
return None
问题来了:我在的行出现错误concurrent.futures.wait(self._futures)
:
File "/usr/lib/python3.4/concurrent/futures/_base.py", line 277, in wait
f._waiters.remove(waiter)
ValueError: list.remove(x): x not in list
在等待时如何将新的期货适当地添加到列表中?
查看的实现wait()
,当然不希望外界有任何东西concurrent.futures
会改变传递给它的列表。因此,我认为您永远不会“工作”。这不仅是因为它不希望列表发生突变,而且还需要对列表条目进行大量处理,并且实现方式无法知道您添加了更多条目。
未经测试,我建议改用这种方法:跳过所有步骤,仅保持运行中的运行线程数仍处于活动状态。一种简单的方法是使用Condition
保护计数。
初始化:
self._count_cond = threading.Condition()
self._thread_count = 0
当my_func
输入的(即,当一个新的线程开始):
with self._count_cond:
self._thread_count += 1
什么时候my_func
完成(即线程何时结束),无论出于何种原因(无论是否异常):
with self._count_cond:
self._thread_count -= 1
self._count_cond.notify() # wake up the waiting logic
最后是主要的等待逻辑:
with self._count_cond:
while self._thread_count:
self._count_cond.wait()
在为新线程提交工作时,线程计数可能达到0,但是在其my_func
调用开始运行之前(因此在_thread_count
递增之前要考虑新线程)。
所以:
with self._count_cond:
self._thread_count += 1
实际上,应该在每次出现之前立即完成部分操作
self._thread_pool.submit(self.myfunc, id)
或编写一个新方法来封装该模式;例如,像这样:
def start_new_thread(self, id):
with self._count_cond:
self._thread_count += 1
self._thread_pool.submit(self.myfunc, id)
暂时,我希望这也可以工作(但是,再次,它没有经过测试):除了更改您的等待方式之外,所有代码都保持相同:
while self._futures:
self._futures.pop().result()
因此,这仅一次等待一个线程,直到没有线程可用为止。
请注意,.pop()
and .append()
on list在CPython中是原子的,因此不需要您自己的锁。并且由于您的my_func()
代码会在线程运行结束之前追加,因此列表不会在所有线程真正完成之前为空。
保留原始的等待代码,但对其余部分进行重新处理,以防万一发生异常时不创建新线程。就像重写一样my_func
,True
如果由于异常而退出则返回,False
否则返回,然后启动运行包装程序的线程:
def my_func_wrapper(self, id):
keep_going = True
while keep_going:
keep_going = self.my_func(id)
如果您有朝一日决定使用多个进程而不是多个线程,那么这可能会特别有吸引力(在某些平台上创建新进程的成本可能会高得多)。
另一种方法是只更改等待的代码:
while self._futures:
fs = self._futures[:]
for f in fs:
self._futures.remove(f)
concurrent.futures.wait(fs)
清除?这会使列表的副本传递到.wait()
,并且该副本永远不会发生突变。新线程将显示在原始列表中,并且重复整个过程,直到没有新线程显示为止。
以下哪种方式最有意义,在我看来主要依靠语用学,但有不是足够的信息都你这样做对我来说,做一个有关的猜测。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句