我有一个azure函数,从ServiceBus主题读取并调用第三方服务。如果服务中断,我想等待5分钟,然后再尝试使用相同的消息再次呼叫它。如何添加延迟以使天蓝色功能不会放弃该消息并立即重新启动它?
public static void Run([ServiceBusTrigger("someTopic",
"someSubscription", AccessRights.Manage, Connection =
"ServiceBusConnection")] BrokeredMessage message)
{
CallService(bodyOfBrokeredMessage); //service is down
//How do I add a delay so the message won't be reprocessed immediately thus quickly exhausting it's max delivery count?
}
正如Josh所说,你可以简单地克隆原始邮件,设置预定的入队时间,发送克隆并完成原始邮件。
嗯,发送克隆并完成原始操作并不是原子操作,这是一种遗憾,因此如果处理过程在错误的时刻崩溃,我们再次看到原始文件的可能性非常小。
而另一个问题是克隆上的DeliveryCount
将始终为1,因为这是一个全新的消息。因此,我们可以无限重新提交,永远不会对这封邮件进行刻字。
幸运的是,可以通过添加您自己的提交计数作为消息的属性来修复:
[FunctionName("DelayMessage")]
public static async Task DelayMessage([ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
[ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,TraceWriter log)
{
//handle any kind of error scenerio
int resubmitCount = originalMessage.Properties.ContainsKey("ResubmitCount") ? (int)originalMessage.Properties["ResubmitCount"] : 0;
if (resubmitCount > 5)
{
Console.WriteLine("DEAD-LETTERING");
originalMessage.DeadLetter("Too many retries", $"ResubmitCount is {resubmitCount}");
}
else
{
var newMessage = originalMessage.Clone();
newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);
await newMessages.AddAsync(newMessage);
}
}
有关更多详细信息,请参阅此article。
此外,在LogicApp中实现等待/重试/出列下一个模式非常容易,因为这种类型的流控制正是LogicApps的设计目标。请参考这个SO thread。
一种选择是创建新消息并将该消息提交到队列,但将ScheduledEnqueueTimeUtc设置为将来五分钟。
[FunctionName("DelayMessage")]
public static async Task DelayMessage(
[ServiceBusTrigger("MyQueue", AccessRights.Listen, Connection = "MyConnection")]BrokeredMessage originalMessage,
[ServiceBus("MyQueue", AccessRights.Send, Connection = "MyConnection")]IAsyncCollector<BrokeredMessage> newMessages,
TraceWriter log)
{
//handle any kind of error scenerio
var newMessage = originalMessage.Clone();
newMessage.ScheduledEnqueueTimeUtc = DateTime.UtcNow.AddMinutes(5);
await newMessages.AddAsync(newMessage);
}