我正在使用带有EasyNetQ库的C#中的RabbitMQ。我在这里使用了pub / sub模式。我还有一些问题希望有人可以帮助我:
任何人?
EasyNetQ / RabbitMQ遇到的问题是,与其他消息服务(如SQS或Azure Service Bus / Queues)相比,它更加“原始”,但我会尽力指出正确的方向。
问题1。
这将是你要做的。最简单的方法是你可以在RabbitMQ / EasyNetQ中发送No-Ack消息,它将被放置在队列的头部,供你重试。这不是真的可取,因为它几乎会立即重试(没有时间延迟),并且还会阻止其他消息被处理(如果您有一个预取计数为1的单个订户)。
我见过使用“MessageEnvelope”的其他实现。这是一个包装类,当消息失败时,您在MessageEnvelope上递增重试变量,并将消息重新发送回队列。你必须这样做,并在你的消息处理程序周围编写包装代码,它不是EasyNetQ的功能。
使用上面的内容,我也看到人们使用信封,但允许信息被删除。一旦它出现在死信队列中,就会有另一个应用程序/工作人员从死信队列中读取项目。
上述所有这些方法都有一个小问题,因为在处理消息时没有任何好的方法可以具有对数/指数/任何类型的增加延迟。在将消息返回队列之前,您可以在代码中“保留”一段时间,但这并不是一个好方法。
在所有这些选项中,您自己的自定义应用程序读取死信队列并决定是否根据包含重试计数的信封重新路由邮件可能是最好的方法。
问题2。
您可以使用高级API为每个队列指定死信交换。 (https://github.com/EasyNetQ/EasyNetQ/wiki/The-Advanced-API#declaring-queues)。但是,这意味着您必须在任何地方使用高级API,因为使用subscribe / publish的简单IBus实现会查找基于消息类型和订户名称命名的队列。使用队列的自定义声明意味着您将自己处理队列的命名,这意味着当您订阅时,您将需要知道您想要的名称等。不再为您自动订阅!
问题3
错误队列/死信队列只是另一个队列。您可以收听此队列并执行您需要执行的操作。但实际上没有任何开箱即用的解决方案听起来像是符合您的需求。
我已经完全按照你的描述实现了。以下是一些基于我的经验并与您的每个问题相关的提示。
Q1(如何重试X次):
为此,您可以使用IMessage.Body.BasicProperties.Headers
。当您从错误队列中使用消息时,只需添加一个包含您选择的名称的标头。在每个进入错误队列的消息中查找此标头并将其递增。这将为您提供正在运行的重试计数。
当消息超过X的重试限制时,您有一个策略来执行该操作非常重要。您不希望丢失该消息。就我而言,我在那时将消息写入磁盘。它为您提供了许多有用的调试信息,以便稍后再回来,因为EasyNetQ会自动将您的原始消息包含在错误信息中。它还具有原始消息,以便您可以(如果您愿意)手动(或可能通过某些批处理重新处理代码自动化)稍后以某种受控方式重新排列消息。
您可以查看Hosepipe实用程序中的代码,以查看执行此操作的好方法。事实上,如果你按照你在那里看到的模式,你甚至可以在以后使用Hosepipe来重新排列消息,如果你需要的话。
Q2(如何为每个始发队列创建一个错误队列):
您可以使用EasyNetQ Advanced Bus干净利落地完成此操作。使用IBus.Advanced.Container.Resolve<IConventions>
进入约定界面。然后,您可以使用conventions.ErrorExchangeNamingConvention
和conventions.ErrorQueueNamingConvention
设置错误队列命名的约定。在我的例子中,我将约定设置为基于原始队列的名称,以便每次创建队列时都得到一个队列/ queue_error队列。
Q3(如何处理错误队列中的消息):
您可以像执行任何其他队列一样为错误队列声明使用者。同样,AdvancedBus允许您通过指定队列中的类型是EasyNetQ.SystemMessage.Error
来干净地执行此操作。所以,IAdvancedBus.Consume<EasyNetQ.SystemMessage.Error>()
会帮助你。重试只是意味着重新发布到原始交换(注意您在标题中放置的重试计数(请参阅上面我对Q1的回答),并且您从错误队列中消耗的错误消息中的信息可以帮助您找到目标重新发布。
我知道这是一个旧帖子,但是 - 以防它帮助其他人 - 这里是my self-answered question(我需要问它因为现有的帮助还不够),这解释了我如何在原始队列上实现重试失败的消息。以下内容应该回答您的问题#1和#3。对于#2,您可能必须使用我尚未使用的Advanced API(我认为它违背了EasyNetQ的目的;也可以直接使用RabbitMQ客户端)。不过也考虑实现IConsumerErrorStrategy。
1)由于消息可能有多个消费者,并且所有消息都可能不需要重试消息,因此我在消息正文中有一个Dictionary<consumerId, RetryInfo>
,因为EasyNetQ没有(开箱即用)支持消息头中的复杂类型。
public interface IMessageType
{
int MsgTypeId { get; }
Dictionary<string, TryInfo> MsgTryInfo {get; set;}
}
2)我已经实现了class RetryEnabledErrorMessageSerializer : IErrorMessageSerializer
,它只是在每次框架调用时更新TryCount和其他信息。我通过EasyNetQ提供的IoC支持,基于每个用户将此自定义序列化程序附加到框架。
public class RetryEnabledErrorMessageSerializer<T> : IErrorMessageSerializer where T : class, IMessageType
{
public string Serialize(byte[] messageBody)
{
string stringifiedMsgBody = Encoding.UTF8.GetString(messageBody);
var objectifiedMsgBody = JObject.Parse(stringifiedMsgBody);
// Add/update RetryInformation into objectifiedMsgBody here
// I have a dictionary that saves <key:consumerId, val: TryInfoObj>
return JsonConvert.SerializeObject(objectifiedMsgBody);
}
}
在我的EasyNetQ包装类中:
public void SetupMessageBroker(string givenSubscriptionId, bool enableRetry = false)
{
if (enableRetry)
{
_defaultBus = RabbitHutch.CreateBus(currentConnString,
serviceRegister => serviceRegister.Register<IErrorMessageSerializer>(serviceProvider => new RetryEnabledErrorMessageSerializer<IMessageType>(givenSubscriptionId))
);
}
else // EasyNetQ's DefaultErrorMessageSerializer will wrap error messages
{
_defaultBus = RabbitHutch.CreateBus(currentConnString);
}
}
public bool SubscribeAsync<T>(Func<T, Task> eventHandler, string subscriptionId)
{
IMsgHandler<T> currMsgHandler = new MsgHandler<T>(eventHandler, subscriptionId);
// Using the msgHandler allows to add a mediator between EasyNetQ and the actual callback function
// The mediator can transmit the retried msg or choose to ignore it
return _defaultBus.SubscribeAsync<T>(subscriptionId, currMsgHandler.InvokeMsgCallbackFunc).Queue != null;
}
3)将消息添加到默认错误队列后,您可以拥有一个简单的控制台app / windows服务,该服务定期在其原始队列上重新发布现有错误消息。就像是:
var client = new ManagementClient(AppConfig.BaseAddress, AppConfig.RabbitUsername, AppConfig.RabbitPassword);
var vhost = client.GetVhostAsync("/").Result;
var aliveRes = client.IsAliveAsync(vhost).Result;
var errQueue = client.GetQueueAsync(Constants.EasyNetQErrorQueueName, vhost).Result;
var crit = new GetMessagesCriteria(long.MaxValue, Ackmodes.ack_requeue_false);
var errMsgs = client.GetMessagesFromQueueAsync(errQueue, crit).Result;
foreach (var errMsg in errMsgs)
{
var innerMsg = JsonConvert.DeserializeObject<Error>(errMsg.Payload);
var pubInfo = new PublishInfo(innerMsg.RoutingKey, innerMsg.Message);
pubInfo.Properties.Add("type", innerMsg.BasicProperties.Type);
pubInfo.Properties.Add("correlation_id", innerMsg.BasicProperties.CorrelationId);
pubInfo.Properties.Add("delivery_mode", innerMsg.BasicProperties.DeliveryMode);
var pubRes = client.PublishAsync(client.GetExchangeAsync(innerMsg.Exchange, vhost).Result, pubInfo).Result;
}
4)我有一个MessageHandler类,它包含一个回调函数。每当消息传递给消费者时,它就会转到MessageHandler,它决定消息try是否有效,如果是,则调用实际的回调。如果try无效(maxRetriesExceeded /消费者无论如何都不需要重试),我会忽略该消息。在这种情况下,您可以选择Dead Letter消息。
public interface IMsgHandler<T> where T: class, IMessageType
{
Task InvokeMsgCallbackFunc(T msg);
Func<T, Task> MsgCallbackFunc { get; set; }
bool IsTryValid(T msg, string refSubscriptionId); // Calls callback only
// if Retry is valid
}
以下是MsgHandler
中调用回调的中介函数:
public async Task InvokeMsgCallbackFunc(T msg)
{
if (IsTryValid(msg, CurrSubscriptionId))
{
await this.MsgCallbackFunc(msg);
}
else
{
// Do whatever you want
}
}