其状态的RabbitMQ的文档中的以下
“作为一个经验法则,共享线程之间的通道情况下是要避免的东西。应用程序应该选用每线程通道,而不是在多个线程共享相同的通道。”
目前,我们正在寻找在它已建议,如果你有一个小数量的消费者和AUTOACK =假的,那么我们就应该立刻消耗大量信息的预取计数。然而,我们发现,预取还没有效果,如果消费者将手动确认回用一个单独的执行线程。但是,如果我们在一个任务包的消费者处理,我们发现预取计数事不和显着地改善消费的表现。
请看下面的例子,其中我们包装通过在任务对象消费者消息的消耗:
class Program
{
public static void Main()
{
var factory = new ConnectionFactory()
{
HostName = "172.20.20.13",
UserName = "billy",
Password = "guest",
Port = 5671,
VirtualHost = "/",
Ssl = new SslOption
{
Enabled = true,
ServerName = "rabbit.blah.com",
Version = System.Security.Authentication.SslProtocols.Tls12
}
};
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
channel.BasicQos(0, 100, false);
channel.ExchangeDeclare(exchange: "logs", type: "fanout");
var queueName = channel.QueueDeclare().QueueName;
Console.WriteLine(" [*] Waiting for logs.");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var _result = new Task(() => {
var body = ea.Body;
var message = Encoding.UTF8.GetString(body);
System.Threading.Thread.Sleep(80);
channel.BasicAck(ea.DeliveryTag, false);
});
_result.Start();
};
channel.BasicConsume(queue: "test.queue.1", autoAck: false, consumer: consumer);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
我的问题是怎么做的人实现消费者与.NET的RabbitMQ客户端预取计数是无济于事?你一定要使用某种形式的任务手动ACK?是它的安全
你提到的文件是Java客户端。你应该是指而不是this document。
您正在使用最新版本的.NET客户端(5.1
),所以做你的Received
事件处理的工作将不会阻止使用TCP数据处理其他线程,它不会阻止心跳 - 这两者都是很好的。
首先,调用channel.BasicQos(0, 1, false)
意味着你的消费者只会在从RabbitMQ的同时接收一个就绪的消息,直到BasicAck
被称为另一个消息不会被传输。所以,真的没有理由这样做在另一个线程你的工作,因为你不会得到另一个消息呢。
如果增加预取值(通过试验和运行基准),你将不得不这样做在后台线程你的工作,如果你的工作时间超过几毫秒的时间来运行。
当你做你的Received
事件回调的工作,这将阻止被用于制作回调,因为在自己的线程不执行回调的线程。所以,你可以确保你的工作是很短的,或者做的工作在另一个线程。
我刚刚花了一些时间来浏览.NET客户端代码,我敢肯定的IModel
实例不是线程安全的。如果增加预取,您将有机会在同一时间承认几条消息,所以我建议实现使用这一点,并确保BasicAck
被称为在其创建的连接在同一个线程的解决方案。
注:RabbitMQ的团队监控rabbitmq-users
mailing list和只是有时在计算器上回答问题。
来源:https://www.rabbitmq.com/api-guide.html
当使用手动确认时,必须考虑什么线程中的确认是很重要的。如果它是从接收到递送线程不同的(例如消费者#handleDelivery委派输送处理,以不同的线程),设置为true所述多个参数确认为不安全的,并且将导致双确认,并且因此信道级协议异常封闭的通道。同时确认一个消息可以是安全的。
channel.basicAck(tag, false)
是线程安全的
但consumerChannel.basicAck(tag, true)
不是。
此外,在RabbitMQ and channels Java thread safety提到了一些好点