确定地基于另一个流的Flink筛选器流

问题描述 投票:0回答:2

我在Flink中有2个数据流(具有常见的时间戳记,来自Kafka),其中一个包含一些信号值,另一个包含活动性(简单的活动-非活动)信息。我用简单的状态RichCoProcessFunction尝试了private ValueState<Boolean> seen;,结果是不确定的。如果我使用startFromEarliest对同一组数据(具有相同的时间戳)运行,有时会过滤掉不同的值。我该如何确定性?我在下面共享我的KeyedCoProcessFunction骨架。

private ValueState < Boolean > seen;

@Override
public void open(Configuration parameters) throws Exception {
    ValueStateDescriptor < Boolean > descriptor = new ValueStateDescriptor < > (
        // state name
        "have-seen-key",
        // type information of state
        TypeInformation.of(new TypeHint < Boolean > () {}));
    seen = getRuntimeContext().getState(descriptor);
}

@Override
public void processElement1(SomeEvent < Double > value, Context ctx, Collector < SomeEvent < Double >> out) throws Exception {
    if (seen.value() == Boolean.TRUE) {
        out.collect(value);
    }
}

@Override
public void processElement2(SomeEvent < Double > value, Context ctx, Collector < SomeEvent < Double >> out) throws Exception {
    if (value.value == 1) {
        seen.update(Boolean.TRUE);

    } else {
        seen.update(Boolean.FALSE);
    }

}
java stream apache-flink
2个回答
0
投票

之所以不确定,是因为这两个来源产生的步调不同。使它更具确定性的最简单方法是使用EventTime。这意味着您需要同时为控制记录和数据记录分配时间戳。 Flink然后将为您的元素发出水印。

然后,您可以简单地缓冲并等待发射或丢弃元素,直到您收到控制流的水印为止,这意味着控制流中没有任何变化。

没有时间戳,在这种情况下几乎不可能引入确定性行为,因为您将永远无法准确分辨出给定记录何时到达,哪些记录应该被删除以及哪些记录应该被发出。


0
投票

实现所需类型的事件时间联接可以作为RichCoProcessFunction完成,但可能有点复杂。您可能更喜欢将其实现为join with a temporal table function

© www.soinside.com 2019 - 2024. All rights reserved.