ZURE服务总线:长期运行的消息不续签

问题描述 投票:0回答:1

我们有一条可以运行长达1小时的消息。它只需要处理一次。我们正在遇到有关其他处理器可用的问题,但是当我们尝试完成消息时。

服务总线中的设置是:

service bus

我们以以下方式设置服务总线:

var options = new ServiceBusProcessorOptions { AutoCompleteMessages = false, MaxConcurrentCalls = 1, PrefetchCount = 0, ReceiveMode = ServiceBusReceiveMode.PeekLock, MaxAutoLockRenewalDuration = TimeSpan.FromHours(2), };
然后,我们以以下方式处理该消息:

private async Task HandleMessageAsync(ProcessMessageEventArgs processMessageEventArgs) { try { var rawMessageBody = Encoding.UTF8.GetString(processMessageEventArgs.Message.Body); _logger.LogInformation("Received message {MessageId} with body {MessageBody}", processMessageEventArgs.Message.MessageId, rawMessageBody); var repoRequest = JsonConvert.DeserializeObject<TMessage>(rawMessageBody); if (repoRequest != null) { await ProcessMessage(repoRequest, processMessageEventArgs.Message.MessageId, processMessageEventArgs.Message.ApplicationProperties, processMessageEventArgs.CancellationToken); } else { _logger.LogError( "Unable to deserialize to message contract {ContractName} for message {MessageBody}", typeof(TMessage), rawMessageBody); } _logger.LogInformation("Message {MessageId} processed", processMessageEventArgs.Message.MessageId); await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message); } catch (Exception ex) { await processMessageEventArgs.AbandonMessageAsync(processMessageEventArgs.Message); _logger.LogError(ex, "Unable to handle message"); } }

无论如何,我们正在获得持续的serviceBusreceiver.renewmessagelock沿途的例外情况,这很好,因为消息继续处理。但是,在消息结束时,当我们手动调用
await processMessageEventArgs.CompleteMessageAsync(processMessageEventArgs.Message);

时,它正在失败。

azure.Messaging.ServiceBus.ServiceBuseXception:提供的锁是
无效的。锁已过期,或者消息已经过
从队列中取出。有关更多信息,请参阅

https://aka.ms/servicebusexceptions
。参考:xxxxxxx, TrackingId:xxxxx, SystemTracker:GI :: G9:4521499:AMQPS://xxxxxx.servicebus.windows.net/-eb5b7cf4; 25:30:30:31:31:source:source:/xxxxxxxx,filter:[]),: BI :: Connection1648(G9-96965):: Session1654 :: Link417153, 时间戳:2025-02-05T11:37:31(Messagelocklost)。用于故障排除 信息,请参阅

https://aka.ms/azsdk/net/servicebus/exceptions/troubleshoot。 在 azure.messaging.servicebus.amqp.amqpreceiver.disposemessageasync(Guid 锁定,结果结果,处置史图,时间板 超时,Idictionary2 propertiesToModify, String deadLetterReason, String deadLetterDescription) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.CompleteInternalAsync(Guid lockToken, TimeSpan timeout) at Azure.Messaging.ServiceBus.Amqp.AmqpReceiver.<>c.<<CompleteAsync>b__47_0>d.MoveNext() --- End of stack trace from previous location --- at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.<>c__221.d.Movenext() ---堆栈跟踪的结束来自上一个位置的结尾---在azure.messaging.servicebus.servicebusretrypolicy.runoperation [t1,tresult](func

4 operation, T1 t1, TransportConnectionScope scope, CancellationToken cancellationToken, Boolean logTimeoutRetriesAsVerbose)    at Azure.Messaging.ServiceBus.ServiceBusRetryPolicy.RunOperation[T1,TResult](Func
4) 操作,T1 T1,Transport ConnectionsCope范围,取消token 取消token,boolean logimotionoutretriesasverbose) azure.messaging.servicebus.servicebusretrypolicy.Runoperation [t1](func`4 操作,T1 T1,Transport ConnectionsCope范围,取消token 取消token) azure.messaging.servicebus.amqp.amqpreceiver.completeasync(Guid Locktoken,取消tocellationToken) azure.messaging.servicebus.servicebusreceiver.completemessageasync(servicebusreceivedMessage 消息,取消tocellationToken)
我们应该做什么不同。我知道,即使Maxautolockrenewalduration设置为2小时,Azure并不能保证在所有条件下续订。但是我们应该如何处理?
    

Auto-Lock更新对

长的任务不可靠。手动更新锁。

是@seanfeldman,@vladdx说,
c# azure azureservicebus azure-servicebus-queues
1个回答
0
投票

将消息存储在数据库或BLOB存储中。 aacknowledge消息立即在独立工人中分别处理。

基于自动锁定的续订
    iNSTEAD基于Sessession的消息提供了一个独家锁定,只要会话保持active
  • 。 Azure Service Bus

    将在整个会话中锁定,只要您保留开放率,其他消费者就无法提供该消息。 SAME处理器继续处理会话的消息,直到关闭。 在服务总线中创建队列时,可启用

    “ session”。

在启用会话之前,使用现有的常规队列处理器在队列中的所有消息。

队列是

empty,启用会话并开始与SessionId.一起发送消息。

将发件人添加到包括:

SessionId

使用基于ssession的处理器:enter image description here

var message = new ServiceBusMessage(Encoding.UTF8.GetBytes("Your message body")) { SessionId = "MySession-" + Guid.NewGuid().ToString() // Each message belongs to a session }; await sender.SendMessageAsync(message);
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.