我们有一条可以运行长达1小时的消息。它只需要处理一次。我们正在遇到有关其他处理器可用的问题,但是当我们尝试完成消息时。
服务总线中的设置是:我们以以下方式设置服务总线:
var options = new ServiceBusProcessorOptions
{
AutoCompleteMessages = false,
MaxConcurrentCalls = 1,
PrefetchCount = 0,
ReceiveMode = ServiceBusReceiveMode.PeekLock,
MaxAutoLockRenewalDuration = TimeSpan.FromHours(2),
};
然后,我们以以下方式处理该消息:
private async Task HandleMessageAsync(ProcessMessageEventArgs processMessageEventArgs)
{
try
{
var rawMessageBody = Encoding.UTF8.GetString(processMessageEventArgs.Message.Body);
_logger.LogInformation("Received message {MessageId} with body {MessageBody}",
processMessageEventArgs.Message.MessageId, rawMessageBody);
var repoRequest = JsonConvert.DeserializeObject<TMessage>(rawMessageBody);
if (repoRequest != null)
{
await ProcessMessage(repoRequest, processMessageEventArgs.Message.MessageId,
processMessageEventArgs.Message.ApplicationProperties,
processMessageEventArgs.CancellationToken);
}
else
{
_logger.LogError(
"Unable to deserialize to message contract {ContractName} for message {MessageBody}",
typeof(TMessage), rawMessageBody);
}
_logger.LogInformation("Message {MessageId} processed", processMessageEventArgs.Message.MessageId);
await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
}
catch (Exception ex)
{
await processMessageEventArgs.AbandonMessageAsync(processMessageEventArgs.Message);
_logger.LogError(ex, "Unable to handle message");
}
}
无论如何,我们正在获得持续的serviceBusreceiver.renewmessagelock沿途的例外情况,这很好,因为消息继续处理。但是,在消息结束时,当我们手动调用
await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);
时,它正在失败。
azure.Messaging.ServiceBus.ServiceBuseXception:提供的锁是 无效的。锁已过期,或者消息已经过 从队列中取出。有关更多信息,请参阅 https://aka.ms/servicebusexceptions。参考:xxxxxxx, TrackingId:xxxxx, SystemTracker:GI :: G9:4521499:AMQPS://xxxxxx.servicebus.windows.net/-eb5b7cf4; 25:30:30:31:31:source:source:/xxxxxxxx,filter:[]),: BI :: Connection1648(G9-96965):: Session1654 :: Link417153, 时间戳:2025-02-05T11:37:31(Messagelocklost)。用于故障排除 信息,请参阅Auto-Lock更新对https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot。 在 azure.messaging.servicebus.amqp.amqpreceiver.disposemessageasync(Guid 锁定,结果结果,处置史图,时间板 超时,Idictionary
2 propertiesToModify, String deadLetterReason, String deadLetterDescription) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteInternalAsync(Guid lockToken, TimeSpan timeout) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<CompleteAsync>b__47_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__22
1.d.Movenext() ---堆栈跟踪的结束来自上一个位置的结尾---在azure.messaging.servicebus.servicebusretrypolicy.runoperation [t1,tresult](func4) 操作,T1 T1,Transport ConnectionsCope范围,取消token 取消token,boolean logimotionoutretriesasverbose) azure.messaging.servicebus.servicebusretrypolicy.Runoperation [t1](func`4 操作,T1 T1,Transport ConnectionsCope范围,取消token 取消token) azure.messaging.servicebus.amqp.amqpreceiver.completeasync(Guid Locktoken,取消tocellationToken) azure.messaging.servicebus.servicebusreceiver.completemessageasync(servicebusreceivedMessage 消息,取消tocellationToken)
4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose) at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func
我们应该做什么不同。我知道,即使Maxautolockrenewalduration设置为2小时,Azure并不能保证在所有条件下续订。但是我们应该如何处理?
长的任务不可靠。手动更新锁。
是@seanfeldman,@vladdx说,将消息存储在数据库或BLOB存储中。 aacknowledge消息立即在独立工人中分别处理。
基于自动锁定的续订
将在整个会话中锁定,只要您保留开放率,其他消费者就无法提供该消息。 SAME处理器继续处理会话的消息,直到关闭。 在服务总线中创建队列时,可启用
“ session”。
在启用会话之前,使用现有的常规队列处理器在队列中的所有消息。
队列是empty,启用会话并开始与SessionId
.一起发送消息。
将发件人添加到包括:
:
SessionId
使用基于ssession的处理器:
var message = new ServiceBusMessage(Encoding.UTF8.GetBytes("Your message body"))
{
SessionId = "MySession-" + Guid.NewGuid().ToString() // Each message belongs to a session
};
await sender.SendMessageAsync(message);