如何利用与.NET的RabbitMQ客户消费的预取数

问题描述 投票:0回答:2

其状态的RabbitMQ的文档中的以下

“作为一个经验法则,共享线程之间的通道情况下是要避免的东西。应用程序应该选用每线程通道,而不是在多个线程共享相同的通道。”

目前,我们正在寻找在它已建议,如果你有一个小数量的消费者和AUTOACK =假的,那么我们就应该立刻消耗大量信息的预取计数。然而,我们发现,预取还没有效果,如果消费者将手动确认回用一个单独的执行线程。但是,如果我们在一个任务包的消费者处理,我们发现预取计数事不和显着地改善消费的表现。

请看下面的例子,其中我们包装通过在任务对象消费者消息的消耗:

class Program
{
    public static void Main()
    {
        var factory = new ConnectionFactory()
        {
            HostName = "172.20.20.13",
            UserName = "billy",
            Password = "guest",
            Port = 5671,
            VirtualHost = "/",
            Ssl = new SslOption
            {
                Enabled = true,
                ServerName = "rabbit.blah.com",
                Version = System.Security.Authentication.SslProtocols.Tls12
            }
        };
        var connection = factory.CreateConnection();
        var channel = connection.CreateModel();
        channel.BasicQos(0, 100, false);
        channel.ExchangeDeclare(exchange: "logs", type: "fanout");
        var queueName = channel.QueueDeclare().QueueName;
        Console.WriteLine(" [*] Waiting for logs.");

        var consumer = new EventingBasicConsumer(channel);
        consumer.Received += (model, ea) =>
        {
            var _result = new Task(() => {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body);
                System.Threading.Thread.Sleep(80);

                channel.BasicAck(ea.DeliveryTag, false);
            });
            _result.Start();
        };
        channel.BasicConsume(queue: "test.queue.1", autoAck: false, consumer: consumer);

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    }
}

我的问题是怎么做的人实现消费者与.NET的RabbitMQ客户端预取计数是无济于事?你一定要使用某种形式的任务手动ACK?是它的安全

c# rabbitmq messaging
2个回答
2
投票

你提到的文件是Java客户端。你应该是指而不是this document

您正在使用最新版本的.NET客户端(5.1),所以做你的Received事件处理的工作将不会阻止使用TCP数据处理其他线程,它不会阻止心跳 - 这两者都是很好的。

首先,调用channel.BasicQos(0, 1, false)意味着你的消费者只会在从RabbitMQ的同时接收一个就绪的消息,直到BasicAck被称为另一个消息不会被传输。所以,真的没有理由这样做在另一个线程你的工作,因为你不会得到另一个消息呢。

如果增加预取值(通过试验和运行基准),你将不得不这样做在后台线程你的工作,如果你的工作时间超过几毫秒的时间来运行。

当你做你的Received事件回调的工作,这将阻止被用于制作回调,因为在自己的线程不执行回调的线程。所以,你可以确保你的工作是很短的,或者做的工作在另一个线程。

我刚刚花了一些时间来浏览.NET客户端代码,我敢肯定的IModel实例不是线程安全的。如果增加预取,您将有机会在同一时间承认几条消息,所以我建议实现使用这一点,并确保BasicAck被称为在其创建的连接在同一个线程的解决方案。


注:RabbitMQ的团队监控rabbitmq-users mailing list和只是有时在计算器上回答问题。


1
投票

来源:https://www.rabbitmq.com/api-guide.html

当使用手动确认时,必须考虑什么线程中的确认是很重要的。如果它是从接收到递送线程不同的(例如消费者#handleDelivery委派输送处理,以不同的线程),设置为true所述多个参数确认为不安全的,并且将导致双确认,并且因此信道级协议异常封闭的通道。同时确认一个消息可以是安全的。

channel.basicAck(tag, false)是线程安全的

consumerChannel.basicAck(tag, true)不是。

此外,在RabbitMQ and channels Java thread safety提到了一些好点

© www.soinside.com 2019 - 2024. All rights reserved.