复制 .NET 8.0 升级时的服务总线消息问题?

问题描述 投票:0回答:1

我有一个服务总线触发功能,它尝试检索订单,如果订单不可用(未找到),它将复制服务总线消息并在 5 分钟排队时间内重新提交。在进入死信之前会重试 3 次。

它一直运行良好,但随着升级到 .NET 8,我发现了一些问题。从下面的屏幕截图中可以看到,日志消息包含第一次重新提交时的交易号,但在下次重试时,交易号不再可用,我认为这意味着

new ServiceBusMessage(message)
不再复制所有自定义属性。此外,重试计数器的自动递增不再起作用。

还有其他人发现这是 .NET 8 升级的问题吗?

以下是日志消息: enter image description here

这是我目前在未找到交易时重试的代码。

    catch (Exception ex) when (ex.Message.Contains("Not Found"))
    {
        var parsedMessage = _messageReceiver.ParseServiceBusMessage(message);

        var retryCount = message.DeliveryCount;

        if (retryCount <= _config.MaxRetryCount)
        {
            // delete current sb message and create a new one to resubmit
            using (var ts = new TransactionScope(TransactionScopeAsyncFlowOption.Enabled))
            {
                await _messageReceiver.CompleteMessageAsync(message, messageReceiver);

                var newMessage = new ServiceBusMessage(message);
                newMessage.ScheduledEnqueueTime = DateTime.UtcNow.AddMinutes(_config.HoldServiceBusMsgMinutes);
                await _serviceBusClient.SendAsync(newMessage);

                _logger.LogInformation("Retry counter: {retryCount}  Processing order number {transactionNumber} resulted in not found condition. Resubmitted to service bus", retryCount, parsedMessage.Transaction);
            }
        }
        else
        {
            _logger.LogError("Max retry count reached for transaction number {transactionNumber}", parsedMessage.Transaction);
            await _messageReceiver.DeadletterMessageAsync(message, messageReceiver, ex);
        }
    }

如果有人有任何建议,我们将不胜感激!

c# azure azureservicebus .net-8.0 azure-functions-isolated
1个回答
0
投票

使用以下代码重试对我有用:

using Azure.Messaging.ServiceBus;
using Microsoft.Azure.Functions.Worker;
using Microsoft.Extensions.Logging;

namespace RithApp
{
    public class Function1
    {
        private readonly ILogger<Function1> ri_lg;

        public Function1(ILogger<Function1> logger)
        {
            ri_lg = logger;
        }

        [Function(nameof(Function1))]
        public async Task Run(
            [ServiceBusTrigger("myqueue", Connection = "rithwikserviceconnection")]
            ServiceBusReceivedMessage message,
            ServiceBusMessageActions rimsgact)
        {
            ri_lg.LogInformation("Hello Rithwik, the Message is : {body}", message.Body);

            try
            {
                bool ri_of = false; 

                var retryCount = message.ApplicationProperties.ContainsKey("RetryCount")
                    ? (int)message.ApplicationProperties["RetryCount"]
                    : 0;

                var ri_tn = message.ApplicationProperties.ContainsKey("ri_tn")
                    ? message.ApplicationProperties["ri_tn"].ToString()
                    : "Unknown";

                if (!ri_of && retryCount < 3)
                {
                    ri_lg.LogWarning($"Hello Rithwik, Order not found for transc {ri_tn}.and retrying it again, the Retry Count: {retryCount}");

                    retryCount++;

                    var rith_msg = new ServiceBusMessage(message.Body)
                    {
                        ContentType = message.ContentType,
                        CorrelationId = message.CorrelationId,
                        MessageId = Guid.NewGuid().ToString(),
                        ScheduledEnqueueTime = DateTime.UtcNow.AddMinutes(1)
                    };

                    foreach (var prp in message.ApplicationProperties)
                    {
                        rith_msg.ApplicationProperties[prp.Key] = prp.Value;
                    }

                    rith_msg.ApplicationProperties["RetryCount"] = retryCount;

                    var sbc = new ServiceBusClient(Environment.GetEnvironmentVariable("rithwikserviceconnection"));
                    var ri_sen = sbc.CreateSender("myqueue");

                    await ri_sen.SendMessageAsync(rith_msg);
                    ri_lg.LogInformation($"Reetry count {retryCount}.");

                    await rimsgact.CompleteMessageAsync(message);
                }
                else if (retryCount >= 3)
                {
                    ri_lg.LogError($"Hello Rithwik, Max retry attempts reached for transaction {ri_tn}. Moving to dead-letter.");

                    var Dead_props = new Dictionary<string, object>
                    {
                        { "Reason for dead lettring ", "3 retries happened" },
                        { "Des", "Order is not found" }
                    };

                    await rimsgact.DeadLetterMessageAsync(message, Dead_props);
                }
            }
            catch (Exception ri)
            {
                ri_lg.LogError($"Hello Rthwik, Erro is : {ri.Message}");
                throw; 
            }
        }
    }
}

csproj:

<Project Sdk="Microsoft.NET.Sdk">
  <PropertyGroup>
    <TargetFramework>net8.0</TargetFramework>
    <AzureFunctionsVersion>v4</AzureFunctionsVersion>
    <OutputType>Exe</OutputType>
    <ImplicitUsings>enable</ImplicitUsings>
    <Nullable>enable</Nullable>
  </PropertyGroup>
  <ItemGroup>
    <FrameworkReference Include="Microsoft.AspNetCore.App" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker" Version="2.0.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.Http.AspNetCore" Version="2.0.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Extensions.ServiceBus" Version="5.22.0" />
    <PackageReference Include="Microsoft.Azure.Functions.Worker.Sdk" Version="2.0.0" />
  </ItemGroup>
  <ItemGroup>
    <None Update="host.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
    </None>
    <None Update="local.settings.json">
      <CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
      <CopyToPublishDirectory>Never</CopyToPublishDirectory>
    </None>
  </ItemGroup>
  <ItemGroup>
    <Using Include="System.Threading.ExecutionContext" Alias="ExecutionContext" />
  </ItemGroup>
</Project>

输出:

enter image description here

enter image description here

在服务总线死信队列中:

enter image description here

最新问题
© www.soinside.com 2019 - 2024. All rights reserved.