总结:我需要将记录发送到kafka主题,并且使用microsoft sql server接收器连接器必须从主题获取记录并将其发送到数据库。 ** 我尝试将记录发送到主题的方法。**
1)手动方法(产生消息按钮)
2)Rest api(邮递员)
3)Java生产者应用程序(使用Ide)
4)Boomi流程
**
关注:**
每当我手动生成记录时(在合流云中生成消息按钮),它会发送到主题,并且 mssql 接收器连接器将处理记录并发送到数据库 -→ 这种情况成功
但是如果我在 Boomi、Postman、Java 应用程序等第 3 方的帮助下发送 ==> 记录发送到主题,但 mssql 接收器连接器会将记录推送到 DLQ,它不会发送到 Db。 这是我在 DLQ 消息中注意到的错误。 这是我为该主题添加的架构
[ { "key": "__connect.errors.topic", "value": "sample_data_test" }, { "key": "__connect.errors.partition", "value": "5" }, { "key": "__connect.errors.offset", "value": "12" }, { "key": "__connect.errors.connector.name", "value": "lcc-vk28vj" }, { "key": "__connect.errors.task.id", "value": "0" }, { "key": "__connect.errors.stage", "value": "VALUE_CONVERTER" }, { "key": "__connect.errors.class.name", "value": "io.confluent.connect.json.JsonSchemaConverter" }, { "key": "__connect.errors.exception.class.name", "value": "org.apache.kafka.connect.errors.DataException" }, { "key": "__connect.errors.exception.message", "value": "Converting byte[] to Kafka Connect data failed due to serialization error of topic sample_data_test: " }, { "key": "__connect.errors.exception.stacktrace", "value": "org.apache.kafka.connect.errors.DataException: **Converting byte[] to Kafka Connect data failed due to serialization error of topic sample_data_test:** \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:217)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:254)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:521)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:347)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:247)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:302)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:193)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)\n\t... 17 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:638)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:129)\n\t... 20 more\n" } ]
**1)mssql 接收器连接器(通过第 3 方发送记录时)有任何限制吗?
2)还有其他方法吗?**
(每当我使用kafka连接器、restapi或java生产者应用程序通过boomi进程将记录发送到kafka主题时)Mssql接收器连接器应该处理记录并发送到数据库。 注意:通过使用上述方法将数据发送到主题,但 mssql sink 无法处理这些记录。
反序列化 id -1 的 JSON 消息时出错
此消息通常意味着您没有正确序列化数据以包含任何架构。这是生产者问题而不是接收器问题。
JDBC接收器连接器总是需要一些数据模式。发送纯文本 JSON 将没有模式。
我不使用Confluence Cloud UI,但我的理解是你无法控制其中的序列化;它只是发送文本,在使用它来生成时从不引用任何模式。