我在我的Flink工作中设置了checkpointing
,它有2个滑动窗口(这些不是连接)和1个翻滚窗口连接。我的想法是,我真的不需要保存join
本身的状态,因为保存2
滑动窗口的状态就足够了。 Join
最终是一个20-30gb状态导致工作滞后和崩溃,检查点永远不会结束保存。
我怎么能做到这一点?
我正在尝试这样的事情:
public class CustomJoin implements JoinFunction<A, A, A>, ListCheckPointed<A> {
@Override
public A join(A a, A b){
// Some irrelevant join logic
}
@Override
public List<A> snapshotState(long l, long l1) throws Exception {
return new ArrayList<>();
}
@Override
public void restoreState(List<A> list) throws Exception {
}
}
这实际上是否避免存储加入状态?它叫做:
stream
.assignTimestampsAndWatermarks(...)
.join(secondStream.assingTimestampsAndWatermarks(...))
.where(KeySelector...)
.equalTo(KeySelector...)
.window(TumblingEventTimeWindows.of(Time.minutes(1L))
.trigger(EventTimeTrigger.create())
.apply(new CustomJoin());
这在实践中有效吗?什么是避免存储状态的最佳方法?
根据我对Flink的理解,检查点需要确保整个计算能够安全有效地恢复,因此这种全局状态是不可避免的。但是Flink自己的检查点可以关闭(它基于ABS算法,它几乎没有性能损失,我不推荐它),但是使用Flink提供的SavePoint来定制快照,但Flink检查点是增量的。保存,SavePoint是一个完整的保存。我建议您查看以下材料:1,分布式快照 - 确定分布式系统的全局状态2,分布式数据流的轻量级异步快照3,https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html我认为这可以很好地解决您的问题。
在窗口连接中,JoinFunction由窗口操作符执行。它没有自己的状态。所以你所尝试的并不会有所帮助。
此外,滑动窗口使用的状态比您可能意识到的要多得多。每个重叠的实例都有自己的窗口内容副本。因此,例如,如果您有一小时长的窗口滑动1分钟,那么每个事件将被复制60次。