我是 RabbitMQ 的新手,并且开始了一个以相当老式的“RPC”模式使用 RabbitMQ 的项目。所以我在“服务器”端尝试这样的事情:
ConnectionFactory factory = new ConnectionFactory();
factory.setUri(uri);
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();
channel.queueDeclare(queueName, false, false, false, null);
while (!shutdown) {
GetResponse gr = channel.basicGet(queueName, true);
... build reply ...
channel.basicPublish("", gr.getProps().getReplyTo(), replyProps, response);
}
我的问题是:在 basicGet() 上等待的线程可以被中断吗?如果是这样,会发生什么(未声明 InterruptedException)。它意识到这不是一个很好的模式,但我只是想要一些方法来干净地关闭服务。
更新:一条评论表明 basicGet() 根本不会阻塞,如果队列为空,则立即返回。如果是这种情况,让我更精确地修改我的问题:如何在超时的情况下等待队列中的消息并检索它?
UPDATE2:在rabbitmq邮件列表上进行试验和提问后,我得出结论,这不能直接完成。这根本不是你在 RabbitMQ 中做事的方式。相反,您使用 Channel.basicConsume() 启动消费者线程池并等待您的处理程序方法被调用。它可以通过让您的消费者发布到 SynchronizedQueue 或类似的东西并让您的前台线程等待它来间接完成,但请注意,这会破坏 basicConsume() 提供的自动缩放,并且也使其更难正确确认所有请求,并且还会创建额外的消息缓冲,这使得难以遵守由 basicQos() 调用设置的 QOS 语义。
还应该注意的是,一旦你沿着 basicConsume() 路线走下去,消费者可能会被中断。这是这样做的:
// This starts a background thread pool
String consumerTag = channel.basicConsume(consumer);
...
// Shutdown the consumer thread pool
channel.basicCancel(consumerTag);
UPDATE3:见最后一个答案。RabbitMQ 附带了一个运行良好的 RpcClient 类。
basicGet
不会阻塞,它会立即返回(好吧,就在网络往返之后),如果队列中没有消息,则返回 null。所以没有必要中断线程。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句