我尝试为 Amazon SQS FIFO 队列配置批量使用者,但没有成功,使用者始终基于单个消息触发。 我想这可能是 FIFO 队列的限制,但如果能得到其他人的反馈那就太好了。
这是我正在使用的代码:
// Message:
public record BatchMessage();
// Consumer:
public class BatchMessageConsumer : IConsumer<Batch<BatchMessage>>
{
public Task Consume(ConsumeContext<Batch<BatchMessage>> context)
{
Debug.WriteLine($"Messages={context.Message.Length}");
return Task.CompletedTask;
}
}
// ConsumerDefinition:
public class BatchMessageConsumerDefinition : ConsumerDefinition<BatchMessageConsumer>
{
public BatchMessageConsumerDefinition()
{
// SQS Queue name.
Endpoint(x => x.Name = $"Demo-BatchMessage.fifo");
}
}
// Configuration:
services.AddMassTransit(x =>
{
x.AddConsumers(typeof(Program).Assembly);
x.UsingAmazonSqs((context, sqs) =>
{
sqs.Host("region", (_) => { });
// SNS Topic name.
sqs.Message<BatchMessage>(x => x.SetEntityName("BatchMessage.fifo"));
sqs.ConfigureEndpoints(context);
});
});
// Trigger:
var groupId = Guid.NewGuid().ToString();
for (var i = 1; i <= 10; i++)
{
await _publishEndpoint.Publish(new BatchMessage(), (context) =>
{
// Required for FIFO messages.
context.TrySetGroupId(groupId);
context.TrySetDeduplicationId(context.MessageId.ToString());
});
}
您应该更新您的使用者定义以配置相同的并发交付的传输
MessageGroupId
:
// ConsumerDefinition:
public class BatchMessageConsumerDefinition :
ConsumerDefinition<BatchMessageConsumer>
{
public BatchMessageConsumerDefinition()
{
// SQS Queue name.
Endpoint(x => x.Name = $"Demo-BatchMessage.fifo");
}
protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
IConsumerConfigurator<BatchMessageConsumer> consumerConfigurator, IRegistrationContext context)
{
consumerConfigurator.Options<BatchOptions>(o => o.SetMessageLimit(10).SetTimeLimit(1000));
if(endpointConfigurator is IAmazonSqsReceiveEndpointConfigurator cfg)
{
cfg.ConcurrentDeliveryLimit = 5;
}
}
}