Flink GlobalWindow Trigger 只处理触发事件

问题描述 投票:0回答:1

我有一个事件属性的数据流keyby,然后传递到一个全局窗口,当特定事件到来时触发,问题是当窗口被触发处理事件时,它只处理触发事件。这是代码:

public class MyEvent {
    public String key;
    public String value;

    public MyEvent() {
        // Default constructor
    }

    public MyEvent(String key, String value) {
        this.key = key;
        this.value = value;
    }

    public String getKey() {
        return key;
    }

    public void setKey(String key) {
        this.key = key;
    }

    public String getValue() {
        return value;
    }

    public void setValue(String value) {
        this.value = value;
    }

    @Override
    public String toString() {
        return "Key: " + key + ", Value: " + value;
    }
}




public class KeyedGlobalWindowTriggerExample {

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        DataStream<MyEvent> input = env.socketTextStream("localhost", 9091)
                .map(new MapFunction<String, MyEvent>() {
                    @Override
                    public MyEvent map(String value) {
                        // Assuming the input stream is in the format "key,value"
                        String[] parts = value.split(",");
                        return new MyEvent(parts[0], parts[1]);
                    }
                });

        // Key By Event Property
        KeyedStream<MyEvent, String> keyedStream = input
                .keyBy(event -> event.key);

        //Create a Custom Trigger
        keyedStream.window(GlobalWindows.create())
                .trigger(new Trigger<MyEvent, GlobalWindow>() {
                    @Override
                    public TriggerResult onElement(MyEvent event, long timestamp, GlobalWindow window, TriggerContext ctx) {
                        if ("eod".equals(event.getKey())) {
                            return TriggerResult.FIRE;
                        }
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public TriggerResult onProcessingTime(long time, GlobalWindow window, TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public TriggerResult onEventTime(long time, GlobalWindow window, TriggerContext ctx) {
                        return TriggerResult.CONTINUE;
                    }

                    @Override
                    public void clear(GlobalWindow window, TriggerContext ctx) {
                        // Handle clearing of window state if necessary
                    }
                })
                .process(new MyProcessWindowFunction())
                .print();

        env.execute("Keyed Global Window Trigger Example");
    }
}

public class MyProcessWindowFunction extends ProcessWindowFunction<MyEvent, String, String, GlobalWindow> {
    @Override
    public void process(String key, Context context, Iterable<MyEvent> elements, Collector<String> out) {
        for (MyEvent element : elements) {
            out.collect(element.toString());
        }
    }
}

如何确保它处理全局窗口中的所有事件,并带有触发事件?

如果我发送以下事件: 测试一,一 事件_二,二 事件_三,三 事件四,四 爆炸物,爆炸物

输出只有: 键:eod,事件:[键:eod,值:eod,]

我希望看到发送到此窗口的所有事件。

java apache-flink data-stream stream-processing
1个回答
0
投票

this,keyBy函数将你的事件分割到不同的窗口,这就是为什么只输出eod。 修改你的代码。

  1. 删除
    KeyedStream<MyEvent, String> keyedStream = input.keyBy(event -> event.key);
  2. keyedStream.window()
    更改为
    input.windowAll()
  3. 改变
    public class MyProcessWindowFunction extends ProcessWindowFunction<MyEvent, String, String, GlobalWindow> { @Override public void process(String key, Context context, Iterable<MyEvent> elements, Collector<String> out) { for (MyEvent element : elements) { out.collect(element.toString()); } } }
    public static class MyProcessWindowFunction extends ProcessAllWindowFunction<MyEvent, String, GlobalWindow> { @Override public void process(ProcessAllWindowFunction<MyEvent, String, GlobalWindow>.Context context, Iterable<MyEvent> elements, Collector<String> out) throws Exception { for (MyEvent element : elements) { out.collect(element.toString()); } } }
© www.soinside.com 2019 - 2024. All rights reserved.