连接Flink数据流和广播流后,我们发现在ProcessFunction的processElement方法中,ctx对象只能进行只读操作,无法直接输出到不同的流。这种设计限制限制了我们根据配置将数据输出到不同流的能力。为什么processElement中的ctx对象只支持只读操作?如何优雅地实现根据配置输出到不同流的需求?
我希望在连接Flink数据流和广播流后,能够在ProcessFunction的processElement方法中将数据侧输出到不同的流。
Flink 使用 ProcessFunctions
函数支持来自
context.output()
的 Side Outputs,该函数接受
OutputTag<T>
,详细说明您要返回的对象的类型信息以及对象本身,如下所示:
// Example OutputTag
object Tags{
val widgets = OutputTag("widgets", TypeInformation.of(Widget::class.java))
}
// Usage
override fun processElement(...) {
...
// Output a side-output (that may differ from your current type)
context.output(Tags.widget, element.widget)
}
如果您静态定义
OutputTag
,您可以轻松地在输出它的操作员之间以及在作业图中共享它,以创建新的侧面输出流:
// Contents outputted via collector.collect()
val exampleStream = yourPreviouslyConnectedStream
.process(YourExampleProcessFunction())
// Contents outputted via context.output(Tags.widget, ...)
val widgets = exampleStream.getSideOutput(Tags.widgets)
单个
ProcessFunction
中可使用的侧面输出数量没有任何限制或限制,因此,如果您需要识别多个输出,只需添加额外的 OutputTag
引用即可实现。