我有一个 RabbitMQ 监听器,从那里我处理传入的数据,将它们收集到 7500 个项目的大小,并将它们作为批量保存在 Elasticsearch 上(ElasticClient.IndexMany(数据))。使用批量我可以获得更好的性能,这对我的应用程序很重要。保存数据后,我知道它是否成功,并且我可以在 RabbitMQ 上确认或拒绝它们。问题是我只有在响应确认或拒绝后才能从 RabbitMQ 获取下一个数据。所以我不能再批量处理我的数据了。
您有解决该问题的想法吗?如何从 RabbitMQ 获取多个项目,处理它们并将响应(确认或拒绝)批量传递回 RabbitMQ?
感谢卢克。他的回答给了我所需的信息。 我在 Program.cs 中从
进行了更改Model.BasicQos(0, 1, false);
至:
Model.BasicQos(0, NUMBER_OF_TIMESERIES_PER_BULK, false);
我添加一个监听器:
static void TimeSeriesBusinessComponentDeliverResponse(object sender, DeliverEventArgs e)
{
e.ListDeliverTagsAcknoledge.ForEach(tag => { Model.BasicAck((ulong)tag, false); });
e.ListDeliverTagReject.ForEach(tag => { Model.BasicReject((ulong)tag, false); });
}
在 NUMBER_OF_TIMESERIES_PER_BULK 个数量之后,我将其保存到 Elastic, 看起来像这样:
var responseSlimData = ElasticClientSlim.IndexMany(slimData);
DeliverEventArgs dea = new DeliverEventArgs();
if (responseSlimData.IsValid)
dea.ListDeliverTagsAcknoledge = listDeliferyTags;
else
dea.ListDeliverTagReject = listDeliferyTags;
OnDeliverResponse(dea);
// .....
public event EventHandler<DeliverEventArgs> DeliverResponse;
protected virtual void OnDeliverResponse(DeliverEventArgs e)
{
DeliverResponse?.Invoke(this, e);
}
我添加了我的 EventArgs 类
public class DeliverEventArgs : EventArgs
{
public DeliverEventArgs()
{
ListDeliverTagsAcknoledge = new List<ulong?>();
ListDeliverTagReject = new List<ulong?>();
}
public List<ulong?> ListDeliverTagsAcknoledge { get; set; }
public List<ulong?> ListDeliverTagReject { get; set; }
}