提取密钥并将其附加到 Kafka Connect 中的消息值

问题描述 投票:0回答:1

我想通过 Kafka Connect 提取密钥并将其附加到值中。我阅读了 SMT 并测试了一些 SMT,但我做不到。

我发送此记录值:

{"name":"ali"}

按此键:

Person

我希望将此 JSON 存储到 openserach(或弹性搜索)

{"name":"ali","key":"Person"}

这是我的连接器,我知道这是错误的,但我不知道如何修复它:

{
    "name": "aaa",
    "config": {
        "name": "aaa",
        "connector.class": "io.aiven.kafka.connect.opensearch.OpensearchSinkConnector",
        "tasks.max": "1",
        "topics": "test_x",
        "transforms": "InsertKeyToValue",
        "transforms.InsertKeyToValue.type": "org.apache.kafka.connect.transforms.InsertField$Value",
        "transforms.InsertKeyToValue.static.field": "MessageSource",
        "transforms.InsertKeyToValue.static.value": "key",
        // "transforms.InsertKeyToValue.field": "key",
        // "transforms.InsertKeyToValue.key.field": "key",
        "key.ignore": "false",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "false",
        "key.converter.schemas.enable": "false",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "connection.url": "https://localhost:9200",
        "connection.username": "test",
        "connection.password": "test",
        "batch.size": "10000",
        "linger.ms": "1000",
        "errors.tolerance": "all",
        "errors.log.include.messages": "true",
        "errors.log.enable": "true"
    }
}
apache-kafka apache-kafka-connect
1个回答
0
投票

没有可用的 SMT。您可以编写自己的SMT。

我建议,如果可能的话,在生产者端本身添加消息的值部分中的键。

自定义 SMT 参考:https://github.com/cjmatta/kafka-connect-insert-uuid

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.