Azure Function and Service Bus usage with Mass Transit implementation. I have attached the repository where the issue is shown. Before running it, you need to fill the connection string in ProblemWithMassTransit/Program.cs
- variable con.而且您还需要在
ProblemWithMassTransitFunctions/localsettings.json
中填充此连接字符串。 coast是,当测试发表发布时,它将被函数1捕获,该功能将其传递给大众运输消费者。它遇到问题并引发异常。正确的方法是发布通过质量运输完成的故障消息。但是,下一步再次调用function1,然后再次调用直到最大交付尝试-10。然而,在host.json和大众运输文档中设置为true。在这种情况下,质量运输将尝试标记处理的消息。 如何解决这个问题?这是大众运输问题还是Azure功能问题?不幸的是,我在互联网上找不到解决方案。
测试存储库 - https://github.com/petrkasnal/problemblemblemwithmasstransit
谢谢你
当您与MassTransit结合使用Azure功能和服务总线时,您遇到的问题是一个常见的问题。由于Azure功能处理消息处理和重新检索以及MassTransit如何处理故障和检索的方式出现了问题。
关键要理解:当Azure函数收到消息时,它将处理消息,如果抛出异常,它将根据服务总线队列中的
maxDeliveryCount
autoComplete
设置host.json
autoComplete
MASSTRANSIT故障处理:当消费者抛出异常时,MassTransit发布了一条消息。这是预期的行为。
MASSTRANSIT期望该消息在处理故障后将其标记为消耗(或已丢失)。
要解决此问题,您需要确保在MassTransit处理故障之后,Azure功能不会重试消息。这里有几种方法:
1。true
Fault<T>
3.
Fault<T>
[FunctionName("Function1")]
public async Task Run(
[ServiceBusTrigger("queue-name", Connection = "ServiceBusConnectionString")] Message message,
ILogger log)
{
try
{
await _consumer.Consume(new ConsumeContext<TestMessage>(message));
}
catch (Exception ex)
{
log.LogError(ex, "Error processing message");
// Manually complete the message to prevent retries
await _messageReceiver.CompleteAsync(message.SystemProperties.LockToken);
}
}
public static void Main(string[] args)
{
var busControl = Bus.Factory.CreateUsingAzureServiceBus(cfg =>
{
cfg.Host("your-connection-string");
cfg.ReceiveEndpoint("queue-name", e =>
{
e.Consumer<TestMessageConsumer>();
});
});
busControl.Start();
}
autoComplete
选择最适合您的架构和要求的方法。如果您完全控制了消息处理逻辑,则使用MassTransit的本机Azure服务总线集成可能是最清洁的解决方案。否则,在Azure函数中手动处理异常和消息完成是可行的替代方案。