在 Apache Flink 中,如何捕获 Kafka Source Connector 抛出的异常?
我有一个用例,我在 flink 应用程序中连接的 kafka 集群之一变得不可用。如果我能捕获异常,我就可以用其他一些业务逻辑来处理它。
但是,看起来从 KafkaSource 构造的 DataStream 是延迟评估的。直到 keyBy 和 process 才会抛出错误。
当我像这样创建源时,有没有办法捕获异常?
KafkaSource<List<Event>> kafkaSource = KafkaSourceFactory.createKafkaSource(kafkaTopic, kafkaConfig, deserializationSchema);
return env.fromSource(kafkaSource, watermarkStrategy, uniqueSourceName + "_input")
.uid(uniqueSourceName + "_input")
Flink 将源分发到多个 TM,这就是为什么在远程执行工作流之前不会创建 Kafka 消费者。在构建工作流程时,您可以运行一些检查来查看该主题在您的主方法中是否可用,但从那时到工作流程实际开始执行之间仍然有一个时间窗口。
是否可以使用Failure Enricher(如果需要),然后在您的主要方法中应用业务逻辑?例如,您可以提交修改后的作业,以响应主题失败。