批量使用消息-RabbitMQ

戈皮

使用上面的代码,我能够使用不同的路由键来使用多个生产者发送到同一交换机的多个消息,并且能够将每个消息插入数据库。

但这会消耗过多的资源,因为消息将一个接一个地插入到数据库中。所以我决定去批量插入,发现我可以设置BasicQos

在BasicQos中将消息限制设置为10后,我的期望是Console.WriteLine必须写入10条消息,但是它没有达到预期。

我的期望是从队列中消耗N个消息,然后进行大容量插入,并在成功发送ACK的情况下进行,否则将不进行ACK

这是我使用的代码。

using (var connection = factory.CreateConnection())
{
    using (var channel = connection.CreateModel())
    {
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_A");
        channel.QueueBind(queue: "queueName", exchange: "exchangeName", routingKey: "Producer_B");

        channel.BasicQos(0, 10, false);

        var consumer = new EventingBasicConsumer(channel);
        channel.BasicConsume(queue: "queueName", noAck: false, consumer: consumer);

        consumer.Received += (model, ea) =>
        {
            try
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);

                // Insert into Database

                channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
                Console.WriteLine(" Recevier Ack  " + ea.DeliveryTag);
            }
            catch (Exception e)
            {
                channel.BasicNack(deliveryTag: ea.DeliveryTag, multiple: false, requeue: true);
                Console.WriteLine(" Recevier No Ack  " + ea.DeliveryTag);
            }
        };

        Console.ReadLine();
    }
}
加布里埃莱·圣托马乔(Gabriele Santomaggio)

BasicQos = 10表示客户端一次只能提取10条消息,但是当您使用它时,您每次只会看到一条消息。在这里阅读:https : //www.rabbitmq.com/consumer-prefetch.html

AMQP指定basic.qos方法,以允许您在使用(即“预取计数”)时限制通道(或连接)上未确认消息的数量。

对于您的作用域,您必须下载消息,将其放入临时列表中,然后插入数据库中。

然后您可以使用:

channel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: true);

无效basicAck()

参数:deliveryTag-接收到的AMQP.Basic.GetOk或AMQP.Basic.Deliver的标签

多个-表示确认所有消息,直到并包括提供的交付标签;为true;如果仅确认提供的交付标签,则为false。

例子

final List<String> myMessagges = new ArrayList<String>();
        channel.basicConsume("my_queue", false, new DefaultConsumer(channel) {

            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException {
                myMessagges.add(new String(body));
                System.out.println("Received...");

                if (myMessagges.size() >= 10) {
                    System.out.println("insert into DB...");
                    channel.basicAck(envelope.getDeliveryTag(), true);
                    myMessagges.clear();
                }


            }
        });

本文收集自互联网,转载请注明来源。

如有侵权,请联系[email protected] 删除。

编辑于
0

我来说两句

0条评论
登录后参与评论

相关文章