在C#中执行并发任务

欧文

我有一项服务,需要尽快读取来自Amazon SQS的消息。我们期望流量很大,我希望能够每秒读取1万条以上的消息。不幸的是,我目前每秒大约有10条消息。显然,我有工作要做。

这就是我正在使用的(转换为控制台应用程序以简化测试):

private static int _concurrentRequests;
private static int _maxConcurrentRequests;

public static void Main(string[] args) {
    _concurrentRequests = 0;
    _maxConcurrentRequests = 100;

    var timer = new Timer();
    timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
    timer.Interval = 10;
    timer.Enabled = true;

    Console.ReadLine();
    timer.Dispose();
}

public static void OnTimedEvent(object s, ElapsedEventArgs e) {
    if (_concurrentRequests < _maxConcurrentRequests) {
        _concurrentRequests++;
        ProcessMessages();
    }
}

public static async Task ProcessMessages() {
    var manager = new MessageManager();
    manager.ProcessMessages();  // this is an async method that reads in the messages from SQS

    _concurrentRequests--;
}

我没有收到接近100个并发请求,而且似乎也不是OnTimedEvent每10毫秒触发一次。

我不确定Timer这里是否正确。我对这种编码没有太多的经验。我愿意在这一点上尝试任何事情。

更新

多亏了calebboyd,我才更接近实现自己的目标。这是一些非常糟糕的代码:

private static SemaphoreSlim _locker;

public static void Main(string[] args) {
    _manager = new MessageManager();

    RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
    _locker = new SemaphoreSlim(10, 10);
    while (true) {
        Thread thread = new Thread(new ParameterizedThreadStart(Process));
        thread.Start();
    }
}

private static async void Process(object args) {
    _locker.WaitAsync();
    try {
        await _manager.ProcessMessages();
    }
    finally {
        _locker.Release();
    }

}

这样我可以接近每秒读取大量消息,但是问题是我的ProcessMessages通话永远都无法结束(或者可能会在很长一段时间后结束)。我想我可能需要限制我一次运行的线程数。

关于如何改进此代码,以便ProcessMessages有机会完成的任何建议

谢伊__

如@calebboyd所建议,首先必须使线程异步。现在,如果您去这里-调用API时在何处使用并发,您将看到一个异步线程足以用于快速地池化网络资源。如果您能够在一个请求中从亚马逊获取多条消息,那么您的生产者线程(对亚马逊进行异步调用的那个)就可以了-每秒可以发送数百个请求。这不会成为您的瓶颈。但是,将处理接收到的数据的继续任务移交给线程池。在这里,您有机会遇到瓶颈-假设每秒有100条响应到达,每个响应包含100条消息(达到您的10K msgs / sec近似值)。每秒钟您有100个新任务,每个任务都需要您的线程来处理100条消息。现在有两个选择:(1)这些消息的处理不受CPU限制-您只需将它们发送到数据库即可;或者(2),您需要执行消耗CPU的计算,例如科学计算,序列化或一些繁重的业务逻辑。如果您的情况是(1),则将瓶颈向后推向DB。如果为(2),那么您别无选择,只能放大/缩小或优化计算。但是您的瓶颈可能不是生产线程-如果实现正确(请参阅上面的链接以获取示例)。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

C#中的同步任务执行

来自分类Dev

在C#中异步执行多个任务

来自分类Dev

异步任务的许多实例可以共享对并发集合的引用,并在C#中向其中并发添加项目吗?

来自分类Dev

获取C#中已执行任务的统计信息

来自分类Dev

C#中的Task.WaitAll挂起了任务的执行

来自分类Dev

C#中的Task.WaitAll挂起了任务的执行

来自分类Dev

在 C# 中执行后台任务的最佳实践

来自分类Dev

在一个DAG中执行顺序和并发任务

来自分类Dev

在一个DAG中执行顺序和并发任务

来自分类Dev

并行执行多对并发任务

来自分类Dev

AFNetworking中的并发任务

来自分类Dev

C#并发-长期运行任务的首选方法

来自分类Dev

C#在执行之前建立任务列表

来自分类Dev

并发NSOperationQueue上传,然后执行单个任务

来自分类Dev

C#中的任务示例

来自分类Dev

杀死C#中的任务

来自分类Dev

C#中的任务示例

来自分类Dev

在C#中取消任务

来自分类Dev

在并发中检测失败的任务

来自分类Dev

如何在C#中延迟任务中部分代码的执行

来自分类Dev

在 C# 中执行 Parallel.Foreach 时布尔包含任务取消异常

来自分类Dev

多线程在C ++中执行单个任务

来自分类Dev

如何在非并发自定义nsoperation中执行异步任务

来自分类Dev

如何在C#中实现用于限制处理器/ IO繁重多线程任务的并发性的令牌系统?

来自分类Dev

C#执行任务并在完成时通知

来自分类Dev

C#任务在Task.WhenAll之前执行

来自分类Dev

C#执行任务并在完成时通知

来自分类Dev

在C#中的并发字典中交换键和值

来自分类Dev

在C#中执行IronPython