我希望在一个系统中实现事务发件箱模式,该系统使用数据库表作为与 Kafka 配合的事务发件箱,以保证我的消息恰好一次交付到 Kafka。之后,我使用 Kafka Streams,它被设置为启用恰好一次处理。因此,该系统应该保证每个客户端消息都被准确地传递和处理一次。
本例中的系统如下所示:
Client -> API -> DB -> Kafka
。
现在我明白了,轮询发布者的基本事务发件箱模式如下:
将传入消息写入同一数据库事务中的
message
表和 message_outbox
表中。
messageId
)。messageId
的具有相同 Client -> API -> DB
的传入消息不会在 API
处被重新处理,因为 Client
message
的使用者可以检查 message
表并忽略重复项。 使用轮询发布者模式,作为单独的专用 CDC 服务或作为
API
服务中的“发布者”线程,轮询新消息。
获取分布式锁(例如使用ShedLock)。
更新一批
message_outbox
事件以设置“锁定”状态:
UPDATE message_outbox SET locked = true, locked_by = $self_instance WHERE message_id IN (SELECT message_id FROM message_outbox WHERE locked = false OR locked_by = $self_instance ORDER BY created_at)
获取获取的批次
SELECT * FROM message_outbox WHERE locked = true AND locked_by = $self_instance ORDER BY created_at
获取一批
message
事件的锁后,使用幂等生产者将记录生成到Kafka。
完成
我在很多博客、文章和其他答案中读过这个近似的实现草图。但是,我无法理解这如何在所有情况下支持“一次性交付”。我可能在这个过程中遗漏了一些关键的技术点。 例如,假设在步骤
3中,生产者生成了 Kafka 记录,但未能从 Kafka 接收到 ACK。因此,message
事件不会从发件箱表中删除(即不是
sent
)并且将重试。对于同一会话中的幂等生产者来说,这没有问题 - 在下一次计划运行时,它将重试,并且防护令牌将确保不写入重复项。但是存在这样的情况:在收到 ACK 之前,生产者完全“失败”而无法恢复(即应用程序收到 OOM)。在这种情况下:
message
message
message_outbox
事件,并且仍然会重试发送该事件。据我所知,即使对于幂等和事务性生产者,这也只能保证至少一次交付。
据我所知,即使使用外部 CDC 服务或 Debezium 之类的服务,这个问题仍然存在 - CDC 服务可能会在确认事件已处理之前崩溃。因此,即使在这些情况下,也无法保证一次性交付。我错过了什么?您似乎忘记考虑 Kafka Producer 事务,但首先要注意的是——恰好一次
处理
交付不相同。 Kafka 只能保证持久写入的事件可以被消费一次。 注意:“已交付”与您的数据库客户端无关,“已处理”与您的实际消费者逻辑或偏移提交无关 你的担忧是有道理的,但这并不是 Kafka API 真正关心的事情