我有一项服务,需要尽快读取来自Amazon SQS的消息。我们期望流量很大,我希望能够每秒读取1万条以上的消息。不幸的是,我目前每秒大约有10条消息。显然,我有工作要做。
这就是我正在使用的(转换为控制台应用程序以简化测试):
private static int _concurrentRequests;
private static int _maxConcurrentRequests;
public static void Main(string[] args) {
_concurrentRequests = 0;
_maxConcurrentRequests = 100;
var timer = new Timer();
timer.Elapsed += new ElapsedEventHandler(OnTimedEvent);
timer.Interval = 10;
timer.Enabled = true;
Console.ReadLine();
timer.Dispose();
}
public static void OnTimedEvent(object s, ElapsedEventArgs e) {
if (_concurrentRequests < _maxConcurrentRequests) {
_concurrentRequests++;
ProcessMessages();
}
}
public static async Task ProcessMessages() {
var manager = new MessageManager();
manager.ProcessMessages(); // this is an async method that reads in the messages from SQS
_concurrentRequests--;
}
我没有收到接近100个并发请求,而且似乎也不是OnTimedEvent
每10毫秒触发一次。
我不确定Timer
这里是否正确。我对这种编码没有太多的经验。我愿意在这一点上尝试任何事情。
更新
多亏了calebboyd,我才更接近实现自己的目标。这是一些非常糟糕的代码:
private static SemaphoreSlim _locker;
public static void Main(string[] args) {
_manager = new MessageManager();
RunBatchProcessingForeverAsync();
}
private static async Task RunBatchProcessingForeverAsync() {
_locker = new SemaphoreSlim(10, 10);
while (true) {
Thread thread = new Thread(new ParameterizedThreadStart(Process));
thread.Start();
}
}
private static async void Process(object args) {
_locker.WaitAsync();
try {
await _manager.ProcessMessages();
}
finally {
_locker.Release();
}
}
这样我可以接近每秒读取大量消息,但是问题是我的ProcessMessages
通话永远都无法结束(或者可能会在很长一段时间后结束)。我想我可能需要限制我一次运行的线程数。
关于如何改进此代码,以便ProcessMessages
有机会完成的任何建议?
如@calebboyd所建议,首先必须使线程异步。现在,如果您去这里-调用API时在何处使用并发,您将看到一个异步线程足以用于快速地池化网络资源。如果您能够在一个请求中从亚马逊获取多条消息,那么您的生产者线程(对亚马逊进行异步调用的那个)就可以了-每秒可以发送数百个请求。这不会成为您的瓶颈。但是,将处理接收到的数据的继续任务移交给线程池。在这里,您有机会遇到瓶颈-假设每秒有100条响应到达,每个响应包含100条消息(达到您的10K msgs / sec近似值)。每秒钟您有100个新任务,每个任务都需要您的线程来处理100条消息。现在有两个选择:(1)这些消息的处理不受CPU限制-您只需将它们发送到数据库即可;或者(2),您需要执行消耗CPU的计算,例如科学计算,序列化或一些繁重的业务逻辑。如果您的情况是(1),则将瓶颈向后推向DB。如果为(2),那么您别无选择,只能放大/缩小或优化计算。但是您的瓶颈可能不是生产线程-如果实现正确(请参阅上面的链接以获取示例)。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句