如何在使用 Kafka Stream 从 RTopic 读取数据时对外部系统进行 rest 调用

问题描述 投票:0回答:0

主要需求是从topic中读取数据,通过rest API向外部系统发送数据。还有一个要求是按照同样的顺序将消息发送到目标系统。

示例:下面两条消息应该以相同的顺序传递到目标系统(通过 rest API)。如果第一条消息未能发送到目标系统,例如 400 个错误请求,则不应将同一客户的第二条消息(如下所述)发送到目标系统,并且两条消息都应推送到死信主题以供以后处理.所有其他记录应照常挑选。

第一条消息:客户 C1 -> 地址更新 -> 地址 1 第二条消息:客户 C1 -> 地址更新 -> 地址 2

这里有两个问题:

  1. 如何处理 400 错误请求的失败场景
  2. 如何将 REST 调用与 Kafka 流分离,否则吞吐量将减少,因为 REST 调用可能需要 200 毫秒,并且还有重试机制。

使用状态存储解决问题1的方法:

  1. 阅读主题并检查状态存储中是否存在密钥。
  2. 如果存在则将当前记录移动到 DL 主题并选择下一条记录。
  3. 如果不存在,请拨打外部休息电话。如果发生错误,将记录添加到状态存储中,并将当前数据也推送到 DL 主题中。 所以,在这里我利用状态存储来解决第一个问题。

问题 2 的解决方法 -> 可以想到 KTable,但问题是 Ktable 将始终拥有最新记录,而不是同一客户的所有记录。所以所有消息都不会传递到目标系统。

如何解决问题 2 或解决问题 1 的更好方法?

apache-kafka-streams confluent-platform
© www.soinside.com 2019 - 2024. All rights reserved.