我一直在努力解决一些与 kafka 流一起工作以及我的消费者如何期望数据的问题。我的管道工作如下:
{"xyz": "Test from couchbase"}
我的消息曾经成功添加到主题中。然后我需要发送到数据库。
void startStreaming() {
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafkastreamtopic");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG, "org.apache.kafka.streams.processor.FailOnInvalidTimestamp");
StreamsBuilder builder = new StreamsBuilder();
builder.<String, String>stream("topicstream")
.mapValues(value -> {
try {
JSONObject jsonObject = new JSONObject(value);
// Extract the "xyz" field
String xyz = jsonObject.optString("xyz", null);
// Create a new JSON object with the correct format
JSONObject transformedObject = new JSONObject();
transformedObject.put("xyz", xyz);
return transformedObject.toString();
} catch (Exception e) {
// Log and handle the error appropriately
logger.error("Error transforming value: {}", value, e);
return value; // Return the original value in case of error
}
})
.to("jdbcsinktopic2", Produced.valueSerde(Serdes.String()));
KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.setStateListener((newState, oldState) -> {
logger.info("State changed from " + oldState + " to " + newState);
});
kafkaStreams.start();
Runtime.getRuntime().addShutdownHook(new Thread(kafkaStreams::close));
上面的代码成功地将消息写入
jdbcsinktopic2
,我可以看到这样的:
{"xyz":"Test from couchbase"}
kafka-json-schema-console-producer --bootstrap-server 192.168.60.62:9092 --property schema.registry.url=http://192.168.60.62:8081 --topic jdbcsinktopic2 --property value.schema='{"type":"object", "properties":{"xyz":{"type":"string"}}}'
当我通过在创建架构时添加消息来测试它时,数据已毫无问题地插入到我的数据库中。
{
"name": "mysql-sink-2",
"config": {
"connector.class": "io.debezium.connector.jdbc.JdbcSinkConnector",
"tasks.max": "1",
"topics": "jdbcsinktopic2",
"connection.url": "jdbc:mysql://mysql:3306/inventory",
"connection.username": "debeziumexample",
"connection.password": "example",
"auto.create": "true",
"insert.mode": "insert",
"pk.mode": "none",
"value.converter.schemas.enable": "true",
"value.converter": "io.confluent.connect.json.JsonSchemaConverter",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"key.converter.schemas.enable": "true",
"errors.tolerance": "all",
"errors.log.enable": true,
"errors.log.include.messages": true
}
}
当我转换数据以及消费者的期望时,问题就出现了。我对这个话题很陌生。我如何管理这些数据,以通过我的 jdbcsink 将我的 kafka 流应用程序转换后的数据发送到我的数据库?
任何建议、纠正或帮助都会很棒。
问候, 里戈
您使用 String Serde 生成了一条消息,而不是 Confluence 中的 Jsonschema serde。因此,记录中不存在“魔法字节”。
当您使用正确的 Serde 进行生成时,它会自动在注册表中注册架构
此外,
"schemas.enable": "true"
仅适用于 Kafka 内置的 JsonConverter
类,不适用于 Confluence 中的 JsonschemaConverter
,因为它总是强制执行一种模式