我有几种计算密集型方法,试图使用async-await并行运行。
我有大约80,000个对象的列表,这些对象馈入一个返回任务的函数中:
public static void Main(string[] args)
{
//...blah blah blah...
var runner = new Runner(); //in a nutshell, I manage to get an object that has an async method on it.
runner.Run().Wait(); //and I wait for it to complete.
//...blah blah blah...
}
我的跑步者对象中具有以下方法(或多或少...这是一个人为的示例):
public async Task Run()
{
var items = ... //this is my list
var tasks = items.Select(i => this.RunItemAsync(i)).ToArray();
//I don't get here until the tasks are all finished...every single one...
await Task.WhenAll(tasks).ConfigureAwait(false);
}
private async Task RunItemAsync(Item i)
{
var subItems = i.GetSubItems();
var tasks = subItems.Select(s => s.RunSubItemAsync(s)).ToArray();
//I don't get here until the sub item tasks are all finished...
await Task.WhenAll(tasks).ConfigureAwait(false);
//does computations, doesn't wait on any async i/o, etc
await this.ProcessAsync(i).ConfigureAwait(false);
}
private async Task RunSubItemAsync(SubItem s)
{
//does computations, doesn't wait on any async i/o, etc
...
}
在过去的一年左右的时间里,我一直在努力等待异步,有时会获得出色的性能,并使用TPL Dataflow做一些非常酷的事情,但是有时我会遇到类似这样的事情,但我似乎无法“激活”其并行能力的任务。这个特定的项目将在具有约16个内核的服务器上运行,所以我真的很想利用这一点。我的开发VM仅分配了2个核心,但是仍然应该允许任务激活和并行运行(过去已经有)。
我的观察
await Task.Delay(1).ConfigureAwait(false)
在该RunItemAsync
方法的开头插入一个小字符来并行运行此命令。我知道这会创建某种形式的“呼吸室”,从而允许另一个任务使用该线程。但是,这还不够,因为它很脏,不可靠,并且要求我进行不可接受的延迟。Delay
调用的情况下,所有任务都在上运行Main Thread
。这对我来说很明显,因为Main
它是启动所有功能的功能。我对此没有任何问题,但是我过去有过在new Thread
创建的线程上运行任务导致其无法使用默认任务计划程序运行的经验,并且每个任务最终都在该线程上依次运行。也许Main Thread
属于这个类别?我的问题
我了解运行ToArray
本身不会执行异步代码。但是,我想发生的是,当我的RunItemAsync
方法到达第一个方法时await
,它将“停止”并允许调用的下一次迭代ToArray
运行。
我也了解添加作品是await Task.Delay
因为它正是我上面想要的。一定有某种方法可以做到这一点而不求助于await Task.Delay
...
如何并行启动所有这些受计算限制的任务,而不会无意间使它们依次运行?
今天有四种主要的并发库/技术。
async
最适合自然异步的单个操作,例如I / O。async
和并行,为处理数据提供了网格/管道抽象。对于您的情况,您想使用TPL。一个简单的Parallel.ForEach
就足够了。
最后要注意的是,同步代码(包括与CPU绑定的并行代码)应具有同步API。并且异步代码应具有异步API。因此,您希望您的API看起来是同步的,而不是异步的。
因此,如下所示:
public static void Main(string[] args)
{
var runner = new Runner();
runner.Run();
}
public void Run()
{
var items = ...
Parallel.ForEach(items, i => this.RunItem(i));
}
private void RunItem(Item i)
{
var subItems = i.GetSubItems();
Parallel.ForEach(subItems, s => s.RunSubItem(s));
this.Process(i);
}
private void RunSubItem(SubItem s)
{
SemaphoreSlim.Wait(); // instead of WaitAsync
...
}
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句