我在机器学习输入管道上工作。我写了一个数据加载器,它从一个大的.hdf文件中读取数据并返回分片,每个分片大约需要2秒钟。因此,我想使用一个队列,该队列接收来自多个数据加载器的对象,并可以通过下一个函数(例如生成器)从队列中返回单个对象。此外,填充队列的进程应以某种方式在后台运行,并在队列未满时重新填充队列。我没有使其正常工作。它使用单个数据加载器工作,给了我4倍的相同切片。
import multiprocessing as mp
class Queue_Generator():
def __init__(self, data_loader_list):
self.pool = mp.Pool(4)
self.data_loader_list = data_loader_list
self.queue = mp.Queue(maxsize=16)
self.pool.map(self.fill_queue, self.data_loader_list)
def fill_queue(self,gen):
self.queue.put(next(gen))
def __next__(self):
yield self.queue.get()
我从中得到的是:NotImplementedError:池对象不能在进程之间传递或腌制。
您的特定错误意味着将类方法传递给池时,不能将池作为类的一部分。我建议的可能是以下内容:
import multiprocessing as mp
from queue import Empty
class QueueGenerator(object):
def __init__(self, data_loader_list):
self.data_loader_list = data_loader_list
self.queue = mp.Queue(maxsize=16)
def __iter__(self):
processes = list()
for _ in range(4):
pr = mp.Process(target=fill_queue, args=(self.queue, self.data_loader_list))
pr.start()
processes.append(pr)
return self
def __next__(self):
try:
return self.queue.get(timeout=1) # this should have a value, otherwise your loop will never stop. make it something that ensures your processes have enough time to update the queue but not too long that your program freezes for an extended period of time after all information is processed
except Empty:
raise StopIteration
# have fill queue as a separate function
def fill_queue(queue, gen):
while True:
try:
value = next(gen)
queue.put(value)
except StopIteration: # assumes the given data_loader_list is an iterator
break
print('stopping')
gen = iter(range(70))
qg = QueueGenerator(gen)
for val in qg:
print(val)
# test if it works several times:
for val in qg:
print(val)
我想为您解决的下一个问题是让data_loader_list在每个单独的过程中提供新信息。但是由于您还没有提供任何信息,因此我无法为您提供帮助。但是上述内容确实为您提供了一种使进程填充队列的方法,然后将其作为迭代器传递出去。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句