我想设置一个超时时间,超过该超时时间后,出列的消息将被自动 NACK。
当我将消息出队时,我会等待消息通过套接字传输并且另一方确认其接收。
我需要保留计时器列表还是 RMQ 可以自动处理此问题吗?
private void Run()
{
_rmqConnection = _queueConnectionFactory.CreateFactory().CreateConnection();
_rmqReadchannel = _rmqConnection.CreateModel();
_rmqReadchannel.QueueDeclare(QueueIdOutgoing(), true, false, false, null);
_rmqReadchannel.BasicQos(0, 1, false);
var consumer = new QueueingBasicConsumer(_rmqReadchannel);
_rmqReadchannel.BasicConsume(QueueIdOutgoing(), false, consumer);
while (true)
{
if (!_rmqReadchannel.IsOpen)
{
throw new Exception("Channel is closed");
}
var ea = consumer.Queue.Dequeue();
string jsonData = Encoding.UTF8.GetString(ea.Body);
if (OnOutgoingMessageReady != null)
{
OnOutgoingMessageReady(this, new QueueDataEventArgs(jsonData, ea.DeliveryTag));
}
//waiting for ACK from a different thread
}
}
是的。这在官方Python教程中讨论过:
消费者交付确认强制超时(默认 30 分钟)。这有助于检测从不确认交货的有问题(卡住)的消费者。
您可以在 RabbitMQ 文档中找到有关 传递确认超时
的更多信息然而,情况并非总是如此。旧版本的 RabbitMQ(至少到版本 3.6.x)没有提供任何类型的超时机制来确认消息。这在旧版本的Python官方教程中提到过:
没有任何消息超时;仅当工作连接断开时,RabbitMQ 才会重新发送消息。即使处理一条消息需要非常非常长的时间也没关系。
AMQP 0-9-1 规范第 3.1.8 节描述了确认,并且非常清楚它们可以是自动(客户端不必执行任何操作,消息一经确认就会被确认)已交付)或显式(客户端必须对其已处理的每条消息或一组消息进行 Ack)。
以下是 2009 年的一些过去的讨论,证实了这种行为。
我看到的第一个关于更改此行为的参考是 2019 年 4 月的 this PR。我不确定该更改包含在哪个版本的服务器中,但听起来默认值最初是“无超时”,然后在 RabbitMQ 3.8.15 中为 15 分钟,然后在 RabbitMQ 3.8.17 中为 30 分钟(截至 2021 年 10 月仍保持不变)。
所以:此行为取决于您的 RabbitMQ 版本。旧版本要求您在一段时间后显式发送 NACK。较新的版本有默认超时。
RabbitMQ 的现代版本有 ack 超时。 因此,如果您的消费者在确认交货之前花费了大量时间,请谨慎更新新版本。
如果消费者在超过超时值(默认为 30 分钟)后未确认其交付,则其通道将被关闭,并出现 PRECONDITION_FAILED 通道异常。
更新: 更新的文档包含禁用超时的说明:
可以使用advanced.config 禁用超时。不建议这样做:
%% advanced.config
[
{rabbit, [
{consumer_timeout, undefined}
]}
].
不要完全禁用超时,而是考虑使用较高的值(例如几个小时)。