如何在不使用所有工作人员的情况下限制大量任务

杰夫

想象一下,我有一个包含10个工人和40个核心总数的敏捷网格。这是一个共享的网格,所以我不想让我的工作完全饱和。我有1000项任务要做,并且我想一次提交(并一直在运行)最多20项任务。

具体来说,

from time import sleep
from random import random

def inc(x):
    from random import random
    sleep(random() * 2)
    return x + 1

def double(x):
    from random import random
    sleep(random())
    return 2 * x

>>> from distributed import Executor
>>> e = Executor('127.0.0.1:8786')
>>> e
<Executor: scheduler=127.0.0.1:8786 workers=10 threads=40>

如果我设置队列系统

>>> from queue import Queue
>>> input_q = Queue()
>>> remote_q = e.scatter(input_q)
>>> inc_q = e.map(inc, remote_q)
>>> double_q = e.map(double, inc_q)

这将起作用,但是,这只会将我的所有任务转储到网格中,从而使其饱和。理想情况下,我可以:

e.scatter(input_q, max_submit=20)

看来这里文档的示例将允许我使用maxsize队列。但是从用户的角度来看,我仍然不得不应对背压问题。理想情况下dask会自动进行此处理。

麦考林

使用 maxsize=

你很亲密 所有的scattergather以及map采取相同的maxsize=关键字参数是Queue需要。因此,一个简单的工作流程可能如下:

例子

from time import sleep

def inc(x):
    sleep(1)
    return x + 1

your_input_data = list(range(1000))

from queue import Queue              # Put your data into a queue
q = Queue()
for i in your_input_data:
    q.put(i)

from dask.distributed import Executor
e = Executor('127.0.0.1:8786')        # Connect to cluster


futures = e.map(inc, q, maxsize=20)  # Map inc over data
results = e.gather(futures)          # Gather results

L = []
while not q.empty() or not futures.empty() or not results.empty():
    L.append(results.get())  # this blocks waiting for all results

所有的qfutures以及results是Python的队列对象。qresults队列没有限制,所以他们会贪婪地拉多,因为他们可以。futures但是队列的最大大小为20,因此在任何给定时间仅允许飞行20个期货。一旦领先的将来完成,它将立即被collect函数使用,并且其结果将被放入results队列中。这样可以释放空间futures并导致提交另一个任务。

请注意,这并不是您想要的。这些队列是有序的,因此只有当它们在队列的最前面时,期货才会被弹出。如果除第一个以外的所有机上期货均已完成,它们仍将排在队列中,占用空间。鉴于此限制,您可能想要选择maxsize=比所需20项目略多的项目。

扩展这个

在这里,我们做了一个简单的map->gather管道,两者之间没有逻辑。您还可以map在此处放置其他计算,甚至将期货从队列中取出,并自行进行定制工作。很容易脱离上面提供的模具。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何在不使用全局变量的情况下与multiprocessing.Pool的工作人员共享内存?

来自分类Dev

在Dask中,如何基于全局(而非工作人员)资源限制来限制任务调度?

来自分类Dev

如何实现与UI无关的后台工作人员任务

来自分类Dev

如何实现与UI无关的后台工作人员任务

来自分类Dev

如何在工作人员关闭时向正在运行的任务发送消息?

来自分类Dev

所有工作人员的重叠日期范围

来自分类Dev

Python多处理池:如何知道池中的所有工作人员何时完成?

来自分类Dev

Spark Streaming应用程序使用所有工作人员

来自分类Dev

告诉Sidekiq使用所有可用的Heroku工作人员

来自分类Dev

如何在不使用大量内存的情况下将所有stdin写入stdout?

来自分类Dev

从单个工作人员收到消息后发送所有工作人员消息

来自分类Dev

如何在不使用Rx Framework的情况下限制事件的速度

来自分类Dev

如何在Rails sidekiq工作人员调用的视图中使用帮助程序?

来自分类Dev

python celery-如何在运行时向工作人员添加CELERYBEAT_SCHEDULE任务?

来自分类Dev

同一个工作人员有X个任务从Y个任务中脱颖而出的机会(在Z个工作人员中)

来自分类Dev

具有依赖关系和工作人员约束的任务调度优化

来自分类Dev

如何避免在Luigi中同时由多个工作人员运行特定任务

来自分类Dev

排除工作人员

来自分类Dev

Gearman PHP扩展:无效作业服务器=所有工作人员响应缓慢

来自分类Dev

适用于Resque中所有工作人员的ActiveRecord :: RecordNotFound

来自分类Dev

有什么方法可以在不使用 if 语句的情况下限制值?

来自分类Dev

如何正确取消永远工作的后台工作人员?

来自分类Dev

如何在独立集群模式下为每个工作人员分配更多执行程序?

来自分类Dev

如何在Elixir应用程序中找到工作人员?

来自分类Dev

如何在客户端GUI中报告工作人员“事件”

来自分类Dev

如何在我的ExUnit测试中保留(或阻止运行)对工作人员的呼叫?

来自分类Dev

如何在Cloudflare的工作人员中获取当前日期

来自分类Dev

如何在Asyncio中检查工作人员状态?

来自分类Dev

发生错误时如何在后台工作人员上再次调用函数

Related 相关文章

  1. 1

    如何在不使用全局变量的情况下与multiprocessing.Pool的工作人员共享内存?

  2. 2

    在Dask中,如何基于全局(而非工作人员)资源限制来限制任务调度?

  3. 3

    如何实现与UI无关的后台工作人员任务

  4. 4

    如何实现与UI无关的后台工作人员任务

  5. 5

    如何在工作人员关闭时向正在运行的任务发送消息?

  6. 6

    所有工作人员的重叠日期范围

  7. 7

    Python多处理池:如何知道池中的所有工作人员何时完成?

  8. 8

    Spark Streaming应用程序使用所有工作人员

  9. 9

    告诉Sidekiq使用所有可用的Heroku工作人员

  10. 10

    如何在不使用大量内存的情况下将所有stdin写入stdout?

  11. 11

    从单个工作人员收到消息后发送所有工作人员消息

  12. 12

    如何在不使用Rx Framework的情况下限制事件的速度

  13. 13

    如何在Rails sidekiq工作人员调用的视图中使用帮助程序?

  14. 14

    python celery-如何在运行时向工作人员添加CELERYBEAT_SCHEDULE任务?

  15. 15

    同一个工作人员有X个任务从Y个任务中脱颖而出的机会(在Z个工作人员中)

  16. 16

    具有依赖关系和工作人员约束的任务调度优化

  17. 17

    如何避免在Luigi中同时由多个工作人员运行特定任务

  18. 18

    排除工作人员

  19. 19

    Gearman PHP扩展:无效作业服务器=所有工作人员响应缓慢

  20. 20

    适用于Resque中所有工作人员的ActiveRecord :: RecordNotFound

  21. 21

    有什么方法可以在不使用 if 语句的情况下限制值?

  22. 22

    如何正确取消永远工作的后台工作人员?

  23. 23

    如何在独立集群模式下为每个工作人员分配更多执行程序?

  24. 24

    如何在Elixir应用程序中找到工作人员?

  25. 25

    如何在客户端GUI中报告工作人员“事件”

  26. 26

    如何在我的ExUnit测试中保留(或阻止运行)对工作人员的呼叫?

  27. 27

    如何在Cloudflare的工作人员中获取当前日期

  28. 28

    如何在Asyncio中检查工作人员状态?

  29. 29

    发生错误时如何在后台工作人员上再次调用函数

热门标签

归档