无法使用汇合kafka中的Microsoft sql server接收器连接器将记录从kafka主题发送到数据库

问题描述 投票:0回答:1

总结:我需要将记录发送到kafka主题,并且使用microsoft sql server接收器连接器必须从主题获取记录并将其发送到数据库。 ** 我尝试将记录发送到主题的方法。**

1)手动方法(产生消息按钮)

2)Rest api(邮递员)

3)Java生产者应用程序(使用Ide)

4)Boomi流程

**

关注:**

  • 每当我手动生成记录时(在合流云中生成消息按钮),它会发送到主题,并且 mssql 接收器连接器将处理记录并发送到数据库 -→ 这种情况成功

  • 但是如果我在 Boomi、Postman、Java 应用程序等第 3 方的帮助下发送 ==> 记录发送到主题,但 mssql 接收器连接器会将记录推送到 DLQ,它不会发送到 Db。 这是我在 DLQ 消息中注意到的错误。 这是我为该主题添加的架构

[   {     "key": "__connect.errors.topic",     "value": "sample_data_test"   },   {     "key": "__connect.errors.partition",     "value": "5"   },   {     "key": "__connect.errors.offset",     "value": "12"   },   {     "key": "__connect.errors.connector.name",     "value": "lcc-vk28vj"   },   {     "key": "__connect.errors.task.id",     "value": "0"   },   {     "key": "__connect.errors.stage",     "value": "VALUE_CONVERTER"   },   {     "key": "__connect.errors.class.name",     "value": "io.confluent.connect.json.JsonSchemaConverter"   },   {     "key": "__connect.errors.exception.class.name",     "value": "org.apache.kafka.connect.errors.DataException"   },   {     "key": "__connect.errors.exception.message",     "value": "Converting byte[] to Kafka Connect data failed due to serialization error of topic sample_data_test: "   },   {     "key": "__connect.errors.exception.stacktrace",     "value": "org.apache.kafka.connect.errors.DataException: **Converting byte[] to Kafka Connect data failed due to serialization error of topic sample_data_test:** \n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:217)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:254)\n\tat org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:189)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:546)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:521)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:347)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:247)\n\tat org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:216)\n\tat org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:247)\n\tat org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:302)\n\tat org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$7(Plugins.java:339)\n\tat java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:572)\n\tat java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)\n\tat java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)\n\tat java.base/java.lang.Thread.run(Thread.java:1583)\nCaused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)\n\tat io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:193)\n\tat io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)\n\t... 17 more\nCaused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!\n\tat io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:638)\n\tat io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:129)\n\t... 20 more\n"   } ]
  • 因此,在 Java 生产者应用程序中,我所做的就是在将消息发送到主题之前,我还进行了序列化。

**1)mssql 接收器连接器(通过第 3 方发送记录时)有任何限制吗?

2)还有其他方法吗?**

(每当我使用kafka连接器、restapi或java生产者应用程序通过boomi进程将记录发送到kafka主题时)Mssql接收器连接器应该处理记录并发送到数据库。 注意:通过使用上述方法将数据发送到主题,但 mssql sink 无法处理这些记录。

java json apache-kafka apache-kafka-connect boomi
1个回答
0
投票

反序列化 id -1 的 JSON 消息时出错

此消息通常意味着您没有正确序列化数据以包含任何架构。这是生产者问题而不是接收器问题。

JDBC接收器连接器总是需要一些数据模式。发送纯文本 JSON 将没有模式。

我不使用Confluence Cloud UI,但我的理解是你无法控制其中的序列化;它只是发送文本,在使用它来生成时从不引用任何模式。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.