我使用Kafka Streams来处理用户数据的变化以及与用户操作相对应的事件。 我使用连接操作 (KStream-KTable) 丰富事件,然后将丰富的事件写入 Elasticsearch。
有一个问题:用户可以更改用户数据(电子邮件、姓名、姓氏等)。我们需要在 Elasticsearch 索引中更新此信息。
我发现唯一的方法是将事件存储在 KTable 中并与另一个 KTable 加入。不幸的是,这种方法会导致开销,因为流存储在状态存储中。
有没有更好的方法?或者也许这个任务是不可能的?
使用 Kafka Streams 无法做到这一点。您必须实现自己的代理来侦听用户数据消息并更新 Elasticsearch 中的数据。
为了使这个代理具有弹性,我这样实现:
我这样做是因为可能有大量数据需要更新,而 Kafka 允许您在有限的时间内处理消息。如果消息处理时间过长,Kafka 无法提交消息,您的代理会再次接收该消息。由于所有数据均已更新,因此循环立即退出并提交消息。您也可以随时重新启动代理,它只会再次接收消息并恢复处理。