The correct way to send JSON messages with a schema via kafka-console-producer


I want to test a Kafka / Connect / Schema Registry setup that I run locally with docker-compose. Initially I configured a Connect instance with the S3 sink plugin, which writes incoming JSON messages to S3 in Avro format. I was able to send messages in the following format

{"schema": {...}, "payload": {...}}

and saw that Kafka Connect parsed them and correctly sank the data to S3 in Avro format.
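For reference, a fully expanded message in that envelope shape looks roughly like this (the field name here is illustrative, not my actual schema):

{
    "schema": {
        "type": "struct",
        "optional": false,
        "fields": [
            {"field": "partitionFieldName", "type": "string", "optional": false}
        ]
    },
    "payload": {"partitionFieldName": "some-value"}
}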

Now I want to use the Schema Registry so that the schema does not have to be sent with every message. I configured

io.confluent.connect.json.JsonSchemaConverter

as both my key and value converter. When I try to test this by producing a message with the kafka-console-producer utility, I get the following error:

[2024-12-12 15:00:02,140] ERROR WorkerSinkTask{id=my-s3-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:536)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
    at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic my-topic: 
    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)
    at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
    ... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)
    at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:193)
    at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)
    ... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
    at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:645)
    at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:129)
    ... 20 more

This is how I produce the message:

./bin/kafka-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=schemaregistry:8085 --property value.schema.id=1 --topic my-topic
> {...}

My assumption is that the producer is not producing the message correctly. Is that right? If so, how do I use

kafka-console-producer

to produce a message that will be processed successfully? And if the producer is not the problem, what else could be the culprit?

Here is my configuration:

docker-compose.yaml

services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.8.0
    restart: always
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"
      ZOOKEEPER_SERVERS: "zookeeper:22888:23888"
    ports:
      - "2181:2181"
      
  kafka:
    image: confluentinc/cp-kafka:7.8.0
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_BROKER_ID: 1
      KAFKA_BROKER_RACK: "r1"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_SCHEMA_REGISTRY_URL: "schemaregistry:8085"

  schemaregistry:
    image: confluentinc/cp-schema-registry:7.8.0
    restart: always
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
      KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka:9092"
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka:9092"
    ports:
      - 8085:8085

  connect:
    image: confluentinc/cp-kafka-connect:7.7.2
    container_name: connect
    hostname: connect
    depends_on:
      - kafka
      - schemaregistry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      # CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER: io.confluent.connect.json.JsonSchemaConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schemaregistry:8085"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.json.JsonSchemaConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schemaregistry:8085"
      CONNECT_INTERNAL_KEY_CONVERTER: io.confluent.connect.json.JsonSchemaConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: io.confluent.connect.json.JsonSchemaConverter
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
    volumes:
      - ./connect/s3/connectors/confluentinc-kafka-connect-s3-10.5.19:/usr/share/confluent-hub-components

s3-connect.properties

{
    "name": "my-s3-sink",
    "config": {
        "connector.class": "io.confluent.connect.s3.S3SinkConnector",
        "tasks.max": "1",
        "topics": "my-topic",
        "topics.dir": "my-dir",
        "behavior.on.null.values": "ignore",
        "s3.region": "my-region",
        "s3.bucket.name": "my-bucket",
        "s3.part.size": "5242880",
        "aws.access.key.id": "bla-bla-bla",
        "aws.secret.access.key": "bla-bla-bla",
        "storage.class": "io.confluent.connect.s3.storage.S3Storage",
        "flush.size": "5",
        "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
        "schema.compatibility": "NONE",
        "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
        "partition.field.name": "partitionFieldName"
    }
}
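
The connector is created through the Connect REST API, roughly like this (the path to the JSON file is illustrative):

curl -X POST -H "Content-Type: application/json" \
     --data @s3-connect.properties \
     http://localhost:8083/connectors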
Tags: apache-kafka apache-kafka-connect s3-kafka-connector
1 Answer

From the error message

org.apache.kafka.common.errors.SerializationException: Unknown magic byte
it follows that the serialized message is missing the prefix that the Confluent Schema Registry (de)serializers expect: a magic byte followed by the 4-byte schema id. You must configure

io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer

as the producer's serializer so that the schema id is embedded in each message.
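
Before producing, it is worth confirming that the schema id you reference is actually registered; for example (assuming the registry port is published to the host, as in the compose file above):

curl http://localhost:8085/schemas/ids/1

Then produce with the serializer set explicitly: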

./bin/kafka-console-producer --bootstrap-server localhost:9092 \
    --topic my-topic \
    --property schema.registry.url=schemaregistry:8085 \
    --property value.schema.id=1 \
    --property value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
> {...}
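
Alternatively, if the Confluent Platform command line tools are available (they ship alongside the Schema Registry packages), the dedicated JSON Schema console producer takes care of the wire format for you. A sketch with an illustrative inline schema; host names and ports depend on where you run it:

kafka-json-schema-console-producer --bootstrap-server localhost:9092 \
    --topic my-topic \
    --property schema.registry.url=http://localhost:8085 \
    --property value.schema='{"type":"object","properties":{"partitionFieldName":{"type":"string"}}}'
> {"partitionFieldName": "some-value"}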

You also need to add the

kafka-json-schema-serializer

jar to the Kafka libs directory.
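
That jar ships with Confluent Platform and is also published to Confluent's Maven repository as io.confluent:kafka-json-schema-serializer. One way to make it visible to the console producer, with purely illustrative paths:

cp /usr/share/java/schema-registry/kafka-json-schema-serializer-*.jar /path/to/kafka/libs/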
