我正在创建一百万个值,并使用一个滑动窗口将它们分为训练和测试,该滑动窗口使用长度为1000个值的窗口,每次滑动一个值。
例如,第一个过程是将前1000个值拆分为火车,然后将1001st值拆分为测试。第二个过程在火车中的值为2到1001,在测试中的值为1002nd,依此类推。
运行脚本需要76.28秒。我曾经timeit
测量过。
现在,我想通过使用多个处理器运行滑动窗口来减少此时间。我用Pool
从multiprocessing
有4个CPU,但它并没有改变,在所有的性能。我想知道有什么更好的方法吗?
码:
from multiprocessing import Process
from multiprocessing import Pool
import numpy as np
import pandas as pd
from timeit import default_timer as timer
start = timer()
data = list(range(1_000_000))
window_size = 1_000
splits = []
def sw(window_size, data):
for i in range(window_size, len(data)):
train = np.array(data[i - window_size:i])
test = np.array(data[i:i + 1])
splits.append(('TRAIN:', train, 'TEST:', test))
# sw(window_size, data)
# print(splits)
if __name__ == '__main__':
p= Pool(4)
p = Process(target=sw, args=(window_size, data))
p.start()
p.join()
end = timer()
print(end - start)
实际上,正如评论所指出的那样,您要做的就是创建一个名为的Pool p
,然后将该变量重新分配给过程结果。我稍微重写了您的滑动窗口功能。并行化独立任务的一种简单方法是,指定要对一项执行的操作,然后仅使用地图函子。在Intel core [email protected]
(具有超线程的双核)上执行基准测试。
from multiprocessing import Process
from multiprocessing import Pool
import numpy as np
from timeit import default_timer as timer
NUM_EL = 1_000_000
WINDOW_SIZE = 1000
DATA = list(range(NUM_EL))
def window(start_idx, window_size=WINDOW_SIZE, data=DATA):
_train = np.array(data[start_idx:start_idx + window_size])
_test = np.array(data[start_idx + window_size + 1])
# return something useful here
return start_idx
if __name__ == '__main__':
STARTS = list(range(NUM_EL - WINDOW_SIZE - 1))
start = timer()
result_single = list(map(window, STARTS))
end = timer()
print("Single core: ", end - start)
start = timer()
with Pool(4) as p:
result_multi = p.map(window, STARTS)
end = timer()
print(result_single == result_multi)
print("Multiprocessing: ", end - start)
>>> Single core: 99.9821742
>>> Multiprocessing: 38.71327739999998
注意:此代码很可能在使用的任何环境中均不起作用IPython
。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句