使工作队列和RPC协同工作

马特·伯兰德

我开始使用RabbitMQ,并在按照教程进行操作之后,现在尝试使它按我需要的方式工作,并且遇到了麻烦。我的设置是,我需要首先能够进行RPC,然后基于该响应,客户端将(或将不会)将另一条消息发送到工作队列(在此我不需要响应)客户端)。不幸的是,我为此付出的努力似乎并没有达到我想要的方式。在服务器端,我有类似的内容(我尝试了许多变体,但都遇到了相同的问题):

var factory = new ConnectionFactory() { HostName = "localhost" };
connection = factory.CreateConnection();
channel = connection.CreateModel();
channel.ExchangeDeclare(exchange: "jobs", type: "direct", durable: true);

// I started with a named queue, not sure if that's better or worse for this
var queueName = channel.QueueDeclare().QueueName;

channel.QueueBind(queue: queueName, 
    exchange: "jobs",
    routingKey: "saveJob_queue");

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    // save stuff that was sent with the saveJob_queue routingKey
}

channel.BasicConsume(queue: queueName, 
    noAck: false,
    consumer: consumer);

// set up channel for RPC
// Not sure if this has to have another channel, but it wasn't working on the same channel either
rpcChannel = connection.CreateModel();
var rpcQueueName = rpcChannel.QueueDeclare().QueueName;

rpcChannel.QueueBind(queue: rpcQueueName,  
    exchange: "jobs",
    routingKey: "rpc_CheckJob_queue");

var rpcConsumer = new EventingBasicConsumer(rpcChannel);

rpcConsumer.Received += (model, ea) =>
{
    // do my remote call and send back a response
}

我遇到的问题是,尽管路由消息只应接收路由,但jobs使用路由键发送给交换机的消息rpc_CheckJob_queue仍然最终触发Recieved了第一个通道上的事件saveJob_queue我可以签ea.RoutingKey入该处理程序,而忽略这些消息,但是我不明白它们为什么以及为什么首先出现在那儿?

设置连接以便它既可以接收工作队列消息又可以接收RPC消息并正确处理它们的正确方法是什么?

马特·伯兰德

所以我放弃了这一点,决定只过滤Received事件。我认为问题在于RabbitMQ仅Received通道有一个事件,而在队列上却没有因此,该Received事件会以任何一种方式发生。所以现在我有了这个:

channel.QueueDeclare(queue: queueName,  
         durable: true,
         exclusive: false,
         autoDelete: false,
         arguments: null);

channel.QueueDeclare(queue: rpcQueueName,
         durable: false,
         exclusive: false,
         autoDelete: false,
         arguments: null);

channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    switch (ea.RoutingKey)
    {
        case queueName:
            SaveJob(ea);
            break;
        case rpcQueueName:
            CheckJob(ea);
            break;
    }
    channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
};

channel.BasicConsume(queue: queueName, 
    noAck: false,
    consumer: consumer);

channel.BasicConsume(queue: rpcQueueName,
                     noAck: false,
                     consumer: consumer);

我愿意接受更好的建议,因为这似乎有些偏离。

因此发送只是:

var properties = channel.CreateBasicProperties();
properties.Persistent = true;

channel.BasicPublish(exchange: "",
                     routingKey: queueName,
                     basicProperties: properties,
                     body: body);

从事正常工作,并:

var corrId = Guid.NewGuid().ToString();
var props = channel.CreateBasicProperties();
props.ReplyTo = replyQueueName;
props.CorrelationId = corrId;

var messageBytes = Encoding.UTF8.GetBytes(msg);
channel.BasicPublish(exchange: "",
                     routingKey: rpcQueueName,
                     basicProperties: props,
                     body: messageBytes);

while (true)   
{
    var ea = (BasicDeliverEventArgs)consumer.Queue.Dequeue();
    if (ea.BasicProperties.CorrelationId == corrId)
    {
        return ea.Body != null && ea.Body.Any() ? BitConverter.ToInt32(ea.Body,0) : (int?)null;
    }
}

对于RPC。

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章

来自分类Dev

如何使SSRS和Chrome协同工作?

来自分类Dev

使Spark,Python和MongoDB协同工作

来自分类Dev

使Viewbox和ScrollViewer协同工作

来自分类Dev

使Viewbox和ScrollViewer协同工作

来自分类Dev

函数和列表如何协同工作?

来自分类Dev

Angular 4 和 Umbraco 协同工作

来自分类Dev

JavaScript工作队列

来自分类Dev

我如何协同工作?

来自分类Dev

KTA工作队列查询

来自分类Dev

使JSON.NET和Serializable属性协同工作

来自分类Dev

如何使flexbox,transitions和IE10协同工作?

来自分类Dev

无法使就绪和调整大小的功能协同工作

来自分类Dev

如何使大礼包,滑轨和liqpay协同工作?

来自分类Dev

std :: transform和std :: plus如何协同工作?

来自分类Dev

获取点鼠标交互和笔刷协同工作

来自分类Dev

Ember.js-使路线和模型协同工作

来自分类Dev

autofs和WebDAV-如何使它们协同工作?

来自分类Dev

使JSON.NET和Serializable属性协同工作

来自分类Dev

如何使这种稀疏矩阵和Trie协同工作

来自分类Dev

无法使静态路由和NAT协同工作

来自分类Dev

无法使就绪和调整大小的功能协同工作

来自分类Dev

获取点鼠标交互和笔刷协同工作

来自分类Dev

.match 和捕获组如何协同工作?

来自分类Dev

ChrootDirectory 和用户的主目录如何协同工作?

来自分类Dev

Context Locals 和 The Request Context 如何协同工作?

来自分类Dev

dateadd、datetime 和 datediff 如何协同工作?

来自分类Dev

让 Inertia.js 和 Xampp 协同工作

来自分类Dev

C# 接口和泛型协同工作

来自分类Dev

如何使我的方法协同工作