当消息无法在ServiceBusTrigger Azure功能中处理时,如何在x分钟内延迟处理相同的消息?

问题描述 投票:1回答:2

我有一个azure函数,从ServiceBus主题读取并调用第三方服务。如果服务中断,我想等待5分钟,然后再尝试使用相同的消息再次呼叫它。如何添加延迟以使天蓝色功能不会放弃该消息并立即重新启动它?

public static void Run([ServiceBusTrigger("someTopic", 
     "someSubscription", AccessRights.Manage, Connection = 
     "ServiceBusConnection")] BrokeredMessage message) 
{
     CallService(bodyOfBrokeredMessage); //service is down

     //How do I add a delay so the message won't be reprocessed immediately thus quickly exhausting it's max delivery count?
}
c# azure azure-functions azureservicebus azure-servicebus-topics
2个回答
0
投票

正如Josh所说,你可以简单地克隆原始邮件,设置预定的入队时间,发送克隆并完成原始邮件。

嗯,发送克隆并完成原始操作并不是原子操作,这是一种遗憾,因此如果处理过程在错误的时刻崩溃,我们再次看到原始文件的可能性非常小。

而另一个问题是克隆上的DeliveryCount将始终为1,因为这是一个全新的消息。因此,我们可以无限重新提交,永远不会对这封邮件进行刻字。

幸运的是,可以通过添加您自己的提交计数作为消息的属性来修复:

[FunctionName("DelayMessage")]
public static async Task DelayMessage([ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
            [ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,TraceWriter log)
{
     //handle any kind of error scenerio
     int resubmitCount = originalMessage.Properties.ContainsKey("ResubmitCount") ?  (int)originalMessage.Properties["ResubmitCount"] : 0;
     if (resubmitCount > 5)
     {
         Console.WriteLine("DEAD-LETTERING");
         originalMessage.DeadLetter("Too many retries", $"ResubmitCount is {resubmitCount}");
     }
     else
     {
         var newMessage = originalMessage.Clone();
         newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);
         await newMessages.AddAsync(newMessage);
     }
}

有关更多详细信息,请参阅此article

此外,在LogicApp中实现等待/重试/出列下一个模式非常容易,因为这种类型的流控制正是LogicApps的设计目标。请参考这个SO thread


3
投票

一种选择是创建新消息并将该消息提交到队列,但将ScheduledEnqueueTimeUtc设置为将来五分钟。

        [FunctionName("DelayMessage")]
        public static async Task DelayMessage(
            [ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
            [ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,
            TraceWriter log)
        {
            //handle any kind of error scenerio

            var newMessage = originalMessage.Clone();

            newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);

            await newMessages.AddAsync(newMessage);

        }
© www.soinside.com 2019 - 2024. All rights reserved.