[基本上,我的消费者也是生产者。我们得到一个初始数据集,并将其发送到队列。消费者拿走一件商品并对其进行处理,从那时起,共有3种可能性:
我的问题在于第3步,因为队列最初增长非常快,有可能将一条数据分解成在队列中重复的部分,而使用者继续处理它并最终陷入无限循环。
我认为防止这种情况的方法是防止重复项进入队列。我无法在客户端执行此操作,因为在一个小时的过程中,我可能有许多核心处理数十亿个数据点(让每个客户端在提交之前对其进行扫描会使我的速度降低太多)。我认为这需要在服务器端完成,但是,就像我提到的那样,数据非常大,我不知道如何有效地确保没有重复。
我可能会问不可能的事,但以为我会试一试。任何想法将不胜感激。
核心问题似乎是这样:
"...its possible that a piece of data is broken down into a part that's
duplicated in the queue and the consumers continue to process it and
end up in a infinite loop."
您可以将所有想要的焦点放在排队项目的唯一性上,但是,IMO是上面的问题。防止无限循环的一种方法可能是在消息有效负载中包含一个“已访问”位,该位由消费者在重新排队该细分项之前设置。
另一种选择是让使用者重新排队回到一个特殊的队列,该队列的处理方式略有不同,以防止无限循环。无论哪种方式,您都应该通过将其作为应用程序策略的核心部分来解决该问题,而不是使用消息传递系统的功能来解决它。
我认为即使您可以解决不将重复数据发送到队列的问题,也迟早会遇到此问题:
有一个rabbitmq插件,使您可以通过一些附加头进行这种类型的控制。