MassTransit:Amazon SQS FIFO 队列的批量使用者

问题描述 投票:0回答:1

我尝试为 Amazon SQS FIFO 队列配置批量使用者,但没有成功,使用者始终基于单个消息触发。 我想这可能是 FIFO 队列的限制,但如果能得到其他人的反馈那就太好了。

这是我正在使用的代码:

// Message:
public record BatchMessage();

// Consumer:
public class BatchMessageConsumer : IConsumer<Batch<BatchMessage>>
{
    public Task Consume(ConsumeContext<Batch<BatchMessage>> context)
    {
        Debug.WriteLine($"Messages={context.Message.Length}");

        return Task.CompletedTask;
    }
}

// ConsumerDefinition:
public class BatchMessageConsumerDefinition : ConsumerDefinition<BatchMessageConsumer>
{
    public BatchMessageConsumerDefinition()
    {
        // SQS Queue name.
        Endpoint(x => x.Name = $"Demo-BatchMessage.fifo");
    }
}

// Configuration:
services.AddMassTransit(x =>
{
    x.AddConsumers(typeof(Program).Assembly);

    x.UsingAmazonSqs((context, sqs) =>
    {
        sqs.Host("region", (_) => { });

        // SNS Topic name.
        sqs.Message<BatchMessage>(x => x.SetEntityName("BatchMessage.fifo"));

        sqs.ConfigureEndpoints(context);
    });
});

// Trigger:
var groupId = Guid.NewGuid().ToString();

for (var i = 1; i <= 10; i++)
{
    await _publishEndpoint.Publish(new BatchMessage(), (context) =>
    {
        // Required for FIFO messages.
        context.TrySetGroupId(groupId);
        context.TrySetDeduplicationId(context.MessageId.ToString());
    });
}
amazon-sqs masstransit aws-sqs-fifo
1个回答
0
投票

您应该更新您的使用者定义以配置相同的并发交付的传输

MessageGroupId
:

// ConsumerDefinition:
public class BatchMessageConsumerDefinition : 
    ConsumerDefinition<BatchMessageConsumer>
{
    public BatchMessageConsumerDefinition()
    {
        // SQS Queue name.
        Endpoint(x => x.Name = $"Demo-BatchMessage.fifo");
    }

    protected override void ConfigureConsumer(IReceiveEndpointConfigurator endpointConfigurator,
        IConsumerConfigurator<BatchMessageConsumer> consumerConfigurator, IRegistrationContext context)
    {
        consumerConfigurator.Options<BatchOptions>(o => o.SetMessageLimit(10).SetTimeLimit(1000));

        if(endpointConfigurator is IAmazonSqsReceiveEndpointConfigurator cfg)
        {
            cfg.ConcurrentDeliveryLimit = 5;
        }
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.