Flink Streaming - How to schedule events in a data stream to be reprocessed after X minutes?


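I have an input stream of events. While processing them, I want to pick out some of the events and process them again a few minutes later.

Is there a way to achieve this?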

Here is a simplified example of what I am trying to achieve:

        var delayedMessagesOutputTag = new OutputTag<Long>("delayedMessagesOutputTag") {
        };

        var inputStream = env.fromElements(1L, 2L, 3L, 4L);
        var resolvedElements = processEvents(delayedMessagesOutputTag, inputStream);

        var delayedStream = resolvedElements
                .getSideOutput(delayedMessagesOutputTag)
                .process( /*** SOME MAGIC HERE TO CAUSE DELAY ***/);

        var resolvedDelayedEvents = processEvents(delayedMessagesOutputTag, delayedStream);

        resolvedElements
                .union(resolvedDelayedEvents)
                .addSink(new PrintSinkFunction<>());

        env.execute();

        ....

        private SingleOutputStreamOperator<Long> processEvents(OutputTag<Long> delayedMessagesOutputTag, DataStream<Long> inputStream) {
            // Returns the resolved events and writes the events that should be delayed to delayedMessagesOutputTag
        }

Any suggestions would be appreciated.

flink-streaming
1 Answer

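You can use Flink timers: key the side-output stream, keep each delayed event in keyed state, register a processing-time timer, and emit the event from onTimer() when the timer fires.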

...
final var delay = Duration.ofMinutes(1);
final var delayedStream = resolvedElements.getSideOutput(delayedMessagesOutputTag)
                .keyBy(new DelayedProcessFunction.KeySelectorImpl())
                .process(new DelayedProcessFunction(delay));
...

DelayedProcessFunction and KeySelectorImpl implementation:

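import java.time.Duration;
import java.util.ArrayList;
import java.util.List;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DelayedProcessFunction extends KeyedProcessFunction<String, Long, Long> {
    private static final long serialVersionUID = 1L;

    private final long delayMs;
    // delayed events, grouped by the timer timestamp at which they should be re-emitted;
    // a list per timestamp keeps several events with the same key from overwriting each other
    private transient MapState<Long, List<Long>> pending;

    public DelayedProcessFunction(Duration delay) {
        delayMs = delay.toMillis();
    }

    @Override
    public void open(Configuration parameters) {
        // set up keyed state for the delayed events
        pending = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("pending", Types.LONG, Types.LIST(Types.LONG)));
    }

    @Override
    public void processElement(Long value, KeyedProcessFunction<String, Long, Long>.Context ctx, Collector<Long> out) throws Exception {
        final var timerService = ctx.timerService();
        // this example uses processing time; use registerEventTimeTimer() for event time
        final long fireAt = timerService.currentProcessingTime() + delayMs;
        // store the value so that onTimer() can emit it later
        List<Long> events = pending.get(fireAt);
        if (events == null) {
            events = new ArrayList<>();
        }
        events.add(value);
        pending.put(fireAt, events);
        timerService.registerProcessingTimeTimer(fireAt);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Long, Long>.OnTimerContext ctx, Collector<Long> out) throws Exception {
        // emit every event scheduled for this timestamp, then remove it from state
        final List<Long> events = pending.get(timestamp);
        if (events != null) {
            for (Long event : events) {
                out.collect(event);
            }
            pending.remove(timestamp);
        }
    }

    public static class KeySelectorImpl implements KeySelector<Long, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(Long value) {
            // keys must be deterministic: Flink may call getKey() more than once per record
            // (to route it and again inside the keyed operator), so a random UUID key
            // fails with a "Key group ... is not in KeyGroupRange" error once parallelism > 1
            return String.valueOf(value);
        }
    }
}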

I tested this code locally and it works.
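The timer mechanism can be pictured as a queue ordered by fire time: each delayed event is parked under the timestamp at which it should be released, and once the clock reaches that timestamp the event is emitted. The sketch below simulates this idea in plain Java with a manually advanced clock, so it runs without Flink. It is only an illustration under that assumption, not how Flink implements its timer service internally; the class DelaySimulation and its schedule()/advanceTo() methods are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical, Flink-free illustration of timer-based delaying:
// events are parked under their fire time and released once the simulated clock passes it.
final class DelaySimulation {
    private final long delayMs;
    // fire time -> events scheduled for that time, kept in ascending fire-time order
    private final TreeMap<Long, List<Long>> pending = new TreeMap<>();

    DelaySimulation(long delayMs) {
        this.delayMs = delayMs;
    }

    // analogous to processElement(): store the event and "register a timer" at now + delay
    void schedule(long event, long nowMs) {
        pending.computeIfAbsent(nowMs + delayMs, t -> new ArrayList<>()).add(event);
    }

    // analogous to onTimer() firing for every timer whose timestamp is <= nowMs
    List<Long> advanceTo(long nowMs) {
        List<Long> fired = new ArrayList<>();
        // headMap(nowMs, true) is a live view of all entries with fire time <= nowMs
        Map<Long, List<Long>> due = pending.headMap(nowMs, true);
        for (List<Long> events : due.values()) {
            fired.addAll(events);
        }
        due.clear(); // clearing the view also removes the entries from pending
        return fired;
    }

    public static void main(String[] args) {
        DelaySimulation sim = new DelaySimulation(60_000L); // 1-minute delay
        sim.schedule(1L, 0L);
        sim.schedule(2L, 10_000L);
        System.out.println(sim.advanceTo(59_999L)); // []
        System.out.println(sim.advanceTo(60_000L)); // [1]
        System.out.println(sim.advanceTo(70_000L)); // [2]
    }
}
```

Grouping events by fire time, rather than keeping a single value, means that several events scheduled for the same moment are all released together instead of overwriting each other.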
