实现 C# .NET 7 RestAPI RabbitMQ 时出错 - MassTransit 死信队列 (DLQ)

问题描述 投票:0回答:1

我正在尝试更好地使用微服务,但我陷入了困境。我正在合作:

  • ASP.NET Core 7 Web API(用于微服务)
  • C#
  • SQL 服务器
  • RabbitMQ 和 MassTransit
  • Docker compose(使用 Linux 容器)

我还是个新手,不断学习。因此,我已经有了一个基本的队列,但当出现问题时我就会遇到麻烦,尤其是在从调用 API 管理死信时。

发生的事情是这样的:有东西发送了一个

POST
请求,队列显示“酷,明白了”(又名“已接受”)。

现在,我想要的是:如果我强制发生错误,我希望消息尝试重新发送几次(重试),然后,如果仍然无法发送,则移至错误死信队列(DLQ)。

但奇怪的是:当我发送请求时,它首先直接进入DLQ错误队列,然后跳回到主队列,尝试发送几次(重试),如果仍然不行,跳回到错误队列。 🤔

代码可以在我的GitHub Repo 上找到。这是一个使用 docker 容器的 POC。

这张图应该说明我希望实现的目标:

我的 RabbitMQ 和 MassTransit 员工 API DI:

builder.Services.AddMassTransit(config =>
{
     config.AddConsumer<EmployeeDtoEventConsumer>();

     config.UsingRabbitMq((ctx, cfg) =>
     {
         cfg.Host(builder.Configuration["EventBusSettings:HostAddress"]);

         cfg.ReceiveEndpoint("employee_q", c =>
         {
             c.PrefetchCount = 16;

             c.ConfigureConsumer<EmployeeDtoEventConsumer>(ctx);

         });

         cfg.UseMessageRetry(retryConfigurator =>
         {
             retryConfigurator.Interval(3, TimeSpan.FromSeconds(1));
             retryConfigurator.Handle<Exception>();
         });
     });
});

builder.Services.AddScoped<EmployeeDtoEventConsumer>();

这是 DQLConsumer API DI:

builder.Services.AddMassTransit(config =>
{
     config.AddConsumer<DlqConsumer>();

     config.UsingRabbitMq((ctx, cfg) =>
     {
         cfg.Host(builder.Configuration["EventBusSettings:HostAddress"]);

         cfg.ReceiveEndpoint("employee_q_error", c =>
         {
             c.PrefetchCount = 16;

             c.ConfigureConsumer<DlqConsumer>(ctx);

         });
     });
});

builder.Services.AddScoped<DlqConsumer>();

谁能帮我弄清楚:

  1. 如何让消息在主队列重试完成后才进入DL错误队列?
  2. 在 RabbitMQ 和 MassTransit 中管理死信有什么简单的提示或技巧吗?

非常感谢您提供的任何帮助!

c# rabbitmq masstransit dead-letter dlq
1个回答
0
投票

订单很重要,或者您可以简化:

builder.Services.AddMassTransit(config =>
{
     config.AddConsumer<EmployeeDtoEventConsumer>();

     config.UsingRabbitMq((ctx, cfg) =>
     {
         cfg.Host(builder.Configuration["EventBusSettings:HostAddress"]);

         cfg.ReceiveEndpoint("employee_q", c =>
         {
             c.PrefetchCount = 16;

             cfg.UseMessageRetry(retryConfigurator =>
             {
                 retryConfigurator.Interval(3, TimeSpan.FromSeconds(1));
                 retryConfigurator.Handle<Exception>();
             });

             c.ConfigureConsumer<EmployeeDtoEventConsumer>(ctx);
         });
     });
});

另外,完全删除它:

builder.Services.AddScoped<EmployeeDtoEventConsumer>();
© www.soinside.com 2019 - 2024. All rights reserved.