我正在阅读 RabbitMq.Client 库的 .net 文档(https://www.rabbitmq.com/tutorials/tutorial-seven-dotnet)并到达这部分
var channel = connection.CreateModel();
channel.ConfirmSelect();
channel.BasicAcks += (sender, ea) =>
{
// code when message is confirmed
};
channel.BasicNacks += (sender, ea) =>
{
//code when message is nack-ed
};
引用: 从相应的回调中重新发布 nack-ed 消息可能很诱人,但应该避免这种情况,因为确认回调是在 I/O 线程中调度的,而通道不应该在该线程中执行操作。更好的解决方案是将消息放入内存队列中,由发布线程轮询。像
ConcurrentQueue
这样的类将是在确认回调和发布线程之间传输消息的良好候选者。
我读过有关操作系统级 IO 线程的内容,但我很确定这与它们无关。线程怎么可能是IO呢?为什么他们告诉我我不应该这样做
channel.BasicNacks += async (sender, ea) =>
{
var someIoReuslt = await channel.BasicPublishAsync(queue,...);
};
我尝试查看 Channel 的内部实现,它们如何调用在此事件上订阅的所有回调(收到 nack)
protected void HandleBasicAck(IncomingCommand cmd)
{
var ack = new BasicAck(cmd.MethodSpan);
if (!_basicAcksWrapper.IsEmpty)
{
var args = new BasicAckEventArgs(ack._deliveryTag, ack._multiple);
_basicAcksWrapper.Invoke(this, args);
}
HandleAckNack(ack._deliveryTag, ack._multiple, false);
}
除了返回类型 void 之外,我认为这里在性能方面没有问题!
rabbitmq中的I/O操作主要用于rabbitmq客户端和rabbit mq代理之间的通信。
回调(BasicAcks、BasicNacks)将使用 I/O 线程,因此您可能想知道为什么这会成为问题。 Rabbitmq 专为快速消息传输和非阻塞消息传输而设计。当你在回调中执行繁重的操作时,这些会发生在rabbitmq使用的I/O线程上,这会影响性能,甚至可能导致死锁(如果你在I/O线程中执行繁重的任务,通信可能会中断) 。因此,rabbitmq 教程建议使用像 ConcurrentQueue 这样的重新排队策略,因为它提供了线程之间的线程安全通信。它可以保留未确认的消息,直到它们重新发布
为了处理此重新发布,您可以创建一个队列服务并在该服务中使用,
微软官方文档:https://learn.microsoft.com/en-us/dotnet/core/extensions/queue-service
希望对您有帮助,