我目前正在尝试使用发布者确认来实施更具弹性的RabbitMQ。
我一直在使用他们教程中的代码,到目前为止,通过将NACK
设置为max-limit
,将5
设置为overflow
,我已经能够触发发布方的reject-publish
回调函数在队列属性中。
到目前为止,我有一个带有以下代码的发送方控制台应用程序,并且我正在使用RabbitMQ管理工具出队,等等。
namespace ConsoleSender
{
class Program
{
static void Main(string[] args)
{
var factory = new ConnectionFactory() { HostName = "xxxxxxxx" };
using(var connection = factory.CreateConnection())
using(var channel = connection.CreateModel())
{
// set RabbitMQ confirms
channel.ConfirmSelect();
// create concurrent dictionary
ConcurrentDictionary<ulong, string> outstandingConfirms = new ConcurrentDictionary<ulong, string>();
// basic NACK event
channel.BasicNacks += (sender, ea) =>
{
// get NACK'd message from concurrent dictionary
outstandingConfirms.TryGetValue(ea.DeliveryTag, out string body);
Console.WriteLine($"Message with body: {body} has been NACK'd, with sequence number: {ea.DeliveryTag}");
// send NACK'd messages to in memory queue
RepublishQueue.AddToNackQueue(ea.DeliveryTag, body);
// clean concurrent dictionary (needs to be in a method)
if (ea.Multiple)
{
var confirmed = outstandingConfirms.Where(k => k.Key <= ea.DeliveryTag);
foreach (var c in confirmed)
{
outstandingConfirms.TryRemove(c.Key, out _);
}
}
else
{
outstandingConfirms.TryRemove(ea.DeliveryTag, out _);
}
};
// declare dictionary for args to create a NACK
Dictionary<string, object> queueArgs = new Dictionary<string, object>()
{
{"x-max-length", 5},
{"x-overflow", "reject-publish"}
};
channel.QueueDeclare(queue: "testBackgroundService", durable: false, exclusive: false, autoDelete: false, arguments: queueArgs);
// publish 10 messages
for (int i = 0; i < 10; i++)
{
string message = $"Hello World! + {i + 1}";
// put each message in concurrent dictionary
outstandingConfirms.TryAdd(channel.NextPublishSeqNo, message);
channel.BasicPublish(exchange: "", routingKey: "testBackgroundService", basicProperties: null, body: Encoding.UTF8.GetBytes(message));
Console.WriteLine(" [x] Sent {0}", message);
}
}
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
}
}
}
结果符合预期,其中从6
到10
的消息为NACK-ed
并打印到控制台。
阅读完本教程的最后一部分:
可能很诱人地重新发布来自相应的回调,但应避免这种情况,如确认回调在没有通道的I / O线程中调度应该做手术。更好的解决方案是将内存队列中的消息,该消息由发布线程轮询。像
ConcurrentQueue
这样的课程很适合发送确认回调和发布线程之间的消息。
我决定用另一个并发字典创建一个静态类,以便存储NACK-ed
消息:
RepublishQueue.AddToNackQueue(ea.DeliveryTag, body);
在NACK
事件中,但是我不确定接下来要走的方向,因为我不完全了解解释发布线程轮询的部分。这是否意味着在发布之前放置if
语句来检查字典中的项目数,还是其他?
您是否获得了重新发布短消息的解决方案?如果您有解决方案,请与我们分享