我正在尝试使用 Kafka-Connect 和 Schema-registry 将 Kafka 主题(带有原始数据!)放入 Postgres 表中!
我的 Kafka-Connect 连接器文件:
{ "name":"proto_sink", "config":{ "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector", "connection.password":"some_password", "topics":"some_topic", "value.converter.schema.registry.url":"http://localhost:8081", "key.converter.schemas.enable":"false", "auto.evolve":"true", "connection.user":"some_user", "value.converter.schemas.enable":"true", "name":"sink_proto", "auto.create":"true", "connection.url":"jdbc:postgresql://localhost:5432/some_db", "value.converter":"io.confluent.connect.protobuf.ProtobufConverter", "insert.mode":"insert", "key.converter":"org.apache.kafka.connect.storage.StringConverter" } }
我首先使用此命令将原型架构发送到架构注册表。
curl -X POST -H "Content-Type: application/vnd.schemaregistry.v1+json" --data '{"schema": "syntax = \"proto3\";\npackage nilian;\n\nmessage Person {\n string name = 1;\n int32 age = 2;\n}"}' http://localhost:8081/subjects/some_topic-value/versions
然后我尝试使用此命令将连接器配置发送到 Kafka-Connect:
curl -X POST -H "Content-Type: application/json" -d @connector_configs/json_conn.json http://localhost:8083/connectors
所以我得到这个错误:
Caused by: org.apache.kafka.connect.errors.DataException: Failed to deserialize data for topic some_topic to Protobuf:
at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:154)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$5(WorkerSinkTask.java:528)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:190)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:224)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing Protobuf message for id -1
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:228)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaProtobufDeserializer.java:292)
at io.confluent.connect.protobuf.ProtobufConverter$Deserializer.deserialize(ProtobufConverter.java:200)
at io.confluent.connect.protobuf.ProtobufConverter.toConnectData(ProtobufConverter.java:132)
... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:600)
at io.confluent.kafka.serializers.protobuf.AbstractKafkaProtobufDeserializer.deserialize(AbstractKafkaProtobufDeserializer.java:133)
... 20 more
这有什么问题吗?
我已经在我的一个项目中遇到过这个问题,通过简单的搜索。 我想如果不行就谷歌搜索一下,我们谈谈吧,我不想限制你的想法 用我们公司一位伟人的话说,搜索 kon Hajiiiii。