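Hey, I'm facing the same issue here: getting Flink to read from Kafka using Avro and the Schema Registry. In the Schema Registry logs I can see Flink trying to read from the server, but I also get the same error in the Submit Job tab of the Flink UI:
Server Response Message: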
org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114)
at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: 'org.apache.flink.formats.avro.AvroDeserializationSchema org.apache.flink.formats.avro.AvroDeserializationSchema.forGeneric(org.apache.avro.Schema)'
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
... 2 more
Caused by: java.lang.NoSuchMethodError: 'org.apache.flink.formats.avro.AvroDeserializationSchema org.apache.flink.formats.avro.AvroDeserializationSchema.forGeneric(org.apache.avro.Schema)'
at com.example.DataStreamJob.main(DataStreamJob.java:50)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
at java.base/java.lang.reflect.Method.invoke(Unknown Source)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
... 2 more
Here is the JobManager log:
2025-01-05 21:13:46,934 INFO org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Registering task executor 10.244.0.5:6122-71d77a under 23ae9ba7acbe7a3d9890b9c258c3098c at the slot manager.
2025-01-05 21:20:21,085 WARN org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2025-01-05 21:20:21,484 INFO org.apache.flink.client.ClientUtils [] - Starting program (detached: true)
2025-01-05 21:20:21,798 WARN org.apache.flink.configuration.Configuration [] - Config uses deprecated configuration key 'state.checkpoints.dir' instead of proper key 'execution.checkpointing.dir'
2025-01-05 21:20:21,810 WARN org.apache.flink.configuration.Configuration [] - Config uses deprecated configuration key 'state.backend' instead of proper key 'state.backend.type'
2025-01-05 21:20:23,542 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler [] - Exception occurred in REST handler: Could not execute application.
Log snippet from the Schema Registry:
[2025-01-06 02:18:40,157] INFO 192.168.31.82 - - [05/Jan/2025:20:48:40 +0000] "GET /subjects/rajattest-value/versions/latest HTTP/1.1" 200 259 "-" "Java/11.0.24" 95 (io.confluent.rest-utils.requests:62)
[2025-01-06 02:23:30,953] INFO 192.168.31.82 - - [05/Jan/2025:20:53:30 +0000] "GET /subjects/rajattest-value/versions/latest HTTP/1.1" 200 259 "-" "Java/11.0.24" 175 (io.confluent.rest-utils.requests:62)
[2025-01-06 02:23:35,526] INFO 192.168.31.82 - - [05/Jan/2025:20:53:35 +0000] "GET /subjects/rajattest-value/versions/latest HTTP/1.1" 200 259 "-" "Java/11.0.24" 19 (io.confluent.rest-utils.requests:62)
What can I do? This is my pom.xml (I am using Flink 1.20.0):
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.example</groupId>
<artifactId>flink-kafka-avro</artifactId>
<version>1.0-SNAPSHOT</version>
<packaging>jar</packaging>
<name>Flink Quickstart Job</name>
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<flink.version>1.20.0</flink.version>
<target.java.version>1.8</target.java.version>
<scala.binary.version>2.12</scala.binary.version>
<maven.compiler.source>${target.java.version}</maven.compiler.source>
<maven.compiler.target>${target.java.version}</maven.compiler.target>
<log4j.version>2.17.1</log4j.version>
<kafka.version>3.9.0</kafka.version>
<confluent.version>7.8.0</confluent.version>
</properties>
<repositories>
<repository>
<id>apache.snapshots</id>
<name>Apache Development Snapshot Repository</name>
<url>https://repository.apache.org/content/repositories/snapshots/</url>
<releases>
<enabled>false</enabled>
</releases>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>
<dependencies>
<!-- Apache Flink dependencies -->
<!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Add connector dependencies here. They must be in the default scope (compile). -->
<!-- Example:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.0.0-1.17</version>
</dependency>
-->
<!-- Add logging framework, to produce console output when running in the IDE. -->
<!-- These dependencies are excluded from the application JAR by default. -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>3.3.0-1.20</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-slf4j-impl</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>${log4j.version}</version>
<scope>runtime</scope>
</dependency>
<!-- flink dependencies -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-runtime</artifactId>
<version>${flink.version}</version>
<scope>compile</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-base</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>${kafka.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
<!-- <dependency>-->
<!-- <groupId>io.confluent</groupId>-->
<!-- <artifactId>kafka-schema-serializer</artifactId>-->
<!-- <version>7.8.0</version>-->
<!-- </dependency>-->
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry-client</artifactId>
<version>7.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
<dependencyManagement>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-core</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
</dependencies>
</dependencyManagement>
<build>
<plugins>
<!-- Java Compiler -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.8.1</version>
<configuration>
<source>${target.java.version}</source>
<target>${target.java.version}</target>
</configuration>
</plugin>
<!-- Maven Shade Plugin to create a fat jar -->
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>3.2.4</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
<pluginManagement>
<plugins>
<!-- Lifecycle Mapping Plugin for Eclipse -->
<plugin>
<groupId>org.eclipse.m2e</groupId>
<artifactId>lifecycle-mapping</artifactId>
<version>1.0.0</version>
<configuration>
<lifecycleMappingMetadata>
<pluginExecutions>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<versionRange>[3.1.1,)</versionRange>
<goals>
<goal>shade</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
<pluginExecution>
<pluginExecutionFilter>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<versionRange>[3.1,)</versionRange>
<goals>
<goal>testCompile</goal>
<goal>compile</goal>
</goals>
</pluginExecutionFilter>
<action>
<ignore/>
</action>
</pluginExecution>
</pluginExecutions>
</lifecycleMappingMetadata>
</configuration>
</plugin>
</plugins>
</pluginManagement>
</build>
</project>
My DataStreamJob.java:
package com.example;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaRegistryClient;
//import java.io.FileInputStream;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;
public class DataStreamJob {
public static void main(String[] args) throws Exception {
// Set up the execution environment
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
// Load properties
Properties properties = new Properties();
properties.load(Files.newInputStream(Paths.get("/opt/flink/consumer.properties")));
String bootstrapServers = properties.getProperty("bootstrap.servers");
String schemaRegistryUrl = properties.getProperty("schema.registry.url");
String topicName = properties.getProperty("topic.name");
String groupId = properties.getProperty("group.id");
// Initialize Schema Registry Client
CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 10);
Schema avroSchema = null;
try {
String subject = topicName + "-value"; // Adjust subject naming as per your setup
avroSchema = new Schema.Parser().parse(schemaRegistryClient.getLatestSchemaMetadata(subject).getSchema());
} catch (Exception e) {
throw new RuntimeException("Failed to fetch schema from Schema Registry. Ensure the subject name and registry URL are correct.", e);
}
// Set up the Kafka source
KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
.setBootstrapServers(bootstrapServers)
.setTopics(topicName)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.latest())
.setValueOnlyDeserializer(AvroDeserializationSchema.forGeneric(avroSchema))
.build();
// Add the source to the environment
DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
// Process the stream (example transformation)
stream.map(GenericRecord::toString).print();
// Execute the Flink job
env.execute("Flink Kafka Avro Consumer");
}
}
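I am using Flink 1.20.0 and have set up my pom.xml with all the updated dependencies and everything needed to connect to the Kafka Schema Registry, but I still get this error. What could be the cause?
Please help.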
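java.lang.NoSuchMethodError is usually caused by a version mismatch.
flink-connector-kafka v3.3.0 is built against:
<kafka.version>3.4.0</kafka.version>
<confluent.version>7.4.4</confluent.version>
whereas your pom.xml declares kafka.version 3.9.0 and confluent.version 7.8.0. I suspect the problem lies there (but I am only guessing).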
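One way to test the version-mismatch guess is to ask the running JVM which jar a class was actually loaded from and which overloads of a method it sees. The following is only a minimal sketch: the class name ClasspathProbe and its helper methods are made up for illustration, and it uses plain JDK reflection rather than any Flink API. It does not fix anything; it only reports what is on the classpath.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.List;

// Hypothetical diagnostic helper (not part of Flink): reports where a class
// was loaded from and which public methods with a given name it exposes.
public class ClasspathProbe {

    /** Returns the jar or directory a class was loaded from, or a note if the JVM does not report one. */
    public static String locationOf(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        if (source == null || source.getLocation() == null) {
            return "(bootstrap or unknown)";
        }
        return source.getLocation().toString();
    }

    /** Lists the full signatures of all public methods named methodName, as seen at runtime. */
    public static List<String> signaturesOf(Class<?> type, String methodName) {
        List<String> result = new ArrayList<>();
        for (Method method : type.getMethods()) {
            if (method.getName().equals(methodName)) {
                result.add(method.toString());
            }
        }
        return result;
    }

    public static void main(String[] args) throws Exception {
        // Defaults are taken from the stack trace in the question.
        String className = args.length > 0
                ? args[0]
                : "org.apache.flink.formats.avro.AvroDeserializationSchema";
        String methodName = args.length > 1 ? args[1] : "forGeneric";

        // Load without initializing, using the same class loader the calling code would use.
        Class<?> type = Class.forName(
                className, false, Thread.currentThread().getContextClassLoader());

        System.out.println(className + " loaded from " + locationOf(type));
        for (String signature : signaturesOf(type, methodName)) {
            System.out.println("  " + signature);
        }
    }
}
```

The output is only meaningful if it runs with the same classpath as the failing job, for example by calling the two helpers at the top of DataStreamJob.main() before the KafkaSource is built. If the reported jar is not the flink-avro version you expect, or the printed forGeneric signature differs from the one in the NoSuchMethodError message, that would point to conflicting copies of the library.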