可能的根本问题是我正在使用的node-kafka模块是如何实现的,但是也许没有,所以我们开始吧...
使用node-kafa库,我在订阅consumer.on('message')
事件时遇到了问题。该库正在使用标准events
模块,因此我认为这个问题可能足够通用。
我的实际代码结构又大又复杂,因此这里是一个基本布局的伪示例,以突出我的问题。(注意:此代码段未经测试,因此我可能在这里出错,但是语法在这里始终不是问题)
var messageCount = 0;
var queryCount = 0;
// Getting messages via some event Emitter
consumer.on('message', function(message) {
message++;
console.log('Message #' + message);
// Making a database call for each message
mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) {
queryCount++;
console.log('Query #' + queryCount);
});
})
我在这里看到的是,当我启动服务器时,kafka希望向我发送10万个积压消息,并且它是通过事件发射器发送的。所以我开始收到消息。获取和记录所有消息大约需要15秒。
假设mysql查询相当快,这是我期望看到的输出:
Message #1
Message #2
Message #3
...
Message #500
Query #1
Message #501
Message #502
Query #2
... and so on in some intermingled fashion
我希望这是因为我的第一个mysql结果应该很快准备好,并且我希望结果在事件循环中轮到他们来处理响应。我实际上得到的是:
Message #1
Message #2
...
Message #100000
Query #1
Query #2
...
Query #100000
我正在处理mysql响应之前收到每条消息。所以我的问题是,为什么?为什么在所有消息事件完成之前我无法获得单个数据库结果?
另一个注意事项:我.emit('message')
在node-kafka和mysql.query()
代码中的处设置了一个断点,并且将它们设为回合制。因此看来,进入我的事件订阅者之前,所有100,000个发射都没有预先堆积。因此,我有了关于该问题的第一个假设。
想法和知识将不胜感激:)
该node-kafka
驱动程序使用相当大的缓冲区大小(1M),这意味着它将从Kafka接收尽可能多的消息,这些消息将适合缓冲区。如果服务器积压,并且取决于消息大小,这可能意味着一个请求传入了数千条消息。
因为EventEmitter是同步的(它不使用Node事件循环),所以这意味着驱动程序将向其侦听器发送(成千上万个)事件,并且由于它是同步的,因此直到产生事件之前,它都不会屈服于Node事件循环。所有邮件均已发送。
我认为您无法解决大量事件交付问题,但是我认为事件交付问题不大。更有可能的问题是为每个事件启动异步操作(在本例中为MySQL查询),这可能会使查询充满数据库。
可能的解决方法是使用队列,而不是直接从事件处理程序执行查询。例如,async.queue
您可以限制并发(异步)任务的数量。队列的“工作者”部分将执行MySQL查询,在事件处理程序中,您只需将消息推送到队列中即可。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句