重新发布NACK版本的消息-RabbitMQ .NET Core 3.0

问题描述 投票:0回答:1

我目前正在尝试使用发布者确认来实施更具弹性的RabbitMQ。

我一直在使用他们教程中的代码,到目前为止,通过将NACK设置为max-limit,将5设置为overflow,我已经能够触发发布方的reject-publish回调函数在队列属性中。

到目前为止,我有一个带有以下代码的发送方控制台应用程序,并且我正在使用RabbitMQ管理工具出队,等等。

namespace ConsoleSender
{
    class Program
    {
        static void Main(string[] args)
        {
            var factory = new ConnectionFactory() { HostName = "xxxxxxxx" };
            using(var connection = factory.CreateConnection())
            using(var channel = connection.CreateModel())
            {

                // set RabbitMQ confirms
                channel.ConfirmSelect();

                // create concurrent dictionary
                ConcurrentDictionary<ulong, string> outstandingConfirms = new ConcurrentDictionary<ulong, string>();

                // basic NACK event
                channel.BasicNacks += (sender, ea) =>
                {
                    // get NACK'd message from concurrent dictionary
                    outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
                    Console.WriteLine($"Message with body: {body} has been NACK'd, with sequence number: {ea.DeliveryTag}");

                    // send NACK'd messages to in memory queue
                    RepublishQueue.AddToNackQueue(ea.DeliveryTag, body);

                    // clean concurrent dictionary (needs to be in a method)
                    if (ea.Multiple)
                    {
                        var confirmed = outstandingConfirms.Where(k => k.Key <= ea.DeliveryTag);

                        foreach (var c in confirmed)
                        {
                            outstandingConfirms.TryRemove(c.Key, out _);
                        }
                    }
                    else
                    {
                        outstandingConfirms.TryRemove(ea.DeliveryTag, out _);
                    }
                };

                // declare dictionary for args to create a NACK
                Dictionary<string, object> queueArgs = new Dictionary<string, object>()
                {
                    {"x-max-length", 5},
                    {"x-overflow", "reject-publish"}
                };

                channel.QueueDeclare(queue: "testBackgroundService", durable: false, exclusive: false, autoDelete: false, arguments: queueArgs);

                // publish 10 messages
                for (int i = 0; i < 10; i++)
                {
                    string message = $"Hello World! + {i + 1}";

                    // put each message in concurrent dictionary
                    outstandingConfirms.TryAdd(channel.NextPublishSeqNo, message);

                    channel.BasicPublish(exchange: "", routingKey: "testBackgroundService", basicProperties: null, body: Encoding.UTF8.GetBytes(message));
                    Console.WriteLine(" [x] Sent {0}", message);
                }
            }

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }
}

结果符合预期,其中从610的消息为NACK-ed并打印到控制台。

阅读完本教程的最后一部分:

可能很诱人地重新发布来自相应的回调,但应避免这种情况,如确认回调在没有通道的I / O线程中调度应该做手术。更好的解决方案是将内存队列中的消息,该消息由发布线程轮询。像ConcurrentQueue这样的课程很适合发送确认回调和发布线程之间的消息。

我决定用另一个并发字典创建一个静态类,以便存储NACK-ed消息:

RepublishQueue.AddToNackQueue(ea.DeliveryTag, body);NACK事件中,但是我不确定接下来要走的方向,因为我不完全了解解释发布线程轮询的部分。这是否意味着在发布之前放置if语句来检查字典中的项目数,还是其他?

rabbitmq
1个回答
0
投票

您是否获得了重新发布短消息的解决方案?如果您有解决方案,请与我们分享

© www.soinside.com 2019 - 2024. All rights reserved.