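I am trying to run a basic join in Flink by joining two DataStreams locally. Both source streams have the same data type (Tuple4(String, String, Long, Long)). When I run the function below several times, I randomly get one of two different outputs (collected in CollectTuple2Sink.VALUES below; the DEBUG logs for both runs are shown further down). I tried setting both the parallelism and the max parallelism to 1, but the problem persists.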
//Basic Function
public void runBasicJoin() throws Exception {
    TumblingEventTimeWindows tsAssigner;
    //tried with getExecutionEnvironment as well
    StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    env.setParallelism(1);
    env.setMaxParallelism(1);
    //class declared below
    CollectTuple2Sink.VALUES.clear();
    Tuple4<String, String, Long, Long> input1 =
            new Tuple4<String, String, Long, Long>("key1", "val1", 1L, t(1));
    Tuple4<String, String, Long, Long> input2 =
            new Tuple4<String, String, Long, Long>("key1", "val2", 12L, t(2));
    Tuple4<String, String, Long, Long> input3 =
            new Tuple4<String, String, Long, Long>("key1", "val3", 3L, t(3));
    Tuple4<String, String, Long, Long> input4 =
            new Tuple4<String, String, Long, Long>("key2", "val4", 18L, t(4));
    Tuple4<String, String, Long, Long> input5 =
            new Tuple4<String, String, Long, Long>("key1", "val5", 11L, t(6));
    Tuple4<String, String, Long, Long> input6 =
            new Tuple4<String, String, Long, Long>("key1", "val6", -121L, t(7));
    Tuple4<String, String, Long, Long> input7 =
            new Tuple4<String, String, Long, Long>("key2", "val7", -111L, t(8));
    Tuple4<String, String, Long, Long> input8 =
            new Tuple4<String, String, Long, Long>("key2", "val8", 111L, t(9));
    @SuppressWarnings("unchecked")
    DataStream<Tuple4<String, String, Long, Long>> dataStream1 = env.addSource(new Tuple4Soruce(
            t(0), input1, input2, input3, input4, t(5),
            input5, input6, input7, input8, t(10)
    ));
    dataStream1.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<String, String, Long, Long>>() {
        @Override
        public long extractAscendingTimestamp(Tuple4<String, String, Long, Long> tuple4) {
            return tuple4.f3;
        }
    });
    @SuppressWarnings("unchecked")
    DataStream<Tuple4<String, String, Long, Long>> dataStream2 = env.addSource(new Tuple4Soruce(
            t(0), input1, input3, input3, input4, input4, input4, t(5),
            input5, input6, t(10), t(11)
    ));
    dataStream2.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<String, String, Long, Long>>() {
        @Override
        public long extractAscendingTimestamp(Tuple4<String, String, Long, Long> tuple4) {
            return tuple4.f3;
        }
    });
    tsAssigner = TumblingEventTimeWindows.of(Time.minutes(5));
    dataStream1.join(dataStream2)
            .where(new Tuple4KeySelector())
            .equalTo(new Tuple4KeySelector())
            .window(tsAssigner)
            .trigger(EventTimeTrigger.create())
            .evictor(CountEvictor.of(2))
            .apply(new Tuple4JoinFunction())
            .addSink(new CollectTuple2Sink());
    env.execute();
    System.out.println(CollectTuple2Sink.VALUES);
}
private static class CollectTuple2Sink
        implements SinkFunction<Tuple2<String, Long>> {
    public static final List<Tuple2<String, Long>> VALUES = new ArrayList<>();

    @Override
    public synchronized void invoke(Tuple2<String, Long> value)
            throws Exception {
        VALUES.add(value);
    }
}
//join function ---> Takes the 2nd and 4th field of a tuple and convert tuple4 to tuple2
private static class Tuple4JoinFunction implements JoinFunction<Tuple4<String, String, Long, Long>, Tuple4<String, String, Long, Long>, Tuple2<String, Long>> {
    @Override
    public Tuple2<String, Long> join(Tuple4<String, String, Long, Long> tuple4, Tuple4<String, String, Long, Long> tuple42) throws Exception {
        return new Tuple2<>(tuple4.f1, tuple4.f3);
    }
}
//key selector --> select the 2nd value of tuple 4
private static class Tuple4KeySelector implements KeySelector<Tuple4<String, String, Long, Long>, String> {
    @Override
    public String getKey(Tuple4<String, String, Long, Long> tuple4) throws Exception {
        return tuple4.f1;
    }
}
//source function --> generates a sequence input for tuple4
private static class Tuple4Soruce
        implements SourceFunction, ResultTypeQueryable<Tuple4<String, String, Long, Long>> {
    private volatile boolean running = true;
    private Object[] testStream;
    private TypeInformation<Tuple4<String, String, Long, Long>> typeInformation =
            TypeInformation.of(new TypeHint<Tuple4<String, String, Long, Long>>() {
            });

    Tuple4Soruce(Object... eventsOrWatermarks) {
        this.testStream = eventsOrWatermarks;
    }

    @Override
    public void run(SourceContext ctx) throws Exception {
        for (int i = 0; (i < testStream.length) && running; i++) {
            if (testStream[i] instanceof Tuple4) {
                Tuple4<String, String, Long, Long> tuple =
                        (Tuple4<String, String, Long, Long>) testStream[i];
                ctx.collectWithTimestamp(tuple, tuple.f3);
            } else if (testStream[i] instanceof Long) {
                Long ts = (Long) testStream[i];
                ctx.emitWatermark(new Watermark(ts));
            } else {
                throw new RuntimeException(testStream[i].toString());
            }
        }
    }

    @Override
    public void cancel() {
        running = false;
    }

    @Override
    public TypeInformation<Tuple4<String, String, Long, Long>> getProducedType() {
        return typeInformation;
    }
}
//util function to generate time
public long t(int n) {
    return new DateTime(2018, 1, 1, 0, 0).plusMinutes(n).getMillis();
}
Logs from run 1:
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1914335182] with leader session id 2a8bf59e-01fa-4e67-892c-83b10dd65be1.
01/09/2020 00:50:16 Job execution switched to status RUNNING.
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED
01/09/2020 00:50:16 Job execution switched to status FINISHED.
[(val1,1514745060000), (val5,1514745360000), (val6,1514745420000)]
Logs from run 2:
Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1448653751] with leader session id 291df2cb-96fd-4e3c-b46c-911d2ca11905.
01/09/2020 00:49:42 Job execution switched to status RUNNING.
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED
01/09/2020 00:49:42 Job execution switched to status FINISHED.
[(val1,1514745060000), (val3,1514745180000), (val4,1514745240000), (val5,1514745360000), (val6,1514745420000)]
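The source function and the other definitions are taken from this tutorial. I also went through the official Flink documentation and explored several ways of running a basic job with and without an evictor. I tested several variations without an evictor, and the output was as expected on every run. As soon as an evictor is added, the results become nondeterministic.
Flink version: 1.4.2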
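You haven't shared all of your code, but as far as I can tell, the results depend on the order of ingestion, as is the case with count-based windowing, for example. In that situation you cannot get deterministic results.
A window join reads from two input streams. Events within each stream are processed in order, but the two streams race against each other in a nondeterministic and uncontrollable way. The results are deterministic if and only if window triggering and processing are based solely on event time. If counts or processing time are involved, you cannot expect deterministic results.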
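To make the race concrete, here is a minimal plain-Java sketch. It is not Flink code and does not reproduce Flink's internals. It assumes that elements from both join inputs land in one shared per-key window buffer in arrival order, and that a count-based evictor keeps only the last N of them before the join function runs. The names EvictionOrderDemo, Tagged, and joinAfterEviction are hypothetical and exist only for this illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EvictionOrderDemo {

    /** One buffered window element: which input it came from (1 or 2) and its value. */
    static final class Tagged {
        final int side;
        final String value;

        Tagged(int side, String value) {
            this.side = side;
            this.value = value;
        }
    }

    /**
     * Keeps only the last `keep` elements in arrival order (count-based eviction),
     * then pairs every surviving first-input element with every surviving
     * second-input element, as an inner join within one window would.
     */
    static List<String> joinAfterEviction(List<Tagged> arrivalOrder, int keep) {
        int from = Math.max(0, arrivalOrder.size() - keep);
        List<Tagged> kept = arrivalOrder.subList(from, arrivalOrder.size());

        List<String> left = new ArrayList<>();
        List<String> right = new ArrayList<>();
        for (Tagged t : kept) {
            if (t.side == 1) {
                left.add(t.value);
            } else {
                right.add(t.value);
            }
        }

        List<String> joined = new ArrayList<>();
        for (String l : left) {
            for (String r : right) {
                joined.add(l + "|" + r);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Tagged a = new Tagged(1, "val3"); // emitted once by the first stream
        Tagged b = new Tagged(2, "val3"); // emitted twice by the second stream

        // Same elements and same event timestamps; only the interleaving differs.
        List<Tagged> firstStreamArrivesFirst = Arrays.asList(a, b, b);
        List<Tagged> secondStreamArrivesFirst = Arrays.asList(b, b, a);

        System.out.println(joinAfterEviction(firstStreamArrivesFirst, 2));  // prints []
        System.out.println(joinAfterEviction(secondStreamArrivesFirst, 2)); // prints [val3|val3]
    }
}
```

Under these assumptions, the key val3 from the question (sent once on the first stream and twice on the second) produces a join result only when the first stream's element is among the last two to arrive. That would be consistent with val3 being absent from run 1 and present in run 2, even though both runs saw identical data and watermarks.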