在长时间运行的进程过期之前更新锁定

问题描述 投票:0回答:1

我有这个服务总线触发器,我试图在锁过期之前更新锁。 据我所知,一条消息的最长生存时间是 5 分钟。但我的任务需要 20 分钟才能完成。因此,为了解决这种情况,我在 host.json 文件中进行了以下更改

"messageHandlerOptions": { "autoComplete": false, "maxConcurrentCalls": 300, "maxAutoRenewDuration": "00:30:00" }

在我的接收器功能中,我进行了以下更改,以每 4 分钟到 25 分钟更新一次锁定。

    [Function("XmlProcessor")]
    public async Task Run([ServiceBusTrigger("myqueue",
        Connection = "AzureServiceBusConnectionString",
        IsSessionsEnabled = false)] string message, ServiceBusReceivedMessage receivedMessage, FunctionContext context)
    {
        _logger.LogInformation($"XmlProcessor triggered for message: {message}");

        try
        {
            var cts = new CancellationTokenSource();
            cts.CancelAfter(TimeSpan.FromMinutes(25));
            var receiver = _client.CreateReceiver("trmxmlprocessor");
            Task renewLockTask = RenewLock(receiver, receivedMessage, cts.Token);
            await _initializer.Start(message);
            await receiver.CompleteMessageAsync(receivedMessage);
            cts.Cancel();
            renewLockTask.Dispose();
            _logger.LogInformation($"XmlProcessor completed for message: {message}");

        }
        catch (Exception ex)
        {
            _logger.LogError($"XmlProcessor root exception: {ex}");
            throw new InvalidOperationException("Processing failed. Further retries are not allowed.", ex);
        }
        _logger.LogInformation($"XmlProcessor completed for message: {message}");
    }

//更新锁定方法

    private async Task RenewLock(ServiceBusReceiver receiver, ServiceBusReceivedMessage receivedMessage, CancellationToken token)
{
    var renewalBuffer = TimeSpan.FromMinutes(1);
    while (!token.IsCancellationRequested)
    {
        try
        {
            await Task.Delay(TimeSpan.FromMinutes(4), token);
            await receiver.RenewMessageLockAsync(receivedMessage,token);
            _logger.LogInformation("Message Lock Renewed");
        }
        catch (TaskCanceledException)
        {
            break;
        }
        catch (Exception ex)
        {
            _logger.LogError(ex, "Failed to renewing lock");
        }
    }
}

但是我总是在 await receive.CompleteMessageAsync(receivedMessage); 处遇到异常。因为我总是将 ServiceBusMessage 设为 null。

c# azure azure-functions servicebus azure-servicebus-queues
1个回答
0
投票

使用 Azure Function Service Bus 触发器时,触发器会在后台处理 ServiceBusClient 及其接收器。有关更多信息,请参阅几年前我的问题的这个答案。这意味着您可能不想使用 ServiceBusClient (

_client
) 来创建新的接收器。 IE。通过使用触发器,您就已经在引擎盖下有了一个接收器。这个以及其他一些因素意味着您可能必须切换一些用于处理收到的消息的方法。

消息绑定

首先,你的

ServiceBusReceivedMessage
为空的原因是因为
ServiceBusTriggerAttribute
被放置在
string message
参数上,而不是
ServiceBusReceivedMessage
参数上。我建议删除
string
,只保留
ServiceBusReceivedMessage
ServiceBusReceivedMessage
它的信息更丰富,您仍然可以通过
ServiceBusReceivedMessage
以字符串形式访问消息正文。另一个优点是您可以在
ServiceBusReceivedMessage
上执行操作。

消息操作

看起来您的触发器位于“进程外”Azure Functions 应用程序中,基于您放在服务总线触发器方法上的“FunctionAttribute”。默认情况下,消息在触发器成功执行后完成。可以通过禁用 host.json 中的 AutoCompleteMessages 标志或在 ServiceBusTriggerAttribute 上禁用此功能。一旦禁用,而不是使用

await receiver.CompleteMessageAsync(receivedMessage)
要完成消息,请将

ServiceBusMessageActions
 绑定为方法参数,并使用 

await actions.CompleteMessageAsync(receivedMessage);

(但是,请注意,

ServiceBusMessageActions
 首先在 
Microsoft.Azure.Functions.Worker.Extensions.ServiceBus

包版本

5.14.0
中可绑定进程外触发器。旧版本的工作程序不允许消息操作。)
续订锁
最后,当使用服务总线触发器时,未完成处理的消息将自动更新其锁定,直到到达 

maxAutoRenewDuration

(来自 host.json)。这消除了您打电话的需要

await receiver.RenewMessageLockAsync(receivedMessage,token);

手动。

消息的生存时间上限不限于 5 分钟,但可能受到您的
服务总线计划

、消息发送者以及对队列、主题和/或订阅设置的限制的限制。

在这里阅读更多内容

。但是,默认情况下,Azure Functions 的“超时”是有限的。可以增加此超时限制,具体取决于 Functions 应用程序的托管计划。 请参阅此处的限制。您可以在 host.json 文件中为 Functions 应用程序设置全局函数超时,如下所示: { "version": "2.0", "extensions": { "serviceBus": { "messageHandlerOptions": { "maxAutoRenewDuration": "00:30:00" } } }, "functionTimeout": "00:30:00" } 要遵循的规则是让

maxAutoRenewDuration
大于或等于

functionTimeout
在这里看看为什么

我没有对你的
RenewLock()
方法进行太多思考,如果你修复了其他问题,那么就不需要它了。但要考虑一件事,如果您启动更新任务并且没有任何其他线程来运行该进程,会发生什么? 将它们放在一起

结合所有内容,您的触发器将如下所示:

[Function("XmlProcessor")]
public async Task Run([ServiceBusTrigger("myqueue", Connection = "AzureServiceBusConnectionString", IsSessionsEnabled = false, AutoCompleteMessages = false)] ServiceBusReceivedMessage receivedMessage, 
    ServiceBusMessageActions actions, 
    FunctionContext context)
{
    _logger.LogInformation($"XmlProcessor triggered for message: {message}");

    try
    {
        await _initializer.Start(message);
        await actions.CompleteMessageAsync(receivedMessage);

        _logger.LogInformation($"XmlProcessor completed for message: {message}");
    }
    catch (Exception ex)
    {
        _logger.LogError($"XmlProcessor root exception: {ex}");
        throw new InvalidOperationException("Processing failed. Further retries are not allowed.", ex);
    }
    _logger.LogInformation($"XmlProcessor completed for message: {message}");
}

和你的host.json像这样:

{ "version": "2.0", "extensions": { "serviceBus": { "messageHandlerOptions": { "maxConcurrentCalls": 300, "maxAutoRenewDuration": "00:30:00" } } }, "functionTimeout": "00:30:00" }

	

© www.soinside.com 2019 - 2025. All rights reserved.