公共交通3.5, 兔子Mq
我有一个总线连接服务,可以监听队列数量。
所有队列都接受相同的消息类型,并且所有队列都使用竞争消费者模式,其中不同机器上有多个服务侦听这些相同的队列。
我所追求的是为每个进程的所有这些队列设置并发限制。
我的资源(可用许可证数量)有限,并且需要将并发请求限制为每台计算机的可用许可证数量。
所以机器 A 可能有 4 个,机器 B 可能有 10 个,等等。
如果我已经有 4 个消费者处理所有队列中的消息,我不希望机器 A 消费所有这些队列中的任何消息,它应该让其他人(机器 B、C 等)在有可用资源的情况下消费它。
我的问题是使用
sbc.UseConcurrencyLimit(4)
里面
var bus = MassTransit.Bus.Factory.CreateUsingRabbitMq
设置每个队列的并发限制,所以如果我有 4 个队列,它们就会全部相加。
我需要的是所有队列累计达到并发限制,但不超过它。
MassTransit 中有内置方法可以实现此目标吗?
没有内置方法来限制多个服务之间的并发。这需要使用某种类型的全局资源管理器,而 MassTransit 并不支持开箱即用。
我正在为新版本的 MassTransit 添加此答案。目前,版本 8,我可以累计限制并发数。
有两个地方可以设置并发限制。
其中一个是定义 ReceiveEndpoint 时。这将设置每个队列的最大并发数。
cfg.ReceiveEndpoint("Q_Import_Import", e =>
{
e.ConfigureConsumer<TrafficImportConsumer>(context);
e.UseConcurrencyLimit(1);
});
第二个地方是当您添加 MassTransit 服务时。通过设置
UseConcurrencyLimit
可以累计设置最大并发数
builder.Services.AddMassTransit(x =>
x.UsingInMemory((context, cfg) =>
{
cfg.UseConcurrencyLimit(4);
cfg.ConfigureEndpoints(context);
});