技术堆栈 - Python、Rabbitmq、Elasticsearch、Heroku
我有一个应用程序,它根据特定的时间表在应用程序中添加内容,添加内容时,需要根据某些标准向某些用户触发一封电子邮件(可能有大约 100 万用户)
我决定的设计是第一个应用程序(生产者)将发布内容,然后在 Rabbitmq(消息代理)中放入一条消息,其中包含每个应该收到电子邮件的用户的用户 ID 和电子邮件地址,所以基本上添加了大约 100 万个消息到 Rabbitmq,然后电子邮件将通过电子邮件应用程序(消费者)发送
我的问题是围绕生产者应用程序的。它将从elasticsearch获取用户ID列表,并开始使用for循环将它们添加到队列中,但是如果由于某种原因该应用程序出现故障(例如新部署的情况) ,那么只有少数用户会被添加到队列中,当应用程序返回时,它将再次开始排队,我们可能最终会向同一用户发送重复的电子邮件。 有没有一种标准方法可以避免这种情况。似乎是一个非常常见的问题。
我正在考虑在我的生产者应用程序数据库中维护消费者应用程序的消息确认(针对给定内容的每个用户)。但感觉在发送几封电子邮件后,这可能会导致我的 PostgreSQL 数据库中的记录爆炸。每个内容都可以随后通过电子邮件触发约 100 万用户。
你是对的,这是一个非常普遍的问题。 不幸的是,一般来说,当在异步执行的进程之间发送消息时,您不能保证消息的精确一次传递:您必须在最多一次和至少一次之间进行选择。
您可以通过让生产者应用程序永远不会重试发送(即使生产者失败)来保证最多一次(这可以轻松防止重复)。 然而,这可能不是您想要的。
您可以通过让消费者向生产者确认它已收到并根据发送的消息采取行动来保证至少一次;生产者维护给定消息尚未被确认的状态(例如在数据库中),并在一段时间后重试未确认的消息。 请注意,如果消费者在执行操作后确认,则操作(即发送的电子邮件)可能至少部分重复(考虑如果消费者在开始做某事和确认它已完成之间崩溃会发生什么)并且如果消费者在执行操作之前确认,您已将其纳入至多一次交付(考虑消费者在确认后但在完全执行该操作之前崩溃的情况)。
通过让消费者维护包含其已确认消息的状态,消费者可以删除重复消息:如果它收到一条已确认的消息,它会再次确认该消息并且不执行任何其他操作。 它仍然是至少一次,但重复率大大降低(需要付出一定的代价:每条消息现在可能需要 3 次 DB 写入(生产者写入未确认的消息,消费者和生产者每次写入确认),尽管这些不必是相同的数据库。
您肯定希望实现完全一致性,并且可能需要使用支持同步获取/选择的数据库(持久存储)来扩展堆栈。
作为“SWC-DB”的开发者,我欢迎您尝试最新的SWC-DB版本,解决了问题#1和#2,使一致性成为可能。有一个priority-queue的例子queue-example.cc(其他语言可以用特定的thrift-client实现) 您可能需要注意“queue-example.cc”的扫描规范中的
.set_opt__deleting()
:
SWC::DB::Specs::Interval intval;
intval.set_opt__deleting();
尽管如此,我可以看到“重复电子邮件”的其他问题,当电子邮件已发送且新任务到达时也可能发生。正如 @levi-ramsey 提到的,跟踪消费者的处理状态。 在这种情况下,SELECT 可以而且应该具有
UPDATE=(ns+3d,"SET_A_VALUE_AS_PROCESSED")
规范,并且在这种情况下(数据库中保留有许多记录)列的架构可以使用 TTL 设置(假设 3 天,所以尚未处理的也不会被删除)。
然而,间隔扫描规范要求对选择匹配的记录使用非 (-ne)“SET_A_VALUE_AS_PROCESSED”的值规范。有SWC-DB
条件-值-表达式语法解释了 SQL 的用法。
技术堆栈 - Python、Rabbitmq、Elasticsearch、Heroku
我有一个应用程序,它根据特定的时间表在应用程序中添加内容,添加内容时,需要根据某些标准向某些用户触发一封电子邮件(可能有大约 100 万用户)