每当我使用我的(其他)多处理代码时,它都可以正常工作,但是根据我对脚本完成情况的反馈,例如“已完成5/10个文件”,我不知道如何适应我的代码以返回计数。基本上,我想修改以下代码以允许进行多处理。
所以我用
file_paths = r"path to file with paths"
count = 0
pool = Pool(16)
pool.map(process_control, file_paths)
pool.close()
pool.join()
在process_control中,我在函数末尾 count += 1 and return count
我想等价的代码会像
def process_control(count, file_path):
do stuff
count += 1
print("Process {} / {} completed".format(count, len(file_paths))
return count
file_paths = r"path to file with paths"
count = 0
for path in file_paths:
count = process_control(count, path)
这样的事情。希望我的解释清楚。
每个子流程都有自己的副本,count
因此他们所能做的就是跟踪该流程中的工作。该计数不会汇总所有进程。但是父母可以计算。map
等待所有任务完成,因此无济于事。imap
更好,可以进行迭代,但也可以保持顺序,因此报告仍会延迟。imap_unordered
块大小为1的最佳选择。每个任务的返回值(即使它为None)都将立即返回。
def process_control(count, file_path):
do stuff
file_paths = ["path1", ...]
with multiprocessing.Pool(16) as pool:
count = 0
for _ in pool.imap_unordered(porcess_control, file_paths,chunksize=1):
count += 1
print("Process {} / {} completed".format(count, len(file_paths))
关于的注释chunksize
。使用池有成本-每个工作项都需要发送到子流程并返回其值。这种来回的IPC相对昂贵,因此池将“打包”工作项,这意味着它将把多个工作项全部发送到给定的子流程中,并且该流程仅在整个数据块时才返回已经通过worker函数进行了处理。
当有许多相对较短的工作项目时,这很棒。但是,假设不同的工作项需要花费不同的时间来执行。即使其他子进程已经完成,仍将有一个高杆子进程在其块上运行。
对于您的情况而言更重要的是,结果要等到块完成后才回传给父级,这样您就无法实时获得完成情况的报告。
将chunksize设置为1,子流程将立即返回结果,以实现更准确的计费。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句