I want to test a kafka/connect/schema registry setup that I run locally with docker-compose. Initially I set up a Connect instance with the S3 sink plugin, which writes incoming JSON messages to S3 in Avro format. I was able to send messages in the format
{"schema": {...}, "payload": {...}}
and saw that Kafka Connect parsed them and correctly sank the data to S3 as Avro. Now I want to use Schema Registry to avoid sending the schema with every message. I'm using io.confluent.connect.json.JsonSchemaConverter as both my key and value converter.
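For reference, a message in that embedded-schema envelope (the format org.apache.kafka.connect.json.JsonConverter accepts when schemas.enable=true) looks roughly like the sketch below; the field names are invented purely for illustration:
{
  "schema": {
    "type": "struct",
    "name": "my.example.Record",
    "optional": false,
    "fields": [
      {"field": "id", "type": "int64", "optional": false},
      {"field": "partitionFieldName", "type": "string", "optional": true}
    ]
  },
  "payload": {
    "id": 42,
    "partitionFieldName": "foo"
  }
}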
When I try to test it, I produce a message with the kafka-console-producer utility and get the following error:
[2024-12-12 15:00:02,140] ERROR WorkerSinkTask{id=my-s3-sink-0} Task threw an uncaught and unrecoverable exception. Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:230)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:156)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:536)
at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:513)
at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:349)
at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:250)
at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:219)
at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:204)
at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:259)
at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:237)
at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:840)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error of topic my-topic:
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:144)
at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$4(WorkerSinkTask.java:536)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:180)
at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:214)
... 14 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error deserializing JSON message for id -1
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:238)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaJsonSchemaDeserializer.java:315)
at io.confluent.connect.json.JsonSchemaConverter$Deserializer.deserialize(JsonSchemaConverter.java:193)
at io.confluent.connect.json.JsonSchemaConverter.toConnectData(JsonSchemaConverter.java:127)
... 17 more
Caused by: org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.getByteBuffer(AbstractKafkaSchemaSerDe.java:645)
at io.confluent.kafka.serializers.json.AbstractKafkaJsonSchemaDeserializer.deserialize(AbstractKafkaJsonSchemaDeserializer.java:129)
... 20 more
This is how I produce the message:
./bin/kafka-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=schemaregistry:8085 --property value.schema.id=1 --topic my-topic
> {...}
My assumption is that the producer is not producing the message correctly. Is that right? If so, how do I use kafka-console-producer to produce a message that will be processed successfully? And if the producer is not the problem, what else could be the culprit?
Here is my configuration:
docker-compose.yaml:
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.8.0
    restart: always
    environment:
      ZOOKEEPER_SERVER_ID: 1
      ZOOKEEPER_CLIENT_PORT: "2181"
      ZOOKEEPER_TICK_TIME: "2000"
      ZOOKEEPER_SERVERS: "zookeeper:22888:23888"
    ports:
      - "2181:2181"
  kafka:
    image: confluentinc/cp-kafka:7.8.0
    ports:
      - 9092:9092
    depends_on:
      - zookeeper
    environment:
      KAFKA_ZOOKEEPER_CONNECT: "zookeeper:2181"
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:9092,PLAINTEXT_HOST://localhost:29092
      KAFKA_BROKER_ID: 1
      KAFKA_BROKER_RACK: "r1"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_DELETE_TOPIC_ENABLE: "true"
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_SCHEMA_REGISTRY_URL: "schemaregistry:8085"
  schemaregistry:
    image: confluentinc/cp-schema-registry:7.8.0
    restart: always
    depends_on:
      - zookeeper
      - kafka
    environment:
      SCHEMA_REGISTRY_KAFKASTORE_CONNECTION_URL: "zookeeper:2181"
      SCHEMA_REGISTRY_HOST_NAME: schemaregistry
      SCHEMA_REGISTRY_LISTENERS: "http://0.0.0.0:8085"
      KAFKASTORE_BOOTSTRAP_SERVERS: "PLAINTEXT://kafka:9092"
      SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: "kafka:9092"
    ports:
      - 8085:8085
  connect:
    image: confluentinc/cp-kafka-connect:7.7.2
    container_name: connect
    hostname: connect
    depends_on:
      - kafka
      - schemaregistry
    ports:
      - "8083:8083"
    environment:
      CONNECT_BOOTSTRAP_SERVERS: "kafka:9092"
      CONNECT_GROUP_ID: connect-cluster
      CONNECT_CONFIG_STORAGE_TOPIC: _connect-configs
      CONNECT_OFFSET_STORAGE_TOPIC: _connect-offsets
      CONNECT_STATUS_STORAGE_TOPIC: _connect-status
      # CONNECT_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_INTERNAL_KEY_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      # CONNECT_INTERNAL_VALUE_CONVERTER: org.apache.kafka.connect.json.JsonConverter
      CONNECT_KEY_CONVERTER: io.confluent.connect.json.JsonSchemaConverter
      CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: "http://schemaregistry:8085"
      CONNECT_VALUE_CONVERTER: io.confluent.connect.json.JsonSchemaConverter
      CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: "http://schemaregistry:8085"
      CONNECT_INTERNAL_KEY_CONVERTER: io.confluent.connect.json.JsonSchemaConverter
      CONNECT_INTERNAL_VALUE_CONVERTER: io.confluent.connect.json.JsonSchemaConverter
      CONNECT_PLUGIN_PATH: /usr/share/java,/usr/share/confluent-hub-components
      CONNECT_REPLICATION_FACTOR: 1
      CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
      CONNECT_REST_ADVERTISED_HOST_NAME: "localhost"
    volumes:
      - ./connect/s3/connectors/confluentinc-kafka-connect-s3-10.5.19:/usr/share/confluent-hub-components
s3-connect.properties:
{
  "name": "my-s3-sink",
  "config": {
    "connector.class": "io.confluent.connect.s3.S3SinkConnector",
    "tasks.max": "1",
    "topics": "my-topic",
    "topics.dir": "my-dir",
    "behavior.on.null.values": "ignore",
    "s3.region": "my-region",
    "s3.bucket.name": "my-bucket",
    "s3.part.size": "5242880",
    "aws.access.key.id": "bla-bla-bla",
    "aws.secret.access.key": "bla-bla-bla",
    "storage.class": "io.confluent.connect.s3.storage.S3Storage",
    "flush.size": "5",
    "format.class": "io.confluent.connect.s3.format.avro.AvroFormat",
    "schema.compatibility": "NONE",
    "partitioner.class": "io.confluent.connect.storage.partitioner.FieldPartitioner",
    "partition.field.name": "partitionFieldName"
  }
}
The error message
org.apache.kafka.common.errors.SerializationException: Unknown magic byte!
means that the serialized message does not start with the header the Confluent schema (de)serializers expect: a single magic byte followed by a 4-byte schema id. You have to produce with io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer so that the schema id gets embedded into your messages:
./bin/kafka-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=schemaregistry:8085 --property value.schema.id=1 --topic my-topic --property value.serializer=io.confluent.kafka.serializers.json.KafkaJsonSchemaSerializer
> {...}
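Alternatively, Confluent Platform also ships a dedicated wrapper, kafka-json-schema-console-producer, which wires that serializer in for you. A sketch, assuming the tool is on your path (it is bundled with the Schema Registry tooling, e.g. inside the schemaregistry container) and that your version accepts value.schema.id for referencing an already-registered schema; otherwise pass the schema inline via value.schema, and adjust the bootstrap server and registry URL for wherever you run the command:
./bin/kafka-json-schema-console-producer --bootstrap-server localhost:9092 --property schema.registry.url=http://schemaregistry:8085 --property value.schema.id=1 --topic my-topic
> {...}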
You also need to add the kafka-json-schema-serializer jar to the kafka libs directory.
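Independent of the producer, it is worth confirming that the registry is reachable and that schema id 1 actually exists before producing. A quick check against the Schema Registry REST API (assuming port 8085 is mapped to the host as in the compose file above):
curl http://localhost:8085/subjects
curl http://localhost:8085/schemas/ids/1
The second call should return the JSON schema you expect the connector to resolve; if it returns a 404, the id you pass to the producer does not match anything registered.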