MessageType:“ PublishX”
消费者:
Type1ConsumerX
Type2ConsumerX
Type3ConsumerX
所有使用者必须立即捕获消息,但必须在自己内部同步消费。
例如,队列中有100条“ PublishX”消息。Type1ConsumerX消耗30个消息(同步),Type2ConsumerX消耗50个消息(同步),Type3ConsumerX消耗100个消息(同步)。我怎么知道“所有类型的消费者”都消费了该消息?
RabbitMQ / MassTransit PUSH消息可以发送给消费者吗?
RabbitMQ / MassTransit是否可以按间隔(1s)推送消息(合并它们)以减少网络流量?
RabbitMQ / MassTransit可以将相同的消息发送给不同类型的消费者吗?
如果我正确理解了这个问题,则只需要设置一个基本的发布/订阅模式。这将使您可以将相同的消息传递给多个使用者。
示例发布者:
public static void PublishMessageToFanout()
{
var factory = new ConnectionFactory { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("messages", "fanout");
var message = new Message { Text = "This is a message to send" };
var json = JsonConvert.SerializeObject(message);
var body = Encoding.UTF8.GetBytes(json);
channel.BasicPublish("messages", string.Empty, null, body);
}
}
消费者示例:
SubscribeToMessages("sms-messages", (s) => Console.WriteLine("SMS Message: {0}", s));
SubscribeToMessages("email-messages", (s) => Console.WriteLine("Email Message: {0}", s));
public static void SubscribeToMessages(string queueName, Action<string> messageAction)
{
var factory = new ConnectionFactory() { HostName = "localhost" };
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
{
channel.ExchangeDeclare("messages", "fanout");
channel.QueueDeclare(queueName, true, false, false, null);
channel.QueueBind(queueName, "messages", string.Empty);
var consumer = new QueueingBasicConsumer(channel);
channel.BasicConsume(queueName, true, consumer);
while (true)
{
var ea = consumer.Queue.Dequeue();
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
messageAction(message);
}
}
}
如果您SubscribeToMessages
在单独的进程或控制台应用程序中运行循环,则每次调用时,它们都将打印出消息PublishMessageToFanout
。您还将在“队列”下的RabbitMQ管理中看到两个队列。
本文收集自互联网,转载请注明来源。
如有侵权,请联系[email protected] 删除。
我来说两句