请指导我如何由消费者重新排队和重新处理消息。我的目标是,如果消息中存在错误,则重新排队消息,然后消费者将尝试再次重新消费。让我们假设,如果我将消息发送到数据库,并将返回错误。在这种情况下,我可以发送消息以进行重新排队和重新处理。
如果我拒绝该消息,它会继续处理相同的内容并且不会转到下一条消息。就我而言,我拒绝 4,但它不会继续处理 5。
1 2 3 4 4 4 4
consumer.Received += (发送者, args) => {
//Task.Delay(TimeSpan.FromSeconds(1)).Wait();
var body = args.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
//send message to database at this level
Console.WriteLine($"Message received by receiver1: {message}");
if (Convert.ToInt32(message) == 4)
{
channel.BasicReject(args.DeliveryTag,true);
}
else
{
channel.BasicAck(args.DeliveryTag, false);
}
};
您在 basicReject 中发送 requeue=true 且不限制重试次数,因此消息将不间断地重新排队。
如果您使用仲裁队列,您可以通过向队列添加
x-delivery-limit
参数来限制重试次数,如下所示
var queueArgs = new Dictionary<string, object>();
queueArgs.Add("x-delivery-limit", QUEUE_DELIVERY_COUNT);
_channel.QueueDeclare(queueName, durable, false, false, queueArgs);
如果您使用经典队列或者想要实现包括死信在内的完整重试机制,您可以阅读以下文章:RabbitMQ Retries