我想通过 Kafka Connect 提取密钥并将其附加到值中。我阅读了 SMT 并测试了一些 SMT,但我做不到。
我发送此记录值:
{"name":"ali"}
按此键:
Person
我希望将此 JSON 存储到 openserach(或弹性搜索)
{"name":"ali","key":"Person"}
这是我的连接器,我知道这是错误的,但我不知道如何修复它:
{
"name": "aaa",
"config": {
"name": "aaa",
"connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
"tasks.max": "1",
"topics": "test_x",
"transforms": "InsertKeyToValue",
"transforms.InsertKeyToValue.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertKeyToValue.static.field": "MessageSource",
"transforms.InsertKeyToValue.static.value": "key",
// "transforms.InsertKeyToValue.field": "key",
// "transforms.InsertKeyToValue.key.field": "key",
"key.ignore": "false",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"connection.url": "https://localhost:9200",
"connection.username": "test",
"connection.password": "test",
"batch.size": "10000",
"linger.ms": "1000",
"errors.tolerance": "all",
"errors.log.include.messages": "true",
"errors.log.enable": "true"
}
}
没有可用的 SMT。您可以编写自己的SMT。
我建议,如果可能的话,在生产者端本身添加消息的值部分中的键。
自定义 SMT 参考:https://github.com/cjmatta/kafka-connect-insert-uuid