为 Apache Flink 应用程序提供初始状态

问题描述 投票:0回答:1

我正在使用 Apache Flink 编写一个应用程序来替代旧的应用程序。旧应用程序接收来自不同源的事件,并用于监视源的事件是否满足特定标准。 (例如传感器值超过阈值)。
如果在给定时间段(几天到几周)内超过阈值,它会报告这些来源

新应用程序将执行相同的操作,使用两个 kafka 主题(事件数据和配置数据),并在超出受监控源的时间段(使用计时器服务)时向第三个主题写入警报。
生成的图表是 2 个源主题(会议、事件)到 KeyedCoProcessFunction 到接收器主题(警报)

我正在寻找将所有受监控源从旧应用程序迁移到 Apache Flink 键控状态的可能性。
源文件将导出为包含初始化新应用程序所需的所有信息的文件。

我编写了第二个应用程序,其中的图表相似但不相同。 -> 1 个主题源(配置)、1 个集合源(使用文件读取器提供所有受监控的源)和一个具有与替换相同的状态定义的 KeyedCoProcess 函数。

想法是:

  1. 启动初始化应用程序,让它完全使用配置主题并保存相应的状态,并浏览旧的受监控源列表,将它们保存在其状态中(在 KeyedCoProcessFunction 中)。
  2. 手动停止创建保存点的初始化程序应用程序。
  3. 使用保存点启动正确的 Flink 应用程序,从而提供完整的状态。

我对配置源和 KeyedCoProcessFunction 使用相同的 uid。保存点的状态仍然没有正确摄取。

Skipping empty savepoint state for operator 1db5b24325c47f4692485c6f3204cb3b.  // new event source
Reset the checkpoint ID of job d92e6e970306ea6465b0f253beddebbe to 6.
Restoring job d92e6e970306ea6465b0f253beddebbe from Savepoint 5 @ 0 for d92e6e970306ea6465b0f253beddebbe located at file:/tmp/savepoint-05ed46-60bd264d64df.
No master state to restore
Resetting coordinator to checkpoint.
Closing SourceCoordinator for source Source: Configs.
Source coordinator for source Source: Configs closed.
Restoring SplitEnumerator of source Source: Configs from checkpoint.
Resetting coordinator to checkpoint.
Closing SourceCoordinator for source Source: Events.
Source coordinator for source Source: Events closed.
Using failover strategy org.apache.flink.runtime.executiongraph.failover.flip1.RestartPipelinedRegionFailoverStrategy@1bd0b5ba for Monitor_v1.0 (d92e6e970306ea6465b0f253beddebbe).
Starting execution of job 'Monitor_v1.0' (d92e6e970306ea6465b0f253beddebbe) under job master id 00000000000000000000000000000000.

有没有人面临同样的挑战或知道我如何应对这种情况?

非常感谢 BR M

apache-flink flink-streaming data-migration flink-state
1个回答
0
投票

我们能够找到状态未正确恢复的根本问题。

在运算符和接收器都通过 CoProcessFunction 连接的代码块中,我只向接收器提供了 uid/名称,而不是函数本身。由于为初始化程序应用程序构建的图与应接管的图不同,因此 flink 无法正确映射状态。

使用

env.disableOperatorChaining();
通过单独显示每个节点来提供对情况的一些额外见解。

sensorStreamOperator
    .keyBy(SensorEvent::getMachineId)
    .connect(configStreamOperator.keyBy(MachineConfig::getMachineId))
    .process(new HandleEventDataCoProcess(outputTag)
    .uid("CoProcessUID")  // Provide separate uid to the processor!
    .name("CoProcessFunction")
    .getSideOutput(outputTag)
    .sinkTo(outlierSink)
    .uid("OutlierSinkUID")
    .name("OutlierSink");
© www.soinside.com 2019 - 2024. All rights reserved.