任务并行库和长时间运行的任务

特雷弗布鲁克斯

我有一个实现 ExecuteAsync 的 BackgroundService (IHostedService),类似于使用 IHostedService 和 BackgroundService 类在微服务实现后台任务中的示例

我想同时运行多个在外部系统中执行长时间运行命令的任务。我希望服务继续执行这些任务,直到服务停止 - 但我不想同时执行具有相同参数的命令(如果它使用 item.parameter1 = 123 执行,我希望它等到 123完成,然后再次执行 123)。此外,它不应该阻塞,也不应该泄漏内存。如果发生异常,我想优雅地停止有问题的任务,记录它并重新启动。命令的每次执行都会获得不同的参数,因此如下所示:

protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
    var items = GetItems(); //GetItems returns a List<Item>

    _logger.LogDebug($"ExternalCommandService is starting.");

    stoppingToken.Register(() => 
            _logger.LogDebug($" Execute external command background task is stopping."));

    while (!stoppingToken.IsCancellationRequested)
    {
        _logger.LogDebug($"External command task is doing background work.");

        //Execute the command with values from the item
        foreach(var item in items)
        {
             ExecuteExternalCommand(item);
        }

        await Task.Delay(_settings.CheckUpdateTime, stoppingToken);
    }

    _logger.LogDebug($"Execute external command background task is stopping.");

}

数据结构非常简单:

public class MyData
{
    public string Foo { get; set; }
    public string Blee { get; set; }
}

在任务开发方面,我是一个完全的新手,所以请原谅我缺乏理解。使 ExecuteExternalCommand 异步是否更有意义?我不确定这些任务是否并行执行。我究竟做错了什么?我如何完成其​​他要求,如异常处理和任务的正常重启?请帮忙。

迈克尔·帕克特二世

您有要按特定值分组的数据,然后在它自己的循环中重复运行。我将举例说明这样做的一种方法,希望您可以依靠它来获得您需要的答案。

注意:根据数据的模拟方式,此示例可能没有用。如果您在问题中提供了正确的数据结构,我将更新答案以匹配。

你有数据。

public struct Data
{
    public Data(int value) => Value = value;
    public int Value { get; }
    public string Text => $"{nameof(Data)}: {Value}";
}

您希望按特定值对数据进行分组。(注意:此示例可能不合逻辑,因为它按Value已经唯一的数据对数据进行分组。仅添加此示例:)

有很多方法可以做到这一点,但我会用LinqDistinctIEqualityComparer<T>为这个例子比较Data.Value

public class DataDistinction : IEqualityComparer<Data>
{
    public bool Equals(Data x, Data y) => x.Value == y.Value;
    public int GetHashCode(Data obj) => obj.Value.GetHashCode();
}

模拟数据在这个例子中,我将模拟一些数据。

var dataItems = new List<Data>();

for (var i = 0; i < 10; i++)
{
    dataItems.Add(new Data(i));
}

处理数据在本例中,我将使用IEqualityComparer<T>Data唯一地获取每个数据Value并开始处理。

private static void ProcessData(List<Data> dataItems)
{
    var groupedDataItems = dataItems.Distinct(new DataDistinction());

    foreach (var data in groupedDataItems)
    {
        LoopData(data); //We start unique loops here.
    }
}

循环唯一数据

对于这个例子,我选择Task使用Task.Factory. 这将是一次使用它是有道理的,但通常您不需要它。在这个例子中,我传入了一个Action<object>where 对象表示给Task. 我还会解析state您将看到的内容并开始while循环。while循环监测CancellationTokenSource注销。CancellationTokenSource为方便起见,在应用程序中静态声明了 您将在底部的完整控制台应用程序中看到它。

private static void LoopData(Data data)
{
    Task.Factory.StartNew((state) =>
    {
        var taskData = (Data)state;
        while (!cancellationTokenSource.IsCancellationRequested)
        {
            Console.WriteLine($"Value: {taskData.Value}\tText: {taskData.Text}");
            Task.Delay(100).Wait();
        }
    },
    data,
    cancellationTokenSource.Token,
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);
}

现在,我将其全部放入一个控制台应用程序中,您可以复制、粘贴和浏览。

这将获取您的数据,将其分解并Task无限期地运行它,直到程序结束。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Question_Answer_Console_App
{
    class Program
    {
        private static readonly CancellationTokenSource cancellationTokenSource = new CancellationTokenSource();
        static void Main(string[] args)
        {
            var dataItems = new List<Data>();

            for (var i = 0; i < 10; i++)
            {
                dataItems.Add(new Data(i));
            }   

            ProcessData(dataItems);
            Console.ReadKey();
            cancellationTokenSource.Cancel();
            Console.WriteLine("CANCELLING...");
            Console.ReadKey();
        }

        private static void ProcessData(List<Data> dataItems)
        {
            var groupedDataItems = dataItems.Distinct(new DataDistinction());

            foreach (var data in groupedDataItems)
            {
                LoopData(data);
            }
        }

        private static void LoopData(Data data)
        {
            Task.Factory.StartNew((state) =>
            {
                var taskData = (Data)state;
                while (!cancellationTokenSource.IsCancellationRequested)
                {
                    Console.WriteLine($"Value: {taskData.Value}\tText: {taskData.Text}");
                    Task.Delay(100).Wait();
                }
            },
            data,
            cancellationTokenSource.Token,
            TaskCreationOptions.LongRunning,
            TaskScheduler.Default);
        }

        ~Program() => cancellationTokenSource?.Dispose();
    }

    public struct Data
    {
        public Data(int value) => Value = value;
        public int Value { get; }
        public string Text => $"{nameof(Data)}: {Value}";
    }

    public class DataDistinction : IEqualityComparer<Data>
    {
        public bool Equals(Data x, Data y) => x.Value == y.Value;
        public int GetHashCode(Data obj) => obj.Value.GetHashCode();
    }
}
//OUTPUT UNTIL CANCELLED
//Value: 0        Text: Data: 0
//Value: 3        Text: Data: 3
//Value: 1        Text: Data: 1
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 5        Text: Data: 5
//Value: 6        Text: Data: 6
//Value: 7        Text: Data: 7
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 5        Text: Data: 5
//Value: 1        Text: Data: 1
//Value: 7        Text: Data: 7
//Value: 4        Text: Data: 4
//Value: 0        Text: Data: 0
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 6        Text: Data: 6
//Value: 5        Text: Data: 5
//Value: 3        Text: Data: 3
//Value: 8        Text: Data: 8
//Value: 4        Text: Data: 4
//Value: 1        Text: Data: 1
//Value: 2        Text: Data: 2
//Value: 9        Text: Data: 9
//Value: 7        Text: Data: 7
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 8        Text: Data: 8
//Value: 5        Text: Data: 5
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 1        Text: Data: 1
//Value: 4        Text: Data: 4
//Value: 9        Text: Data: 9
//Value: 7        Text: Data: 7
//Value: 0        Text: Data: 0
//Value: 6        Text: Data: 6
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 5        Text: Data: 5
//Value: 8        Text: Data: 8
//Value: 7        Text: Data: 7
//Value: 9        Text: Data: 9
//Value: 1        Text: Data: 1
//Value: 4        Text: Data: 4
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 5        Text: Data: 5
//Value: 7        Text: Data: 7
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 4        Text: Data: 4
//Value: 1        Text: Data: 1
//Value: 0        Text: Data: 0
//Value: 6        Text: Data: 6
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 1        Text: Data: 1
//Value: 9        Text: Data: 9
//Value: 5        Text: Data: 5
//Value: 8        Text: Data: 8
//Value: 7        Text: Data: 7
//Value: 4        Text: Data: 4
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 4        Text: Data: 4
//Value: 9        Text: Data: 9
//Value: 1        Text: Data: 1
//Value: 7        Text: Data: 7
//Value: 8        Text: Data: 8
//Value: 5        Text: Data: 5
//Value: 0        Text: Data: 0
//Value: 6        Text: Data: 6
//Value: 4        Text: Data: 4
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 5        Text: Data: 5
//Value: 7        Text: Data: 7
//Value: 9        Text: Data: 9
//Value: 8        Text: Data: 8
//Value: 1        Text: Data: 1
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 3        Text: Data: 3
//Value: 5        Text: Data: 5
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 7        Text: Data: 7
//Value: 1        Text: Data: 1
//Value: 0        Text: Data: 0
//Value: 6        Text: Data: 6
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 3        Text: Data: 3
//Value: 1        Text: Data: 1
//Value: 7        Text: Data: 7
//Value: 5        Text: Data: 5
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
// CANCELLING...

在评论中,您询问了Task完成后如何写入控制台您可以awaitTask在这个例子中和当Task完成后,你可以打印Data作秀。async void不推荐有充分的理由,但这是正确使用它的一次。

这是更新LoopDataasync await签名。

private static async void LoopData(Data data)
{
    await Task.Factory.StartNew((state) =>
    {
        var taskData = (Data)state;
        while (!cancellationTokenSource.IsCancellationRequested)
        {
            Console.WriteLine($"Value: {taskData.Value}\tText: {taskData.Text}");
            Task.Delay(100).Wait();
        }
    },
    data,
    cancellationTokenSource.Token,
    TaskCreationOptions.LongRunning,
    TaskScheduler.Default);

    Console.WriteLine($"Task Complete: {data.Value} : {data.Text}");
}

//OUTPUT
//Value: 0        Text: Data: 0
//Value: 1        Text: Data: 1
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 5        Text: Data: 5
//Value: 6        Text: Data: 6
//Value: 7        Text: Data: 7
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 1        Text: Data: 1
//Value: 5        Text: Data: 5
//Value: 4        Text: Data: 4
//Value: 7        Text: Data: 7
//Value: 9        Text: Data: 9
//Value: 8        Text: Data: 8
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 5        Text: Data: 5
//Value: 1        Text: Data: 1
//Value: 6        Text: Data: 6
//Value: 9        Text: Data: 9
//Value: 8        Text: Data: 8
//Value: 7        Text: Data: 7
//Value: 0        Text: Data: 0
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 1        Text: Data: 1
//Value: 4        Text: Data: 4
//Value: 5        Text: Data: 5
//Value: 9        Text: Data: 9
//Value: 6        Text: Data: 6
//Value: 7        Text: Data: 7
//Value: 8        Text: Data: 8
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 1        Text: Data: 1
//Value: 5        Text: Data: 5
//Value: 4        Text: Data: 4
//Value: 8        Text: Data: 8
//Value: 7        Text: Data: 7
//Value: 9        Text: Data: 9
//Value: 6        Text: Data: 6
//Value: 0        Text: Data: 0
//Value: 3        Text: Data: 3
//Value: 2        Text: Data: 2
//Value: 4        Text: Data: 4
//Value: 1        Text: Data: 1
//Value: 5        Text: Data: 5
//Value: 8        Text: Data: 8
//Value: 9        Text: Data: 9
//Value: 6        Text: Data: 6
//Value: 7        Text: Data: 7
//Value: 0        Text: Data: 0
//Value: 2        Text: Data: 2
//Value: 3        Text: Data: 3
//Value: 5        Text: Data: 5
//Value: 4        Text: Data: 4
//Value: 1        Text: Data: 1
//Value: 8        Text: Data: 8
//Value: 7        Text: Data: 7
//Value: 6        Text: Data: 6
//Value: 9        Text: Data: 9
// CANCELLING...
//Task Complete: 0 : Data: 0
//Task Complete: 2 : Data: 2
//Task Complete: 3 : Data: 3
//Task Complete: 1 : Data: 1
//Task Complete: 5 : Data: 5
//Task Complete: 4 : Data: 4
//Task Complete: 8 : Data: 8
//Task Complete: 6 : Data: 6
//Task Complete: 7 : Data: 7
//Task Complete: 9 : Data: 9..

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

并行长时间运行任务的时间优化

来自分类Dev

Rx如何并行化长时间运行的任务?

来自分类Dev

iOS:长时间运行的任务

来自分类Dev

长时间运行的任务或线程?

来自分类Dev

在后台运行长时间运行的并行任务,同时允许小型异步任务更新前台

来自分类Dev

UI线程中长时间运行的任务

来自分类Java

Wicket:如何处理长时间运行的任务

来自分类Java

处理长时间运行的EDT任务(通过TreeModel搜索)

来自分类Dev

onPause方法中长时间运行任务的最佳实践

来自分类Dev

如何使用Apache Camel处理长时间运行的任务

来自分类Java

Java长时间运行的任务线程中断与取消标志

来自分类Dev

Web服务器中长时间运行的任务

来自分类Dev

返回任务的接口的长时间运行同步实现

来自分类Dev

更改长时间运行的等待任务的参数

来自分类Dev

“长时间运行的任务”是什么意思?

来自分类Dev

EntityFramework在长时间运行的任务上,等待操作超时

来自分类Dev

使用 RxJava 处理长时间运行的任务

来自分类Dev

确定长时间运行的异步任务已完成

来自分类Dev

使用AWS服务安排长时间运行的任务

来自分类Dev

使用异步服务器长时间运行的任务

来自分类Dev

在长时间运行的 Celery 任务中使用 SQLAlchemy 对象

来自分类Dev

Botframework:如何使用Bot处理长时间运行的任务?

来自分类Dev

Kafka作为长时间运行任务的消息队列

来自分类Dev

ios中长时间运行的后台任务

来自分类Dev

RxJava - 在 map 和 doOnSuccess 中执行长时间运行任务的任何问题

来自分类Dev

如何使用asyncio和current.futures.ProcessPoolExecutor终止Python中长时间运行的计算(受CPU约束的任务)?

来自分类Dev

在没有root用户和特权用户的情况下计划长时间运行的laravel任务的方法

来自分类Dev

尝试使用Netbeans ProgressBar来显示长时间运行的任务,依赖关系和库有问题

来自分类Dev

更改长时间运行的任务以暂停/运行以减少cpu

Related 相关文章

  1. 1

    并行长时间运行任务的时间优化

  2. 2

    Rx如何并行化长时间运行的任务?

  3. 3

    iOS:长时间运行的任务

  4. 4

    长时间运行的任务或线程?

  5. 5

    在后台运行长时间运行的并行任务,同时允许小型异步任务更新前台

  6. 6

    UI线程中长时间运行的任务

  7. 7

    Wicket:如何处理长时间运行的任务

  8. 8

    处理长时间运行的EDT任务(通过TreeModel搜索)

  9. 9

    onPause方法中长时间运行任务的最佳实践

  10. 10

    如何使用Apache Camel处理长时间运行的任务

  11. 11

    Java长时间运行的任务线程中断与取消标志

  12. 12

    Web服务器中长时间运行的任务

  13. 13

    返回任务的接口的长时间运行同步实现

  14. 14

    更改长时间运行的等待任务的参数

  15. 15

    “长时间运行的任务”是什么意思?

  16. 16

    EntityFramework在长时间运行的任务上,等待操作超时

  17. 17

    使用 RxJava 处理长时间运行的任务

  18. 18

    确定长时间运行的异步任务已完成

  19. 19

    使用AWS服务安排长时间运行的任务

  20. 20

    使用异步服务器长时间运行的任务

  21. 21

    在长时间运行的 Celery 任务中使用 SQLAlchemy 对象

  22. 22

    Botframework:如何使用Bot处理长时间运行的任务?

  23. 23

    Kafka作为长时间运行任务的消息队列

  24. 24

    ios中长时间运行的后台任务

  25. 25

    RxJava - 在 map 和 doOnSuccess 中执行长时间运行任务的任何问题

  26. 26

    如何使用asyncio和current.futures.ProcessPoolExecutor终止Python中长时间运行的计算(受CPU约束的任务)?

  27. 27

    在没有root用户和特权用户的情况下计划长时间运行的laravel任务的方法

  28. 28

    尝试使用Netbeans ProgressBar来显示长时间运行的任务,依赖关系和库有问题

  29. 29

    更改长时间运行的任务以暂停/运行以减少cpu

热门标签

归档