在 Apache Flink 中,如何拥有一个汇点?

问题描述 投票:0回答:1

在 flink 1.14 中,我怎样才能拥有一个同时写入 kafka 和其他一些数据源的接收器。

我尝试创建一个自定义水槽来扩展

RichSinkFunction
。在
open
方法中,我还初始化了一个
KafkaSink

但是

KafkaSink
没有相当于
invoke
的方法,我可以在我自己的自定义接收器的
invoke
方法中调用该方法。

文档

KafkaSink
的用法是 -
stream.sinkTo(kafakSink)

这种方法对我来说不起作用,因为在水槽中,我决定是否写信给kafka。

apache-flink
1个回答
0
投票

我认为这里有两个选择是有意义的,一个是非常微不足道的(如果适用的话),而另一个则不是那么重要:

预过滤 Kafka 数据 - 由于您只会有条件地将数据写入 Kafka 接收器,因此您可以考虑使用

filter()
操作来确定 if 记录是否应该写入 Kafka:

// Sink records to your "other sink"
stream.sinkTo(someOtherSink)

// Conditionally sink records to Kafka
stream
  .filter(YourKafkaFilterFunction())
  .sinkTo(kafkaSink)

编写自定义接收器 - 如果这不是一个选项,您可能需要编写一个自定义函数来模仿 KafkaSink 的行为,该函数在幕后使用 KafkaWriter/KafkaCommitter,并且不一定那么简单和其他水槽一样(这是一个

TwoPhaseCommittingStatefulSink
)。

我强烈建议查看相应存储库中

KafkaSink
本身的源代码,看看该接收器的实现如何扩展它。

© www.soinside.com 2019 - 2024. All rights reserved.