如果Flink应用程序在发生故障或更新后正在启动,是否保留了不是明确属于KeyedState或OperatorState的类变量?
例如,Flink文档中描述的BoundedOutOfOrdernessGenerator具有currentMaxTimestamp变量。如果更新了Flink应用程序,那么会丢失currentMaxTimestamp中的值,还是将其写入在更新应用程序之前创建的保存点?
真正的原因是我想实现一个自定义水印生成器(similar to this),如果源闲置时间过长,则在生成水印时切换到处理时间。但是,我希望根据类变量重置为其原始默认值(例如,我在上面提供的链接中的示例中的Long.MIN_VALUE),在更新或失败后检测到应用程序重新联机。这样,我可以确保水印生成器不会错误地将花费五分钟的应用程序更新作为源处于空闲状态的五分钟。
此外,即使没有对水印生成器进行任何更改,如果更新了应用程序,Flink也会重新启动每个水印生成器操作员吗?
仅保留由Flink明确管理的状态-因此,是的,从快照还原时,currentMaxTimestamp
中的值会丢失。当前的水印也不包含在快照中。
我认为您可以做的-尽管我还没有尝试过-将使您的水印生成器实现CheckpointedFunction
接口。然后,您可以实现这两种方法:
public void snapshotState(FunctionSnapshotContext context)
public void initializeState(FunctionInitializationContext context)
在initializeState
方法中,您可以访问context.isRestored()
,它使您知道是否要从快照重新启动。