防止 XY 问题的一些背景: 我正在开发一个系统,其中每 X 分钟发送一条消息以在服务中的 Pod 上运行作业。当所有 pod 都不健康或重新启动时,这些消息可能会在队列中堆积。然后,当 Pod 重新启动时,它们开始立即消耗所有消息。这可能会导致问题,因为作业运行的频率不得超过每 X 分钟一次,这一点很重要。
此问题的解决方案似乎是 PurgeOnStartup 标志。当 Pod 重新启动时,它会清除队列,然后等待下一条作业消息的出现。我在将 PurgeOnStartup 添加到我们的配置中时遇到一些问题。
目前,我们将消费者添加到这样的配置中:
configurator.AddConsumer<SendTemplatedEmailMessage2Consumer>(c => c.UseConcurrentMessageLimit(1));
然后由中央 nuget 包中的代码使用它:
configurator.UsingRabbitMq((context, cfg) =>
{
if (settings.RabbitmqDelayedMessageExchangeEnabled)
{
cfg.UseConsumeFilter(typeof(HealthyConsumerFilter<>), context);
}
cfg.Host(settings.GetUri(), host =>
{
host.Username(settings.UserName);
host.Password(settings.Password);
});
cfg.ConfigureEndpoints(context);
第二部分是我们在所有服务中使用的设置,所以我不想更改它。我也只想将其添加到这个队列中。但是,AddConsumer() 配置器似乎不允许设置 PurgeOnStartup。我在过滤器和管道规格中有点迷失了,它似乎不是这样的。 是否可以在通过 AddConsumer() 创建的队列上配置 PurgeOnStartup? 如果没有,如何通过UsingRabbitMq 配置将其添加到单个消费者/队列?
嗯,看来您想为通过 RabbitMQ 中的 AddConsumer 方法创建的某个队列设置 PurgeOnStartup 标志。不幸的是,AddConsumer 方法本身并不直接支持这样的 PurgeOnStartup 标志配置。
为此,您可能必须采取相反的方式。您可以尝试手动构建使用者并为其提供所需的配置 - 换句话说,让 PurgeOnStartup 自己标记 - 而不通过 AddConsumer 方法。
以下是您可以如何做到这一点:
cfg.ReceiveEndpoint("your_queue_name", endpoint =>
{
endpoint.UseConcurrentMessageLimit(1); // Your existing configuration
// Set PurgeOnStartup for this specific queue
endpoint.PurgeOnStartup = true; // Set this flag to purge on startup
endpoint.Consumer<SendTemplatedEmailMessage2Consumer>(context);
});
这样,您可以为特定队列创建使用者(
your_queue_name
应替换为您的实际队列名称)并为该队列设置 PurgeOnStartup 标志,同时保持其余配置不变。