考虑这个rabbitmq回复队列机制,其中仅检索1条消息,发送回客户端,然后必须删除生成的(自动删除)队列。
string startResponseConsumer(IModel? channel)
{
string replyQueueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
return;
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
};
channel.BasicConsume(replyQueueName, true, consumer);
return replyQueueName;
}
它有效,消息响应被发送回客户端,但队列没有被删除。 10 次调用后,这是控制台中显示的内容。
因此每次调用此方法时,都会添加一个队列。我猜当通道被处置时,它们只会在退出时被删除。
但是这是一项服务,必须一直在线,不能永远积累队列。所以我在接收到的事件中添加了channel.Close()。
string startResponseConsumer(IModel? channel)
{
string replyQueueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
if (!callbackMapper.TryRemove(ea.BasicProperties.CorrelationId, out var tcs))
return;
var body = ea.Body.ToArray();
var response = Encoding.UTF8.GetString(body);
tcs.TrySetResult(response);
// added this to have the queue removed
if (channel != default)
{
channel.Close();
channel = default;
}
};
channel.BasicConsume(replyQueueName, true, consumer);
return replyQueueName;
}
这不是线程安全的,为每个线程创建的通道在处理所有消息之前都会关闭。
有没有正确的方法可以在收到消息后删除自动删除队列?
这有效:
当一切正常并且回复消息收到 ACK 时,消费者关闭其通道。
string replyQueueName = channel.QueueDeclare().QueueName;
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
if (channel != default)
{
channel.Close();
channel = default;
}
};
channel.BasicConsume(replyQueueName, true, consumer);
但是,当由于任何原因(例如服务中断)未发送回复消息时,计时器将启动,导致通道关闭延迟执行 30 秒
DelayedExecution 仅使用 System.Threading.Timer。
DelayedExecution.Go(() => {
if (channel != null && channel.IsOpen) { channel.Close(); }
}, 30000);