如何使用 kafka msg key 作为 s3 连接器中的分区标准或 我怎样才能获得密钥并将其存储在 s3 对象中 谢谢!
您可以使用 Transform 将键“移动”到值,然后将两者放在同一个对象中,然后传递给分区器。
https://github.com/jcustenborder/kafka-connect-transform-archive
您可以使用 Lenses S3 Connector 设置 S3 中数据的自定义分区,包括有效负载字段或元数据(密钥、标头等)。
这是一个示例配置,它按键、有效负载和标头中的值进行分区:
connector.class=io.lenses.streamreactor.connect.aws.s3.sink.S3SinkConnector
connect.s3.kcql=INSERT INTO my-s3-bucket:somePrefix SELECT * FROM sales PARTITIONBY _key.year, _value.product_category, _headers.region STOREAS AVRO PROPERTIES('partition.include.keys' = true)
topics=sales
name= test_s3_sink
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter=org.apache.kafka.connect.storage.StringConverter
这里解释一下: https://lenses.io/blog/2023/09/open-source-lenses-kafka-s3-connector/