在 flink 1.14 中,我怎样才能拥有一个同时写入 kafka 和其他一些数据源的接收器。
我尝试创建一个自定义水槽来扩展
RichSinkFunction
。在 open
方法中,我还初始化了一个 KafkaSink
。
但是
KafkaSink
没有相当于 invoke
的方法,我可以在我自己的自定义接收器的 invoke
方法中调用该方法。
文档说
KafkaSink
的用法是 - stream.sinkTo(kafakSink)
这种方法对我来说不起作用,因为在水槽中,我决定是否写信给kafka。
我认为这里有两个选择是有意义的,一个是非常微不足道的(如果适用的话),而另一个则不是那么重要:
预过滤 Kafka 数据 - 由于您只会有条件地将数据写入 Kafka 接收器,因此您可以考虑使用
filter()
操作来确定 if 记录是否应该写入 Kafka:
// Sink records to your "other sink"
stream.sinkTo(someOtherSink)
// Conditionally sink records to Kafka
stream
.filter(YourKafkaFilterFunction())
.sinkTo(kafkaSink)
编写自定义接收器 - 如果这不是一个选项,您可能需要编写一个自定义函数来模仿 KafkaSink 的行为,该函数在幕后使用 KafkaWriter/KafkaCommitter,并且不一定那么简单和其他水槽一样(这是一个
TwoPhaseCommittingStatefulSink
)。
我强烈建议查看相应存储库中
KafkaSink
本身的源代码,看看该接收器的实现如何扩展它。