Flink如何为运营商保存状态?

问题描述 投票:0回答:2

我在我的Flink工作中设置了checkpointing,它有2个滑动窗口(这些不是连接)和1个翻滚窗口连接。我的想法是,我真的不需要保存join本身的状态,因为保存2滑动窗口的状态就足够了。 Join最终是一个20-30gb状态导致工作滞后和崩溃,检查点永远不会结束保存。

我怎么能做到这一点?

我正在尝试这样的事情:

public class CustomJoin implements JoinFunction<A, A, A>, ListCheckPointed<A> {

@Override
public A join(A a, A b){
 // Some irrelevant join logic
}

@Override
    public List<A> snapshotState(long l, long l1) throws Exception {
      return new ArrayList<>();
    }

    @Override
    public void restoreState(List<A> list) throws Exception {

    }
}

这实际上是否避免存储加入状态?它叫做:

stream
.assignTimestampsAndWatermarks(...)
.join(secondStream.assingTimestampsAndWatermarks(...))
.where(KeySelector...)
.equalTo(KeySelector...)
.window(TumblingEventTimeWindows.of(Time.minutes(1L))
.trigger(EventTimeTrigger.create())
.apply(new CustomJoin());

这在实践中有效吗?什么是避免存储状态的最佳方法?

apache-flink flink-streaming flink-cep
2个回答
0
投票

根据我对Flink的理解,检查点需要确保整个计算能够安全有效地恢复,因此这种全局状态是不可避免的。但是Flink自己的检查点可以关闭(它基于ABS算法,它几乎没有性能损失,我不推荐它),但是使用Flink提供的SavePoint来定制快照,但Flink检查点是增量的。保存,SavePoint是一个完整的保存。我建议您查看以下材料:1,分布式快照 - 确定分布式系统的全局状态2,分布式数据流的轻量级异步快照3,https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/state/checkpointing.html我认为这可以很好地解决您的问题。


0
投票

在窗口连接中,JoinFunction由窗口操作符执行。它没有自己的状态。所以你所尝试的并不会有所帮助。

此外,滑动窗口使用的状态比您可能意识到的要多得多。每个重叠的实例都有自己的窗口内容副本。因此,例如,如果您有一小时长的窗口滑动1分钟,那么每个事件将被复制60次。

© www.soinside.com 2019 - 2024. All rights reserved.