如何批量确认?

问题描述 投票:0回答:2

我有一个 RabbitMQ 监听器,从那里我处理传入的数据,将它们收集到 7500 个项目的大小,并将它们作为批量保存在 Elasticsearch 上(ElasticClient.IndexMany(数据))。使用批量我可以获得更好的性能,这对我的应用程序很重要。保存数据后,我知道它是否成功,并且我可以在 RabbitMQ 上确认或拒绝它们。问题是我只有在响应确认或拒绝后才能从 RabbitMQ 获取下一个数据。所以我不能再批量处理我的数据了。

您有解决该问题的想法吗?如何从 RabbitMQ 获取多个项目,处理它们并将响应(确认或拒绝)批量传递回 RabbitMQ?

rabbitmq
2个回答
1
投票

您使用预取多重确认的组合。


0
投票

感谢卢克。他的回答给了我所需的信息。 我在 Program.cs 中从

进行了更改
Model.BasicQos(0, 1, false);

至:

Model.BasicQos(0, NUMBER_OF_TIMESERIES_PER_BULK, false);

我添加一个监听器:

    static void TimeSeriesBusinessComponentDeliverResponse(object sender, DeliverEventArgs e)
    {
        e.ListDeliverTagsAcknoledge.ForEach(tag => { Model.BasicAck((ulong)tag, false); });
        e.ListDeliverTagReject.ForEach(tag => { Model.BasicReject((ulong)tag, false); });
    }

在 NUMBER_OF_TIMESERIES_PER_BULK 个数量之后,我将其保存到 Elastic, 看起来像这样:

        var responseSlimData = ElasticClientSlim.IndexMany(slimData);

        DeliverEventArgs dea = new DeliverEventArgs();
        if (responseSlimData.IsValid)
            dea.ListDeliverTagsAcknoledge = listDeliferyTags;
        else
            dea.ListDeliverTagReject = listDeliferyTags;
        OnDeliverResponse(dea);
        // .....

    public event EventHandler<DeliverEventArgs> DeliverResponse;
    protected virtual void OnDeliverResponse(DeliverEventArgs e)
    {
        DeliverResponse?.Invoke(this, e);
    }

我添加了我的 EventArgs 类

public class DeliverEventArgs : EventArgs
{
    public DeliverEventArgs()
    {
        ListDeliverTagsAcknoledge = new List<ulong?>();
        ListDeliverTagReject = new List<ulong?>();
    }
    public List<ulong?> ListDeliverTagsAcknoledge { get; set; }
    public List<ulong?> ListDeliverTagReject { get; set; }
}
© www.soinside.com 2019 - 2024. All rights reserved.