我正在评估RabbitMQ,虽然总体印象(本身就是AMQP,还有RabbitMQ)是积极的,但是我对结果并不印象深刻。
我试图同时发布和使用消息,但获得了非常差的消息率。我有一个持久的直接交换,它绑定到一个持久队列,并向该交换发布持久消息。邮件正文的平均大小约为1000个字节。
我的发布大致如下:
AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("my-host");
factory.setPort(5672);
Connection conn = null;
Channel channel = null;
ObjectMapper mapper = new ObjectMapper(); //com.fasterxml.jackson.databind.ObjectMapper
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.confirmSelect();
} catch (IOException e) {}
for(Message m : messageList) { //the size of messageList happens to be 9945
try {
channel.basicPublish("exchange", "", bldr.deliveryMode(2).contentType("application/json").build(), mapper.writeValueAsBytes(cm));
} catch (Exception e) {}
}
try {
channel.waitForConfirms();
channel.close();
conn.close();
} catch (Exception e1) {}
从绑定队列中消费消息的过程如下:
AMQP.BasicProperties.Builder bldr = new AMQP.BasicProperties.Builder();
ConnectionFactory factory = new ConnectionFactory();
factory.setUsername("guest");
factory.setPassword("guest");
factory.setVirtualHost("/");
factory.setHost("my-host");
factory.setPort(5672);
Connection conn = null;
Channel channel = null;
try {
conn = factory.newConnection();
channel = conn.createChannel();
channel.basicQos(100);
while (true) {
GetResponse r = channel.basicGet("rawDataQueue", false);
if(r!=null)
channel.basicAck(r.getEnvelope().getDeliveryTag(), false);
}
} catch (IOException e) {}
问题是,当消息发布者(或其中几个)和使用者(或其中几个)同时运行时,发布者似乎正在全速运行,并且RabbitMQ管理Web界面显示的发布速度为每秒约2 ... 3K消息,但每个使用者的消耗率为0.5 ... 3。当发布者完成后,我的消费率是每个消费者300 ... 600条消息。如果未为Java客户端设置QOS预取值,则将其设置为100或250,然后将其设置为稍低一些。
当尝试用某种方式限制消费者时,我设法实现了同时发消息的数量,例如每秒发布约400条消息和每秒消耗50条消息,这虽然稍好一些,但仅略有增加。
这是RabbitMQ博客条目的引文,它声称队列在空时最快,这很可能是最快的,但是当队列中有成千上万条持久消息时,使消耗速度减慢以进行爬网仍然是不可接受的。
较高的QOS预取值可能会有所帮助,但是恕我直言,这并不是解决方案。
要实现合理的吞吐率(在任何情况下每位消费者每秒消耗2条消息都是不合理的),可以采取什么措施?这只是一个简单的直接交换-一个绑定-一个队列的情况,我是否应该期望在更复杂的配置下性能进一步下降?在互联网上搜索时,也有降低耐用性的建议,但就我而言,这恐怕不是一种选择。如果有人指出我很愚蠢,并且有某种明显而直接的解决方案,我将非常高兴:)
您是否尝试过使用autoAck选项?那应该改善您的表现。这比获得消息并确认消息要快得多。增加预取次数也应该使其更好。
另外,您发送和使用的消息(包括标题)的大小是多少?您是否正在经纪人中进行任何流量控制?
另一个问题,您是否在每次发送/接收消息时都建立连接和通道?如果是这样,那是错误的。您应该只创建一次连接,并且每个线程使用一个通道(可能以线程本地的方式)来发送和接收消息。每个连接可以有多个通道。没有关于此的官方文档,但是,如果您阅读文章和论坛,这似乎是最佳的性能实践。
最后一件事,您是否考虑过使用basicConsume
代替basicGet
?它还应使其更快。
根据我的经验,我能够运行一个集群,以每秒20000条消息的速度发送和使用非持久消息。我猜想,如果您使用持久性消息和持久性消息,性能可能会降低一点,但不会降低10倍。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句