想象一下,我有一个包含10个工人和40个核心总数的敏捷网格。这是一个共享的网格,所以我不想让我的工作完全饱和。我有1000项任务要做,并且我想一次提交(并一直在运行)最多20项任务。
具体来说,
from time import sleep
from random import random
def inc(x):
from random import random
sleep(random() * 2)
return x + 1
def double(x):
from random import random
sleep(random())
return 2 * x
>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>
如果我设置队列系统
>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)
这将起作用,但是,这只会将我的所有任务转储到网格中,从而使其饱和。理想情况下,我可以:
e.scatter(input_q, max_submit=20)
看来这里文档的示例将允许我使用maxsize
队列。但是从用户的角度来看,我仍然不得不应对背压问题。理想情况下dask
会自动进行此处理。
maxsize=
你很亲密 所有的scatter
,gather
以及map
采取相同的maxsize=
关键字参数是Queue
需要。因此,一个简单的工作流程可能如下:
from time import sleep
def inc(x):
sleep(1)
return x + 1
your_input_data = list(range(1000))
from queue import Queue # Put your data into a queue
q = Queue()
for i in your_input_data:
q.put(i)
from dask.distributed import Executor
e = Executor('127.0.0.1:8786') # Connect to cluster
futures = e.map(inc, q, maxsize=20) # Map inc over data
results = e.gather(futures) # Gather results
L = []
while not q.empty() or not futures.empty() or not results.empty():
L.append(results.get()) # this blocks waiting for all results
所有的q
,futures
以及results
是Python的队列对象。该q
和results
队列没有限制,所以他们会贪婪地拉多,因为他们可以。futures
但是,队列的最大大小为20,因此在任何给定时间仅允许飞行20个期货。一旦领先的将来完成,它将立即被collect函数使用,并且其结果将被放入results
队列中。这样可以释放空间futures
并导致提交另一个任务。
请注意,这并不是您想要的。这些队列是有序的,因此只有当它们在队列的最前面时,期货才会被弹出。如果除第一个以外的所有机上期货均已完成,它们仍将排在队列中,占用空间。鉴于此限制,您可能想要选择maxsize=
比所需20
项目略多的项目。
在这里,我们做了一个简单的map->gather
管道,两者之间没有逻辑。您还可以map
在此处放置其他计算,甚至将期货从队列中取出,并自行进行定制工作。很容易脱离上面提供的模具。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句