我对 Kafka 和 MongoDB 都很陌生,但我有一个现有的 Kafka 主题,我想将其消费到 Mongo 数据库中。我试图避免构建自己的消费者或连接器,而是仅使用Sink Connector。这样,我可以避免使用单独的连接器/消费者组件所带来的额外维护、成本和复杂性,也可以避免首先构建它的工作。复杂的部分是我想在进行任何插入之前以两种方式处理消息:
忽略其值不满足条件的消息(例如,颜色 = 绿色)
对于满足此条件的人,仅将消息中的某些字段插入到集合中
我认为我可以使用后处理器来完成转换部分。但是,我还没有看到有条件地完全忽略某些消息的示例。我本以为与转换数据相比,这对于接收器连接器来说是相当微不足道的事情。我是不是错过了什么?
您可以执行以下操作:
transforms=filter
transforms.filter.type=io.debezium.transforms.Filter
transforms.filter.language=jsr223.groovy
transforms.filter.condition=value.colour == 'green'
post.processor.chain=com.mongodb.kafka.connect.sink.processor.AllowListValueProjector
value.projection.type=AllowList
value.projection.list=colour,size,etc.
这是一个如何利用第三方 Debezium 过滤器 smt 来过滤消息以及如何利用本机 MongoDB Kafka 接收器连接器后处理器来转换输出文档的示例。