我正在尝试更好地使用微服务,但我陷入了困境。我正在合作:
我还是个新手,不断学习。因此,我已经有了一个基本的队列,但当出现问题时我就会遇到麻烦,尤其是在从调用 API 管理死信时。
发生的事情是这样的:有东西发送了一个
POST
请求,队列显示“酷,明白了”(又名“已接受”)。
现在,我想要的是:如果我强制发生错误,我希望消息尝试重新发送几次(重试),然后,如果仍然无法发送,则移至错误死信队列(DLQ)。
但奇怪的是:当我发送请求时,它首先直接进入DLQ错误队列,然后跳回到主队列,尝试发送几次(重试),如果仍然不行,跳回到错误队列。 🤔
代码可以在我的GitHub Repo 上找到。这是一个使用 docker 容器的 POC。
这张图应该说明我希望实现的目标:
我的 RabbitMQ 和 MassTransit 员工 API DI:
builder.Services.AddMassTransit(config =>
{
config.AddConsumer<EmployeeDtoEventConsumer>();
config.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(builder.Configuration["EventBusSettings:HostAddress"]);
cfg.ReceiveEndpoint("employee_q", c =>
{
c.PrefetchCount = 16;
c.ConfigureConsumer<EmployeeDtoEventConsumer>(ctx);
});
cfg.UseMessageRetry(retryConfigurator =>
{
retryConfigurator.Interval(3, TimeSpan.FromSeconds(1));
retryConfigurator.Handle<Exception>();
});
});
});
builder.Services.AddScoped<EmployeeDtoEventConsumer>();
这是 DQLConsumer API DI:
builder.Services.AddMassTransit(config =>
{
config.AddConsumer<DlqConsumer>();
config.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(builder.Configuration["EventBusSettings:HostAddress"]);
cfg.ReceiveEndpoint("employee_q_error", c =>
{
c.PrefetchCount = 16;
c.ConfigureConsumer<DlqConsumer>(ctx);
});
});
});
builder.Services.AddScoped<DlqConsumer>();
谁能帮我弄清楚:
非常感谢您提供的任何帮助!
订单很重要,或者您可以简化:
builder.Services.AddMassTransit(config =>
{
config.AddConsumer<EmployeeDtoEventConsumer>();
config.UsingRabbitMq((ctx, cfg) =>
{
cfg.Host(builder.Configuration["EventBusSettings:HostAddress"]);
cfg.ReceiveEndpoint("employee_q", c =>
{
c.PrefetchCount = 16;
cfg.UseMessageRetry(retryConfigurator =>
{
retryConfigurator.Interval(3, TimeSpan.FromSeconds(1));
retryConfigurator.Handle<Exception>();
});
c.ConfigureConsumer<EmployeeDtoEventConsumer>(ctx);
});
});
});
另外,完全删除它:
builder.Services.AddScoped<EmployeeDtoEventConsumer>();