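I have an input stream of events, and while processing them I want to send some of those events back to be processed again a few minutes later.
Is there a way to do this?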
Here is a simplified example of what I'm trying to achieve:
```java
var delayedMessagesOutputTag = new OutputTag<Long>("delayedMessagesOutputTag") {
};
var inputStream = env.fromElements(1L, 2L, 3L, 4L);
var resolvedElements = processEvents(delayedMessagesOutputTag, inputStream);
var delayedStream = resolvedElements
        .getSideOutput(delayedMessagesOutputTag)
        .process( /*** SOME MAGIC HERE TO CAUSE DELAY ***/);
var resolvedDelayedEvents = processEvents(delayedMessagesOutputTag, delayedStream);
resolvedElements
        .union(resolvedDelayedEvents)
        .addSink(new PrintSinkFunction<>());
env.execute();
// ...

private SingleOutputStreamOperator<Long> processEvents(OutputTag<Long> delayedMessagesOutputTag, DataStream<Long> inputStream) {
    // Returns the resolved events and writes the events that should be delayed to delayedMessagesOutputTag
}
```
Any suggestions would be appreciated.
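You can use Flink timers: key the delayed side output and, in a `KeyedProcessFunction`, register a timer for each event and emit the event when its timer fires.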
```java
// ...
final var delay = Duration.ofMinutes(1);
final var delayedStream = resolvedElements.getSideOutput(delayedMessagesOutputTag)
        .keyBy(new DelayedProcessFunction.KeySelectorImpl())
        .process(new DelayedProcessFunction(delay));
// ...
```
Implementation of `DelayedProcessFunction` and `KeySelectorImpl`:
```java
import java.time.Duration;
import java.util.UUID;

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class DelayedProcessFunction extends KeyedProcessFunction<String, Long, Long> {
    private static final long serialVersionUID = 1L;
    private final long delayMs;
    // keyed state that holds the delayed event until its timer fires
    private transient ValueState<Long> state;

    public DelayedProcessFunction(Duration delay) {
        delayMs = delay.toMillis();
    }

    @Override
    public void open(Configuration parameters) {
        // set up the per-key event state
        state = getRuntimeContext().getState(new ValueStateDescriptor<>("state", Long.class));
    }

    @Override
    public void processElement(Long value, KeyedProcessFunction<String, Long, Long>.Context ctx, Collector<Long> out) throws Exception {
        final var timerService = ctx.timerService();
        // this example uses processing time; use registerEventTimeTimer() to delay in event time instead
        timerService.registerProcessingTimeTimer(timerService.currentProcessingTime() + delayMs);
        // store the current value so that onTimer() can emit it later
        state.update(value);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Long, Long>.OnTimerContext ctx, Collector<Long> out) throws Exception {
        // read the value stored for the current (random) key, see getKey() below,
        // emit it downstream and clear the state
        out.collect(state.value());
        state.clear();
    }

    public static class KeySelectorImpl implements KeySelector<Long, String> {
        private static final long serialVersionUID = 1L;

        @Override
        public String getKey(Long value) {
            // random key for every event, so each event gets its own state slot and timer.
            // Caution: Flink expects key selectors to be deterministic (the key is computed again
            // inside the keyed operator), so a random key can fail when that operator's parallelism is > 1.
            return UUID.randomUUID().toString();
        }
    }
}
```
I tested this code locally and it works.
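A caveat on the random key: Flink evaluates the `KeySelector` when routing records in `keyBy` and again inside the keyed operator, so it is expected to be deterministic. A random UUID happens to work when the keyed operator has parallelism 1 (as in a typical local run), but can fail at higher parallelism. With a deterministic key, for example the event value itself, several pending events can share one key, and a single `ValueState<Long>` slot would overwrite the earlier ones. Since Flink also keeps at most one timer per key and timestamp, the pending events have to be grouped by the timestamp at which their timer fires. The class below is a minimal, Flink-free sketch of that bookkeeping, assuming processing-time delays as in the answer; `DelayBuffer`, `add`, `pollDue` and `pendingCount` are hypothetical names, not Flink API. Inside a `KeyedProcessFunction` the map would live in keyed state (for example a `MapState<Long, List<Long>>`), `add` would correspond to `processElement()` registering a timer, and `pollDue` to `onTimer()`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java model (no Flink dependency) of a delay buffer for ONE key:
// pending events are grouped by the timestamp at which they become due.
final class DelayBuffer {
    private final long delayMs;
    // fire timestamp -> events due at that timestamp, in arrival order
    private final TreeMap<Long, List<Long>> pending = new TreeMap<>();

    DelayBuffer(long delayMs) {
        this.delayMs = delayMs;
    }

    // Buffers an event seen at nowMs and returns the timestamp at which it becomes due
    // (in Flink this is where registerProcessingTimeTimer(fireAt) would be called).
    long add(long value, long nowMs) {
        long fireAt = nowMs + delayMs;
        pending.computeIfAbsent(fireAt, t -> new ArrayList<>()).add(value);
        return fireAt;
    }

    // Removes and returns every event whose fire timestamp is <= nowMs, oldest first.
    List<Long> pollDue(long nowMs) {
        List<Long> due = new ArrayList<>();
        // headMap(nowMs, true) is a live view of all entries with key <= nowMs
        Map<Long, List<Long>> ready = pending.headMap(nowMs, true);
        for (List<Long> values : ready.values()) {
            due.addAll(values);
        }
        ready.clear(); // clearing the view also removes those entries from 'pending'
        return due;
    }

    // Number of events still waiting for their fire timestamp.
    int pendingCount() {
        int n = 0;
        for (List<Long> values : pending.values()) {
            n += values.size();
        }
        return n;
    }

    public static void main(String[] args) {
        DelayBuffer buffer = new DelayBuffer(60_000); // one-minute delay
        buffer.add(1L, 0);
        buffer.add(1L, 0);      // same key and same fire timestamp: both events are kept
        buffer.add(2L, 30_000);
        System.out.println(buffer.pollDue(59_999)); // prints []
        System.out.println(buffer.pollDue(60_000)); // prints [1, 1]
        System.out.println(buffer.pollDue(90_000)); // prints [2]
    }
}
```

Running `main` prints `[]`, `[1, 1]` and `[2]`: both copies of event `1` survive because they are stored in one list under the same fire timestamp, which is the case a single per-key `ValueState` slot would lose.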