我们的团队正在使用 Kafka Streams 版本
3.6.0
。我们正在尝试在一些外部管理的代理上运行 Kafka Streams 拓扑。当 Kafka Streams 应用程序尝试创建其内部变更日志主题时,我收到以下错误。
org.apache.kafka.streams.errors.StreamsException: Could not create topic <application.id>-KSTREAM-OUTEROTHER-0000000007-store-changelog.
at org.apache.kafka.streams.processor.internals.InternalTopicManager.getTopicPartitionInfo(InternalTopicManager.java:623)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.getNumPartitions(InternalTopicManager.java:641)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.validateTopics(InternalTopicManager.java:657)
at org.apache.kafka.streams.processor.internals.InternalTopicManager.makeReady(InternalTopicManager.java:472)
at org.apache.kafka.streams.processor.internals.ChangelogTopics.setup(ChangelogTopics.java:97)
...
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1159)
at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:1014)
at org.apache.kafka.streams.processor.internals.StreamThread.pollPhase(StreamThread.java:962)
at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:766)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:617)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:579)
如果我们没有办法授予底层 Kafka 客户端管理员权限...... 我们可以为 Kafka Streams 内部变更日志主题提供一个稳定的名称吗? 设置主题前缀似乎并不能完全解决问题,因为我们有一些类似
testapplication-KSTREAM-REDUCE-STATE-STORE-0000000008-repartition
的内部主题。
之前的研究
只是关注@AyoubOmari 的评论。
A.
“您正在两个 KStream 之间进行连接,对吗?”
正确,是的。
“我认为这可以在 KStream.join() 的 StreamJoined 参数中配置。我相信它是 StreamJoined 中的字段 storeName。”
这是 org.apache.kafka.streams.KafkaStreams 的构造函数。
这些是它的配置属性。而且我不知道哪里可以通过
storeName
。
我们正在使用 org.apache.kafka.streams.kstream.KStream/outerJoin。我没有找到一种方法来传递
storeName
。
B. KafkaStreams - Streams Client 谈论
org.apache.kafka.streams.KafkaStreams
的内部结构。
但是 Stores Utility — Factory of State Stores 还研究了 org.apache.kafka.streams.state.Stores 的内部结构。也许有一种方法可以强制更改日志的名称一致?
C. 如果我们无法为
changelog
强制执行预定名称,则存在潜在的备份计划...由于 changelog
名称似乎在拓扑重新启动后保持稳定,因此初始部署可以包括首先手动手动设置 changelog
主题的操作创建的。然后,随后启动 Kafka 流拓扑,期望 changelog
名称与手动创建的名称相匹配。这是一个丑陋的解决方案,但如果我们没有能力动态创建主题,这是我们唯一可用的解决方案。
但这引出了核心问题......对于相同的主题和配置输入,
changelog
名称在拓扑重新启动后会保持不变(稳定)吗?