Different output from a joined Flink stream with an evictor


I am trying to run a basic join with Flink by joining two DataStreams locally. Both source streams carry the same data type (Tuple4<String, String, Long, Long>). After running the function below multiple times, I randomly get one of two different outputs (collected in the CollectTuple2Sink shown below; the DEBUG logs for both runs are included further down). I tried keeping parallelism at 1 and max parallelism at 1, but the problem still persists.

//Basic Function
    public void runBasicJoin() throws Exception {

        TumblingEventTimeWindows tsAssigner;
        //tried with getExecutionEnvironment as well
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        env.setParallelism(1);
        env.setMaxParallelism(1);
        //class declared below
        CollectTuple2Sink.VALUES.clear();

        Tuple4<String, String, Long, Long> input1 =
                new Tuple4<String, String, Long, Long>("key1", "val1", 1L, t(1));
        Tuple4<String, String, Long, Long> input2 =
                new Tuple4<String, String, Long, Long>("key1", "val2", 12L, t(2));
        Tuple4<String, String, Long, Long> input3 =
                new Tuple4<String, String, Long, Long>("key1", "val3", 3L, t(3));
        Tuple4<String, String, Long, Long> input4 =
                new Tuple4<String, String, Long, Long>("key2", "val4", 18L, t(4));
        Tuple4<String, String, Long, Long> input5 =
                new Tuple4<String, String, Long, Long>("key1", "val5", 11L, t(6));
        Tuple4<String, String, Long, Long> input6 =
                new Tuple4<String, String, Long, Long>("key1", "val6", -121L, t(7));
        Tuple4<String, String, Long, Long> input7 =
                new Tuple4<String, String, Long, Long>("key2", "val7", -111L, t(8));
        Tuple4<String, String, Long, Long> input8 =
                new Tuple4<String, String, Long, Long>("key2", "val8", 111L, t(9));

        @SuppressWarnings("unchecked")
        DataStream<Tuple4<String, String, Long, Long>> dataStream1 = env.addSource(new Tuple4Soruce(
                t(0), input1, input2, input3, input4,t(5),
                input5, input6, input7, input8,t(10)
        ));

        dataStream1.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<String, String, Long, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple4<String, String, Long, Long> tuple4) {
                return tuple4.f3;
            }
        });

        @SuppressWarnings("unchecked")
        DataStream<Tuple4<String, String, Long, Long>> dataStream2 = env.addSource(new Tuple4Soruce(
                t(0), input1, input3,input3,input4,input4,input4,t(5),
                 input5,input6, t(10),t(11)
        ));

        dataStream2.assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple4<String, String, Long, Long>>() {
            @Override
            public long extractAscendingTimestamp(Tuple4<String, String, Long, Long> tuple4) {
                return tuple4.f3;
            }
        });

        tsAssigner = TumblingEventTimeWindows.of(Time.minutes(5));

        dataStream1.join(dataStream2)
                .where(new Tuple4KeySelector())
                .equalTo(new Tuple4KeySelector())
                .window(tsAssigner)
                .trigger(EventTimeTrigger.create())
                .evictor(CountEvictor.of(2))
                .apply(new Tuple4JoinFunction())
                .addSink(new CollectTuple2Sink());
        env.execute();
        System.out.println(CollectTuple2Sink.VALUES);

    }

    private static class CollectTuple2Sink
            implements SinkFunction<Tuple2<String, Long>> {

        public static final List<Tuple2<String, Long>> VALUES = new ArrayList<>();

        @Override
        public synchronized void invoke(Tuple2<String, Long> value)
                throws Exception {
            VALUES.add(value);
        }

    }
//join function ---> takes the 2nd and 4th fields of each left-side tuple and converts the Tuple4 into a Tuple2
    private static class Tuple4JoinFunction implements JoinFunction<Tuple4<String, String, Long, Long>, Tuple4<String, String, Long, Long>, Tuple2<String, Long>> {
        @Override
        public Tuple2<String, Long> join(Tuple4<String, String, Long, Long> tuple4, Tuple4<String, String, Long, Long> tuple42) throws Exception {
            return new Tuple2<>(tuple4.f1, tuple4.f3);
        }
    }
//key selector --> selects the 2nd field of the Tuple4 as the join key
    private static class Tuple4KeySelector implements KeySelector<Tuple4<String, String, Long, Long>, String> {
        @Override
        public String getKey(Tuple4<String, String, Long, Long> tuple4) throws Exception {
            return tuple4.f1;
        }
    }

//source function --> emits a fixed sequence of Tuple4 events and watermarks
    private static class Tuple4Soruce
            implements SourceFunction, ResultTypeQueryable<Tuple4<String, String, Long, Long>> {
        private volatile boolean running = true;
        private Object[] testStream;
        private TypeInformation<Tuple4<String, String, Long, Long>> typeInformation =
                TypeInformation.of(new TypeHint<Tuple4<String, String, Long, Long>>() {
                });


        Tuple4Soruce(Object... eventsOrWatermarks) {
            this.testStream = eventsOrWatermarks;
        }

        @Override
        public void run(SourceContext ctx) throws Exception {
            for (int i = 0; (i < testStream.length) && running; i++) {
                if (testStream[i] instanceof Tuple4) {
                    Tuple4<String, String, Long, Long> tuple =
                            (Tuple4<String, String, Long, Long>) testStream[i];
                    ctx.collectWithTimestamp(tuple, tuple.f3);
                } else if (testStream[i] instanceof Long) {
                    Long ts = (Long) testStream[i];
                    ctx.emitWatermark(new Watermark(ts));
                } else {
                    throw new RuntimeException(testStream[i].toString());
                }
            }
        }

        @Override
        public void cancel() {
            running = false;
        }

        @Override
        public TypeInformation<Tuple4<String, String, Long, Long>> getProducedType() {
            return typeInformation;
        }

    }
//util function to generate time
    public long t(int n) {
        return new DateTime(2018, 1, 1, 0, 0).plusMinutes(n).getMillis();
    }

Logs for Run 1:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#1914335182] with leader session id 2a8bf59e-01fa-4e67-892c-83b10dd65be1.
01/09/2020 00:50:16 Job execution switched to status RUNNING.
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:50:16 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:50:16 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED 
01/09/2020 00:50:16 Job execution switched to status FINISHED.
[(val1,1514745060000), (val5,1514745360000), (val6,1514745420000)]

Logs for Run 2:

Connected to JobManager at Actor[akka://flink/user/jobmanager_1#-1448653751] with leader session id 291df2cb-96fd-4e3c-b46c-911d2ca11905.
01/09/2020 00:49:42 Job execution switched to status RUNNING.
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to SCHEDULED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to DEPLOYING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to RUNNING 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to RUNNING 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:49:42 Source: Custom Source -> (Timestamps/Watermarks, Map)(1/1) switched to FINISHED 
01/09/2020 00:49:42 TriggerWindow(TumblingEventTimeWindows(300000), ListStateDescriptor{serializer=org.apache.flink.api.common.typeutils.base.ListSerializer@581cedc0}, EventTimeTrigger(), org.apache.flink.streaming.api.windowing.evictors.CountEvictor@43d7741f, WindowedStream.apply(CoGroupedStreams.java:303)) -> Sink: Unnamed(1/1) switched to FINISHED 
01/09/2020 00:49:42 Job execution switched to status FINISHED.
[(val1,1514745060000), (val3,1514745180000), (val4,1514745240000), (val5,1514745360000), (val6,1514745420000)]

The source function and the other definitions are taken from this tutorial. I also went through several examples in the official Flink documentation of running basic jobs with and without evictors. I tested a number of things without the evictor, and the output of every run was as expected. As soon as the evictor comes into play, things start to become non-deterministic.

Flink version 1.4.2

java apache-flink flink-streaming
1 Answer

You haven't shared all of your code, but from what I can tell, what is happening is that the results depend on the order in which events are ingested, as is the case with anything count-based, and in that situation you cannot get deterministic results.

The windowed join reads from its two input streams, and while the events within each stream are processed in order, the two streams race against each other in a non-deterministic and uncontrollable way. The results will be deterministic if, and only if, window triggering and processing are based purely on event time. If counting or processing time is involved, then you cannot expect deterministic results.
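As an illustration only, here is a minimal sketch reusing the question's own Tuple4KeySelector, Tuple4JoinFunction, CollectTuple2Sink and sources: dropping the CountEvictor leaves triggering driven purely by event time, so repeated runs should produce the same output.

    // Same join as in the question, but without the CountEvictor: each window pane
    // keeps all of its elements, so the result no longer depends on arrival order.
    dataStream1.join(dataStream2)
            .where(new Tuple4KeySelector())
            .equalTo(new Tuple4KeySelector())
            .window(TumblingEventTimeWindows.of(Time.minutes(5)))
            .trigger(EventTimeTrigger.create()) // fires on event-time watermarks only
            .apply(new Tuple4JoinFunction())
            .addSink(new CollectTuple2Sink());
    env.execute();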
