我正在编写一个 Kafka Streams 应用程序,该应用程序转换从非关系自定义 Kafka 源连接器接收的数据,并将其拆分为多个主题以对其进行规范化(以便随后可以由 JDBC 接收器连接器使用)。摄取的数据具有保存在自托管架构注册表中的架构,并使用
org.json.JSONObject
反序列化为 io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
。
处理流后,我有一个 JSONObject 想要写入 Kafka。我再次尝试使用
io.confluent.kafka.streams.serdes.json.KafkaJsonSchemaSerde
来使用架构进行序列化,但收到错误 WARN com.kjetland.jackson.jsonSchema.JsonSchemaGenerator - Not able to generate jsonSchema-info for type: [simple type, class org.json.JSONObject] - probably using custom serializer which does not override acceptJsonFormatVisitor
并创建了一个空架构:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "JSON Object"
}
这是我在处理输入流后再次写入Kafka的代码:
// process primitive types (remove all non-primitive types in each record)
KStream<String, JSONObject> primitiveStream = inputStream.mapValues(value -> {
JSONObject inputObject = value;
for (String key : inputObject.keySet()) {
if (inputObject.get(key) instanceof JSONObject) {
logger.info("Key: '{}' removed, is an object.", key);
inputObject.remove(key);
} else if (inputObject.get(key) instanceof JSONArray) {
logger.info("Key: '{}' removed, is an array.", key);
inputObject.remove(key);
}
}
return inputObject;
});
KafkaJsonSchemaSerde<JSONObject> jsonobjserde = new KafkaJsonSchemaSerde(JSONObject.class);
Map<String, Object> jsonserdeConf = new HashMap<>();
jsonserdeConf.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
jsonobjserde.configure(jsonserdeConf, false);
primitiveStream.to(inputTopic + "." + inputTopic, Produced.with(Serdes.String(), jsonobjserde));
我在网上找不到任何可以帮助我找到解决方案的东西。我考虑过手动创建架构并传递它(就像我在自定义连接器中所做的那样),但我找不到任何方法来使用 Kafka Streams 来实现这一点。我还尝试手动将架构添加到架构注册表并关闭 Kafka Streams 应用程序中的架构自动生成,但这只会导致找不到架构的错误。
对于此事的任何帮助,我将非常感激!
您看到的错误表明序列化器无法生成
JSONObject
的架构。
创建自定义序列化器:由于
JSONObject
不直接支持 KafkaJsonSchemaSerde
,因此您可以创建一个处理 JSONObject 并为其生成架构的自定义序列化器3。
使用 Jackson 注释:如果您的
JSONObject
表示更结构化的数据类型,请考虑使用 Jackson 注释定义 Java 类来表示数据。这样,KafkaJsonSchemaSerde
就可以为类生成模式。
以下是如何为
JSONObject
创建自定义序列化器的示例:
import com.fasterxml.jackson.databind.ObjectMapper;
import io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer;
import org.apache.kafka.common.serialization.Serializer;
import org.json.JSONObject;
public class JSONObjectSerializer implements Serializer<JSONObject> {
private final KafkaJsonSchemaSerializer<JSONObject> jsonSchemaSerializer;
private final ObjectMapper objectMapper = new ObjectMapper();
public JSONObjectSerializer(String schemaRegistryUrl) {
jsonSchemaSerializer = new KafkaJsonSchemaSerializer<>(JSONObject.class);
Map<String, Object> config = new HashMap<>();
config.put(AbstractKafkaSchemaSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryUrl);
jsonSchemaSerializer.configure(config, false);
}
@Override
public byte[] serialize(String topic, JSONObject data) {
try {
return jsonSchemaSerializer.serialize(topic, objectMapper.convertValue(data, Map.class));
} catch (Exception e) {
throw new RuntimeException("Error serializing JSONObject", e);
}
}
}
在您的 Kafka Streams 应用程序中,您可以使用此自定义序列化程序:
JSONObjectSerializer serializer = new JSONObjectSerializer(schemaRegistryUrl);
primitiveStream.to(inputTopic + "." + inputTopic, Produced.with(Serdes.String(), new Serde<>(serializer)));
此方法应该可以帮助您使用模式序列化
JSONObject
。