用例是——一个从不断变化的主题列表中读取、处理它们并写入不同的 Kafka 主题的管道。 所以目前有 150 个 Kafka 主题,但这个列表可以改变。 有一些 API 可以返回当前主题集(可以添加和删除主题)。我想每分钟都捕捉到这种变化(但它一天只发生几次)。
使用 Flink,我可以使用 Dynamic Partition Discovery,间隔为 1 分钟 - 这似乎回答了用例。
但是如何更改我订阅的主题集(无需重新启动 job ofc)?
val topics = List("a", "b", "c")
KafkaSource.builder[util.HashMap[String, Object]]
.setBootstrapServers(kafkaSourceBootstrapServers)
.setGroupId(kafkaJobConsumerGroup)
.setTopics(topics:_*)
....
.build
上面返回一个
KafkaSource[T]
,但我想每分钟更改一次topics
列表(基本上从一些API获取topics
值)。
我该怎么做?
似乎我唯一可以使用的 API 是:
env.fromSource()
env.addSource()
但这会创建一个不同的
DataStream[T]
,我已经在运行流媒体了。
如何在作业仍在运行时更改主题列表?或者不可能,我也逃不过重启。
唯一可行的方法是使用主题模式:
KafkaSource.builder().setTopicPattern("topic.*");
但这是否会满足您的要求取决于您是否可以安排让您想要消费的主题遵循一些可预测的命名模式。