Kafka Streams 从 JSONObject 创建空模式

问题描述 投票:0回答:1

我正在编写一个 Kafka Streams 应用程序,该应用程序转换从非关系自定义 Kafka 源连接器接收的数据,并将其拆分为多个主题以对其进行规范化(以便随后可以由 JDBC 接收器连接器使用)。摄取的数据具有保存在自托管架构注册表中的架构,并使用

org.json.JSONObject
反序列化为
io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde

处理流后,我有一个 JSONObject 想要写入 Kafka。我再次尝试使用

io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
来使用架构进行序列化,但收到错误
WARN com.kjetland.jackson.jsonSchema.JsonSchemaGenerator - Not able to generate jsonSchema-info for type: [simple type, class org.json.JSONObject] - probably using custom serializer which does not override acceptJsonFormatVisitor
并创建了一个空架构:

{
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "JSON Object"
}

这是我在处理输入流后再次写入Kafka的代码:

// process primitive types (remove all non-primitive types in each record)
KStream<String, JSONObject> primitiveStream = inputStream.mapValues(value -> {
    JSONObject inputObject = value;
    for (String key : inputObject.keySet()) {
        if (inputObject.get(key) instanceof JSONObject) {
            logger.info("Key: '{}' removed, is an object.", key);
            inputObject.remove(key);
        } else if (inputObject.get(key) instanceof JSONArray) {
            logger.info("Key: '{}' removed, is an array.", key);
            inputObject.remove(key);
        }
    }
    return inputObject;
});

KafkaJsonSchemaSerde<JSONObject> jsonobjserde = new KafkaJsonSchemaSerde(JSONObject.class);
Map<String, Object> jsonserdeConf = new HashMap<>();
jsonserdeConf.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
jsonobjserde.configure(jsonserdeConf, false);

primitiveStream.to(inputTopic + "." + inputTopic, Produced.with(Serdes.String(), jsonobjserde));

我在网上找不到任何可以帮助我找到解决方案的东西。我考虑过手动创建架构并传递它(就像我在自定义连接器中所做的那样),但我找不到任何方法来使用 Kafka Streams 来实现这一点。我还尝试手动将架构添加到架构注册表并关闭 Kafka Streams 应用程序中的架构自动生成,但这只会导致找不到架构的错误。

对于此事的任何帮助,我将非常感激!

apache-kafka apache-kafka-streams apache-kafka-connect confluent-schema-registry
1个回答
0
投票

您看到的错误表明序列化器无法生成

JSONObject
的架构。

创建自定义序列化器:由于

JSONObject
不直接支持
KafkaJsonSchemaSerde
,因此您可以创建一个处理 JSONObject 并为其生成架构的自定义序列化器3。

使用 Jackson 注释:如果您的

JSONObject
表示更结构化的数据类型,请考虑使用 Jackson 注释定义 Java 类来表示数据。这样,
KafkaJsonSchemaSerde
就可以为类生成模式。

以下是如何为

JSONObject
创建自定义序列化器的示例:

import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.json.JSONObject;

public class JSONObjectSerializer implements Serializer<JSONObject> {
    private final KafkaJsonSchemaSerializer<JSONObject> jsonSchemaSerializer;
    private final ObjectMapper objectMapper = new ObjectMapper();

    public JSONObjectSerializer(String schemaRegistryUrl) {
        jsonSchemaSerializer = new KafkaJsonSchemaSerializer<>(JSONObject.class);
        Map<String, Object> config = new HashMap<>();
        config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
        jsonSchemaSerializer.configure(config, false);
    }

    @Override
    public byte[] serialize(String topic, JSONObject data) {
        try {
            return jsonSchemaSerializer.serialize(topic, objectMapper.convertValue(data, Map.class));
        } catch (Exception e) {
            throw new RuntimeException("Error serializing JSONObject", e);
        }
    }
}

在您的 Kafka Streams 应用程序中,您可以使用此自定义序列化程序:

JSONObjectSerializer serializer = new JSONObjectSerializer(schemaRegistryUrl);
primitiveStream.to(inputTopic + "." + inputTopic, Produced.with(Serdes.String(), new Serde<>(serializer)));

此方法应该可以帮助您使用模式序列化

JSONObject

© www.soinside.com 2019 - 2024. All rights reserved.