背景: 我想使用 Strimzi 实现有效负载值的 Avro 序列化。尽管将所需的插件加载到 Strimzi KafkaConnect YAML,但在通过 API 部署连接器时,我遇到了未找到类的错误。有人设法解决这个问题吗?
相关连接器配置规格:
"value.converter":"io.confluent.connect.avro.AvroConverter",
"value.converter.schema.registry.url": "<my url>"
KafkaConnect 容器配置
build:
output:
type: docker
image: <my image> #
pushSecret: <my secret>
plugins:
- name: mongodb-connector
artifacts:
- type: jar
url: https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.5.1/mongo-kafka-connect-1.5.1-all.jar # https://repo1.maven.org/maven2/org/mongodb/kafka/mongo-kafka-connect/1.5.0/mongo-kafka-connect-1.5.0-all.jar
- type: jar
url: https://repo1.maven.org/maven2/org/apache/avro/avro/1.10.2/avro-1.10.2.jar
- type: jar
url: https://packages.confluent.io/maven/io/confluent/kafka-connect-avro-converter/7.7.1/kafka-connect-avro-converter-7.7.1.jar
- type: jar
url: https://packages.confluent.io/maven/io/confluent/kafka-schema-serializer/7.7.1/kafka-schema-serializer-7.7.1.jar
- type: jar
url: https://packages.confluent.io/maven/io/confluent/kafka-schema-registry-client/7.7.1/kafka-schema-registry-client-7.7.1.jar
部署连接器时KafkaConnect错误日志:
无法启动任务(org.apache.kafka.connect.runtime.Worker)[StartAndStopExecutor-connect-my-connect-cluster-connect-0.my-connect-cluster-connect.kafka-dev.svc:8083-6 ] java.lang.NoClassDefFoundError:io/confluence/kafka/serializers/AbstractKafkaAvroSerializer 在 java.base/java.lang.ClassLoader.defineClass1(本机方法) 在 java.base/java.lang.ClassLoader.defineClass(ClassLoader.java:1017) 在 java.base/java.security.SecureClassLoader.defineClass(SecureClassLoader.java:150) 在 java.base/java.net.URLClassLoader.defineClass(URLClassLoader.java:524) 在 java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:427) 在 java.base/java.net.URLClassLoader$1.run(URLClassLoader.java:421) 在 java.base/java.security.AccessController.doPrivileged(AccessController.java:712) 在 java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:420) 在org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:116) 在 java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525) 在 io.confluence.connect.avro.AvroConverter.configure(AvroConverter.java:80) 在org.apache.kafka.connect.runtime.isolation.Plugins.newConverter(Plugins.java:395) 在 org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:634) 在 org.apache.kafka.connect.runtime.Worker.startSourceTask(Worker.java:560) 在 org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:2001) 在 org.apache.kafka.connect.runtime.distributed.DistributedHerder.lambda$getTaskStartingCallable$38(DistributedHerder.java:2018) 在 java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 在 java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 在 java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 在 java.base/java.lang.Thread.run(Thread.java:840) 引起:java.lang.ClassNotFoundException:io.confluence.kafka.serializers.AbstractKafkaAvroSerializer 在 java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:445) 在 java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:592) 在org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:124) 在 java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525) ...还有20个
您需要 kafka-avro-serializer 而不是 kafka-schema-serializer
您还应该使用 Confluence hub cli 来下载 Avro 转换器,而不是使用 Mavencentral,因为您的方法不能保证传递依赖
https://mvnrepository.com/artifact/io.confluence/kafka-avro-serializer/7.7.1
理想情况下,您还可以预先创建容器映像。