Flink 如何处理 Kafka 连接器故障?

问题描述 投票:0回答:1

在 Apache Flink 中,如何捕获 Kafka Source Connector 抛出的异常?

我有一个用例,我在 flink 应用程序中连接的 kafka 集群之一变得不可用。如果我能捕获异常,我就可以用其他一些业务逻辑来处理它。

但是,看起来从 KafkaSource 构造的 DataStream 是延迟评估的。直到 keyBy 和 process 才会抛出错误。

当我像这样创建源时,有没有办法捕获异常?

KafkaSource<List<Event>> kafkaSource = KafkaSourceFactory.createKafkaSource(kafkaTopic, kafkaConfig, deserializationSchema);

return env.fromSource(kafkaSource, watermarkStrategy, uniqueSourceName + "_input")
            .uid(uniqueSourceName + "_input")
apache-flink
1个回答
0
投票

Flink 将源分发到多个 TM,这就是为什么在远程执行工作流之前不会创建 Kafka 消费者。在构建工作流程时,您可以运行一些检查来查看该主题在您的主方法中是否可用,但从那时到工作流程实际开始执行之间仍然有一个时间窗口。

是否可以使用Failure Enricher(如果需要),然后在您的主要方法中应用业务逻辑?例如,您可以提交修改后的作业,以响应主题失败。

© www.soinside.com 2019 - 2024. All rights reserved.