Cannot deserialize protobuf data with Kafka Connect and Schema Registry

Problem description (Votes: 0, Answers: 1)

I'm trying to sink a Kafka topic (containing protobuf data!) into a Postgres table using Kafka Connect and Schema Registry.

My Kafka Connect connector file:

{
  "name": "proto_sink",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.password": "some_password",
    "topics": "some_topic",
    "value.converter.schema.registry.url": "http://localhost:8081",
    "key.converter.schemas.enable": "false",
    "auto.evolve": "true",
    "connection.user": "some_user",
    "value.converter.schemas.enable": "true",
    "name": "sink_proto",
    "auto.create": "true",
    "connection.url": "jdbc:postgresql://localhost:5432/some_db",
    "value.converter": "io.confluent.connect.protobuf.ProtobufConverter",
    "insert.mode": "insert",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter"
  }
}

I first registered the proto schema with Schema Registry using this command (note that protobuf schemas must be registered with "schemaType": "PROTOBUF"; the registry defaults to AVRO):

curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json"   --data '{"schemaType": "PROTOBUF", "schema": "syntax = \"proto3\";\npackage nilian;\n\nmessage Person {\n  string name = 1;\n  int32 age = 2;\n}"}'   http://localhost:8081/subjects/some_topic-value/versions

Then I send the connector config to Kafka Connect with this command:

curl -X POST -H "Content-Type: application/json" -d @connector_configs/json_conn.json http://localhost:8083/connectors

And I get this error:

Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic some_topic to Protobuf: 
        at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:154)
        at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:528)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
        at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
        ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
        at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:228)
        at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaProtobufDeserializer.java:292)
        at io.confluent.connect.protobuf.ProtobufConverter$Deserializer.deserialize(ProtobufConverter.java:200)
        at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:132)
        ... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
        at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:600)
        at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:133)
        ... 20 more

What is wrong here?

protocol-buffers apache-kafka-connect confluent-schema-registry
1 Answer

0 votes

I've run into this problem in one of my projects before and solved it with a simple search. I figured if that doesn't work, just Google it. Let's talk about it; I don't want to limit your thinking. In the words of a great man at our company: search, kon Hajiiiii.
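For what it's worth, "Unknown magic byte!" almost always means the messages in the topic were produced as plain protobuf bytes, while ProtobufConverter expects Confluent's wire format: a zero "magic" byte plus a 4-byte big-endian schema id in front of the protobuf payload (hence the "id -1" in the trace, since no id could be read). A minimal Python sketch of that framing (the schema id and message bytes below are illustrative, not from the question):

```python
import struct

MAGIC_BYTE = 0  # first byte of every Confluent-serialized message


def wire_format(schema_id: int, message_bytes: bytes) -> bytes:
    """Prepend the 5-byte Confluent header: magic byte 0 + big-endian schema id."""
    return struct.pack(">bI", MAGIC_BYTE, schema_id) + message_bytes


def parse_header(payload: bytes) -> int:
    """Mimic the deserializer's check that produces 'Unknown magic byte!'."""
    if not payload or payload[0] != MAGIC_BYTE:
        raise ValueError("Unknown magic byte!")
    (schema_id,) = struct.unpack(">I", payload[1:5])
    return schema_id


# A properly framed message round-trips its schema id:
framed = wire_format(1, b"\x0a\x04Jane\x10\x1e")  # illustrative protobuf bytes
assert parse_header(framed) == 1

# Raw protobuf bytes with no header fail exactly like the stack trace:
try:
    parse_header(b"\x0a\x04Jane\x10\x1e")
except ValueError as e:
    print(e)  # Unknown magic byte!
```

So the fix is on the producer side: write to the topic with Confluent's KafkaProtobufSerializer (which adds this header and registers/looks up the schema), rather than serializing the protobuf message directly and sending the raw bytes.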
