我正在使用 Apache Flink 编写一个应用程序来替代旧的应用程序。旧应用程序接收来自不同源的事件,并用于监视源的事件是否满足特定标准。 (例如传感器值超过阈值)。
如果在给定时间段(几天到几周)内超过阈值,它会报告这些来源
新应用程序将执行相同的操作,使用两个 kafka 主题(事件数据和配置数据),并在超出受监控源的时间段(使用计时器服务)时向第三个主题写入警报。
生成的图表是 2 个源主题(会议、事件)到 KeyedCoProcessFunction 到接收器主题(警报)
我正在寻找将所有受监控源从旧应用程序迁移到 Apache Flink 键控状态的可能性。
源文件将导出为包含初始化新应用程序所需的所有信息的文件。
我编写了第二个应用程序,其中的图表相似但不相同。 -> 1 个主题源(配置)、1 个集合源(使用文件读取器提供所有受监控的源)和一个具有与替换相同的状态定义的 KeyedCoProcess 函数。
想法是:
我对配置源和 KeyedCoProcessFunction 使用相同的 uid。保存点的状态仍然没有正确摄取。
Skipping empty savepoint state for operator 1db5b24325c47f4692485c6f3204cb3b. // new event source
Reset the checkpoint ID of job d92e6e970306ea6465b0f253beddebbe to 6.
Restoring job d92e6e970306ea6465b0f253beddebbe from Savepoint 5 @ 0 for d92e6e970306ea6465b0f253beddebbe located at file:/tmp/savepoint-05ed46-60bd264d64df.
No master state to restore
Resetting coordinator to checkpoint.
Closing SourceCoordinator for source Source: Configs.
Source coordinator for source Source: Configs closed.
Restoring SplitEnumerator of source Source: Configs from checkpoint.
Resetting coordinator to checkpoint.
Closing SourceCoordinator for source Source: Events.
Source coordinator for source Source: Events closed.
Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@1bd0b5ba for Monitor_v1.0 (d92e6e970306ea6465b0f253beddebbe).
Starting execution of job 'Monitor_v1.0' (d92e6e970306ea6465b0f253beddebbe) under job master id 00000000000000000000000000000000.
有没有人面临同样的挑战或知道我如何应对这种情况?
非常感谢 BR M
我们能够找到状态未正确恢复的根本问题。
在运算符和接收器都通过 CoProcessFunction 连接的代码块中,我只向接收器提供了 uid/名称,而不是函数本身。由于为初始化程序应用程序构建的图与应接管的图不同,因此 flink 无法正确映射状态。
使用
env.disableOperatorChaining();
通过单独显示每个节点来提供对情况的一些额外见解。
sensorStreamOperator
.keyBy(SensorEvent::getMachineId)
.connect(configStreamOperator.keyBy(MachineConfig::getMachineId))
.process(new HandleEventDataCoProcess(outputTag)
.uid("CoProcessUID") // Provide separate uid to the processor!
.name("CoProcessFunction")
.getSideOutput(outputTag)
.sinkTo(outlierSink)
.uid("OutlierSinkUID")
.name("OutlierSink");