如何更新CQRS中读取模型中现有的丰富事件?

问题描述 投票:0回答:1

我使用Kafka Streams来处理用户数据的变化以及与用户操作相对应的事件。 我使用连接操作 (KStream-KTable) 丰富事件,然后将丰富的事件写入 Elasticsearch。

有一个问题:用户可以更改用户数据(电子邮件、姓名、姓氏等)。我们需要在 Elasticsearch 索引中更新此信息。

我发现唯一的方法是将事件存储在 KTable 中并与另一个 KTable 加入。不幸的是,这种方法会导致开销,因为流存储在状态存储中。

有没有更好的方法?或者也许这个任务是不可能的?

elasticsearch microservices apache-kafka-streams cqrs
1个回答
0
投票

使用 Kafka Streams 无法做到这一点。您必须实现自己的代理来侦听用户数据消息并更新 Elasticsearch 中的数据。

为了使这个代理具有弹性,我这样实现:

  • 收到消息
  • 无限循环
    • 在 ES 中搜索 100 条过时的数据(名字不是最新的或姓氏不是最新的或......)
    • 如果没有找到数据,则打破循环
    • 否则更新数据并批量更新到ES中
  • 提交消息

我这样做是因为可能有大量数据需要更新,而 Kafka 允许您在有限的时间内处理消息。如果消息处理时间过长,Kafka 无法提交消息,您的代理会再次接收该消息。由于所有数据均已更新,因此循环立即退出并提交消息。您也可以随时重新启动代理,它只会再次接收消息并恢复处理。

© www.soinside.com 2019 - 2024. All rights reserved.