EventHub ForEach 并行异步

米恩凯

总是设法让自己与异步工作混淆,我在这里进行了一些验证/确认之后,我正在做我认为我在以下情况下正在做的事情..

给出以下简单示例:

// pretend / assume these are json msgs or something ;)
var strEvents = new List<string> { "event1", "event2", "event3" };

我可以简单地将每个事件发布到 eventhub,如下所示:

foreach (var e in strEvents)
{
    // Do some things
    outEventHub.Add(e); // ICollector
}

foreach 将在单个线程上运行,并按顺序执行内部的每件事.. 我猜对 eventhub 的发布也将保留在同一线程上?

将 ICollector 更改为 IAsyncCollector,并实现以下效果:

foreach (var e in strEvents)
{
    // Do some things
    await outEventHub.AddAsync(e);
}

我想我是在这里说 foreach 将在单个线程上运行,实际发送到事件中心将被推到别处?或者至少不要阻塞同一个线程..

更改为 Parallel.ForEach 事件,因为这些事件将一次到达 100+ 左右:

 Parallel.ForEach(events, async (e) =>
 {
      // Do some things
      await outEventHub.AddAsync(e);
 });

开始变得有点朦胧了,因为我不知道真正是怎么回事,现在据我所知......对每个事件都有它自己的线程的线程中和步骤(硬件的范围内)不会阻止它..所以抛开这个微不足道的例子。

最后,我可以将它们全部转换为我认为的任务。

 private static async Task DoThingAsync(string e, IAsyncCollector<string> outEventHub)
 {
      await outEventHub.AddAsync(e);
 }

 var t = new List<Task>();

 foreach (var e in strEvents)
 {
      t.Add(DoThingAsync(e, outEventHub));
 }

 await Task.WhenAll(t);

现在我真的很朦胧,我认为这是在单个线程上准备所有内容..然后在任何可用线程上完全同时运行所有内容?

我很欣赏为了确定哪个适合手头的工作需要进行基准测试……但是对框架在每种情况下所做的事情的解释现在对我来说非常有帮助..

马塞尔·托特

并行 != 异步

这是这里的主要思想。两者都有各自的用途,也可以一起使用,但是却有很大的不同。您的假设基本正确,但让我澄清一下:

简单的foreach

是非并行非异步的没什么好谈的。

在 foreach 中等待

是非并行的异步代码

foreach (var e in strEvents)
{
    // Do some things
    await outEventHub.AddAsync(e);
}

这一切都将发生在一个线程上。它接受一个事件,开始将其添加到您的事件中心,并在完成时(我猜它执行某种网络 IO)将线程交还给线程池(或 UI,如果它在UI 线程),所以它可以在等待AddAsync返回的同时做其他工作但正如你所说,根本不是平行的。

并行 Foreach(异步)

这是一个陷阱!简而言之,Parallel.Foreach专为同步工作负载而设计。我们将回到这一点,但首先让我们假设您将它与非异步代码一起使用。

并行 foreach(同步)

又名并行但不是异步。

Parallel.ForEach(events, (e) =>
 {
      // Do some things
      outEventHub.Add(e);
 });

每个项目都有自己的“任务”,但它们不会产生线程。创建线程的成本很高,在最佳情况下,线程数多于 CPU 内核毫无意义。相反,这些任务在ThreadPool上运行,它具有与最佳线程一样多的线程。每个线程接受一个任务,处理它,然后接受另一个任务,依此类推。

您可以将其视为 - 在 4 核机器上 - 在一堆任务周围有 4 个工人,因此一次运行其中 4 个。您可以想象这在 IO 绑定工作负载的情况下并不理想(这很可能是)。如果您的网络很慢,您可以在尝试将事件发送出去时阻止所有 4 个线程,而它们可能正在做有用的工作。这导致我们...

任务

异步和可能并行(取决于使用情况)。

您的描述在这里也是正确的,除了 ThreadPool,它同时启动所有任务(在主线程上),然后在池的线程上运行。当它们运行时,主线程被释放,然后可以根据需要做其他工作。到目前为止,情况与此相同Parallel.Foreach但:

发生的事情是 TaskPool 线程接收一个任务,进行必要的预处理,然后异步发送网络请求这意味着该任务在等待网络时不会阻塞,而是释放ThreadPool线程以获取另一个工作项。当网络请求完成时,任务继续(网络请求之后的剩余代码行)被调度回任务列表。

您可以看到理论上这是最有效的过程,速度如此之快,以至于您必须小心不要淹没您的网络。

回到 Parallel.Foreach 和 async

此时,您应该能够发现问题。你的异步 lambdaasync (e) => { await outEventHub.AddAsync(e);}所做的就是开始工作,它会在遇到await. (记住 async/await 在等待时释放线程。)Parallel.Foreach在它启动所有线程后立即返回。但是没有什么在等待这些任务!这些变成了火和遗忘,这通常是一种不好的做法。就像您await Task.WhenAll从任务示例中删除了调用一样

我希望这为您清除了大部分内容,如果没有,请告诉我要改进的地方。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章