未知的魔法字节-Kafka Json

问题描述 投票:0回答:1

我一直在努力解决一些与 kafka 流一起工作以及我的消费者如何期望数据的问题。我的管道工作如下:

  1. 我配置了 couchbase 的源连接器,将下一条消息添加到主题:
{"xyz": "Test from couchbase"}

我的消息曾经成功添加到主题中。然后我需要发送到数据库。

  1. 我尝试使用kafka流来实现。
 void startStreaming() {
            Properties props = new Properties();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkastreamtopic");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
            props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.FailOnInvalidTimestamp");

            StreamsBuilder builder = new StreamsBuilder();
            builder.<String, String>stream("topicstream")
                    .mapValues(value -> {
                        try {
                            JSONObject jsonObject = new JSONObject(value);
                            // Extract the "xyz" field
                            String xyz = jsonObject.optString("xyz", null);
                            // Create a new JSON object with the correct format
                            JSONObject transformedObject = new JSONObject();
                            transformedObject.put("xyz", xyz);
                            return transformedObject.toString();
                        } catch (Exception e) {
                            // Log and handle the error appropriately
                            logger.error("Error transforming value: {}", value, e);
                            return value; // Return the original value in case of error
                        }
                    })
                    .to("jdbcsinktopic2", Produced.valueSerde(Serdes.String()));

            KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
            kafkaStreams.setStateListener((newState, oldState) -> {
                logger.info("State changed from " + oldState + " to " + newState);
            });

            kafkaStreams.start();

            Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));

上面的代码成功地将消息写入

jdbcsinktopic2
,我可以看到这样的:

{"xyz":"Test from couchbase"}
  1. 我还向架构注册表和我的接收器配置添加了一个架构
kafka-json-schema-console-producer --bootstrap-server 192.168.60.62:9092 --property schema.registry.url=http://192.168.60.62:8081 --topic jdbcsinktopic2 --property value.schema='{"type":"object", "properties":{"xyz":{"type":"string"}}}'

当我通过在创建架构时添加消息来测试它时,数据已毫无问题地插入到我的数据库中。

{
    "name": "mysql-sink-2",
    "config": {
        "connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
        "tasks.max": "1",
        "topics": "jdbcsinktopic2",
        "connection.url": "jdbc:mysql://mysql:3306/inventory",
        "connection.username": "debeziumexample",
        "connection.password": "example",
        "auto.create": "true",
        "insert.mode": "insert",
        "pk.mode": "none",
        "value.converter.schemas.enable": "true",
        "value.converter": "io.confluent.connect.json.JsonSchemaConverter",
        "value.converter.schema.registry.url": "http://schema-registry:8081",
        "key.converter": "org.apache.kafka.connect.storage.StringConverter",
        "key.converter.schemas.enable": "true",
        "errors.tolerance": "all",
        "errors.log.enable": true,
        "errors.log.include.messages": true
    }
}

当我转换数据以及消费者的期望时,问题就出现了。我对这个话题很陌生。我如何管理这些数据,以通过我的 jdbcsink 将我的 kafka 流应用程序转换后的数据发送到我的数据库?

任何建议、纠正或帮助都会很棒。

问候, 里戈

apache-kafka couchbase apache-kafka-streams
1个回答
0
投票

您使用 String Serde 生成了一条消息,而不是 Confluence 中的 Jsonschema serde。因此,记录中不存在“魔法字节”。

当您使用正确的 Serde 进行生成时,它会自动在注册表中注册架构

此外,

"schemas.enable": "true"
仅适用于 Kafka 内置的
JsonConverter
类,不适用于 Confluence 中的
JsonschemaConverter
,因为它总是强制执行一种模式

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.