'org.apache.flink.formats.avro.AvroDeserializationSchema org.apache.flink.formats.avro.AvroDeserializationSchema.forGeneric(org.apache.avro.Schema)'


Hey, I'm running into the same problem here, using Avro and the Schema Registry to have Flink read data from Kafka. I can see activity in the schema-registry logs when Flink tries to read from the server, but I keep getting the same error on the Flink UI's Submit Job tab:

Server response message:

org.apache.flink.runtime.rest.handler.RestHandlerException: Could not execute application.
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$1(JarRunHandler.java:114)
    at java.base/java.util.concurrent.CompletableFuture.uniHandle(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$UniHandle.tryFire(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.postComplete(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: java.util.concurrent.CompletionException: java.lang.NoSuchMethodError: 'org.apache.flink.formats.avro.AvroDeserializationSchema org.apache.flink.formats.avro.AvroDeserializationSchema.forGeneric(org.apache.avro.Schema)'
    at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(Unknown Source)
    at java.base/java.util.concurrent.CompletableFuture.completeThrowable(Unknown Source)
    ... 2 more
Caused by: java.lang.NoSuchMethodError: 'org.apache.flink.formats.avro.AvroDeserializationSchema org.apache.flink.formats.avro.AvroDeserializationSchema.forGeneric(org.apache.avro.Schema)'
    at com.example.DataStreamJob.main(DataStreamJob.java:50)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(Unknown Source)
    at java.base/java.lang.reflect.Method.invoke(Unknown Source)
    at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:356)
    at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:223)
    at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:113)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.tryExecuteJobs(DetachedApplicationRunner.java:84)
    at org.apache.flink.client.deployment.application.DetachedApplicationRunner.run(DetachedApplicationRunner.java:70)
    at org.apache.flink.runtime.webmonitor.handlers.JarRunHandler.lambda$handleRequest$0(JarRunHandler.java:108)
    ... 2 more

Here is the JobManager log:

2025-01-05 21:13:46,934 INFO  org.apache.flink.runtime.resourcemanager.slotmanager.FineGrainedSlotManager [] - Registering task executor 10.244.0.5:6122-71d77a under 23ae9ba7acbe7a3d9890b9c258c3098c at the slot manager.
2025-01-05 21:20:21,085 WARN  org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Configuring the job submission via query parameters is deprecated. Please migrate to submitting a JSON request instead.
2025-01-05 21:20:21,484 INFO  org.apache.flink.client.ClientUtils                          [] - Starting program (detached: true)
2025-01-05 21:20:21,798 WARN  org.apache.flink.configuration.Configuration                 [] - Config uses deprecated configuration key 'state.checkpoints.dir' instead of proper key 'execution.checkpointing.dir'
2025-01-05 21:20:21,810 WARN  org.apache.flink.configuration.Configuration                 [] - Config uses deprecated configuration key 'state.backend' instead of proper key 'state.backend.type'
2025-01-05 21:20:23,542 ERROR org.apache.flink.runtime.webmonitor.handlers.JarRunHandler   [] - Exception occurred in REST handler: Could not execute application.

A log snippet from the Schema Registry:

[2025-01-06 02:18:40,157] INFO 192.168.31.82 - - [05/Jan/2025:20:48:40 +0000] "GET /subjects/rajattest-value/versions/latest HTTP/1.1" 200 259 "-" "Java/11.0.24" 95 (io.confluent.rest-utils.requests:62)
[2025-01-06 02:23:30,953] INFO 192.168.31.82 - - [05/Jan/2025:20:53:30 +0000] "GET /subjects/rajattest-value/versions/latest HTTP/1.1" 200 259 "-" "Java/11.0.24" 175 (io.confluent.rest-utils.requests:62)
[2025-01-06 02:23:35,526] INFO 192.168.31.82 - - [05/Jan/2025:20:53:35 +0000] "GET /subjects/rajattest-value/versions/latest HTTP/1.1" 200 259 "-" "Java/11.0.24" 19 (io.confluent.rest-utils.requests:62)

What can I do? Here is my pom.xml (I am using Flink version 1.20.0):

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>com.example</groupId>
    <artifactId>flink-kafka-avro</artifactId>
    <version>1.0-SNAPSHOT</version>
    <packaging>jar</packaging>

    <name>Flink Quickstart Job</name>

    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <flink.version>1.20.0</flink.version>
        <target.java.version>1.8</target.java.version>
        <scala.binary.version>2.12</scala.binary.version>
        <maven.compiler.source>${target.java.version}</maven.compiler.source>
        <maven.compiler.target>${target.java.version}</maven.compiler.target>
        <log4j.version>2.17.1</log4j.version>
        <kafka.version>3.9.0</kafka.version>
        <confluent.version>7.8.0</confluent.version>

    </properties>

    <repositories>
        <repository>
            <id>apache.snapshots</id>
            <name>Apache Development Snapshot Repository</name>
            <url>https://repository.apache.org/content/repositories/snapshots/</url>
            <releases>
                <enabled>false</enabled>
            </releases>
            <snapshots>
                <enabled>true</enabled>
            </snapshots>
        </repository>
        <repository>
            <id>confluent</id>
            <url>https://packages.confluent.io/maven/</url>
        </repository>
    </repositories>

    <dependencies>
        <!-- Apache Flink dependencies -->
        <!-- These dependencies are provided, because they should not be packaged into the JAR file. -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-streaming-java</artifactId>
            <version>${flink.version}</version>
        </dependency>

        <!-- Add connector dependencies here. They must be in the default scope (compile). -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-kafka</artifactId>
            <version>3.3.0-1.20</version>
        </dependency>

        <!-- Add logging framework, to produce console output when running in the IDE. -->
        <!-- These dependencies are excluded from the application JAR by default. -->
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-slf4j-impl</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-api</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.logging.log4j</groupId>
            <artifactId>log4j-core</artifactId>
            <version>${log4j.version}</version>
            <scope>runtime</scope>
        </dependency>

        <!-- flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-runtime</artifactId>
            <version>${flink.version}</version>
            <scope>compile</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-connector-base</artifactId>
            <version>${flink.version}</version>
            <scope>provided</scope>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>${flink.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>${kafka.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-schema-registry-client</artifactId>
            <version>${confluent.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-avro</artifactId>
            <version>${flink.version}</version>
        </dependency>
    </dependencies>
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java</artifactId>
                <version>${flink.version}</version>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>

            <!-- Java Compiler -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.8.1</version>
                <configuration>
                    <source>${target.java.version}</source>
                    <target>${target.java.version}</target>
                </configuration>
            </plugin>

            <!-- Maven Shade Plugin to create a fat jar -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.2.4</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <createDependencyReducedPom>false</createDependencyReducedPom>
                        </configuration>
                    </execution>
                </executions>
            </plugin>

        </plugins>

        <pluginManagement>
            <plugins>

                <!-- Lifecycle Mapping Plugin for Eclipse -->
                <plugin>
                    <groupId>org.eclipse.m2e</groupId>
                    <artifactId>lifecycle-mapping</artifactId>
                    <version>1.0.0</version>
                    <configuration>
                        <lifecycleMappingMetadata>
                            <pluginExecutions>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-shade-plugin</artifactId>
                                        <versionRange>[3.1.1,)</versionRange>
                                        <goals>
                                            <goal>shade</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                                <pluginExecution>
                                    <pluginExecutionFilter>
                                        <groupId>org.apache.maven.plugins</groupId>
                                        <artifactId>maven-compiler-plugin</artifactId>
                                        <versionRange>[3.1,)</versionRange>
                                        <goals>
                                            <goal>testCompile</goal>
                                            <goal>compile</goal>
                                        </goals>
                                    </pluginExecutionFilter>
                                    <action>
                                        <ignore/>
                                    </action>
                                </pluginExecution>
                            </pluginExecutions>
                        </lifecycleMappingMetadata>
                    </configuration>
                </plugin>

            </plugins>
        </pluginManagement>
    </build>
</project>

My DataStreamJob.java:

package com.example;

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.connector.kafka.source.enumerator.initializer.OffsetsInitializer;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.flink.formats.avro.AvroDeserializationSchema;
import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;

import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Properties;

public class DataStreamJob {
    public static void main(String[] args) throws Exception {
        // Set up the execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Load properties
        Properties properties = new Properties();
        properties.load(Files.newInputStream(Paths.get("/opt/flink/consumer.properties")));

        String bootstrapServers = properties.getProperty("bootstrap.servers");
        String schemaRegistryUrl = properties.getProperty("schema.registry.url");
        String topicName = properties.getProperty("topic.name");
        String groupId = properties.getProperty("group.id");

        // Initialize Schema Registry Client
        CachedSchemaRegistryClient schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 10);
        Schema avroSchema = null;
        try {
            String subject = topicName + "-value"; // Adjust subject naming as per your setup
            avroSchema = new Schema.Parser().parse(schemaRegistryClient.getLatestSchemaMetadata(subject).getSchema());
        } catch (Exception e) {
            throw new RuntimeException("Failed to fetch schema from Schema Registry. Ensure the subject name and registry URL are correct.", e);
        }

        // Set up the Kafka source
        KafkaSource<GenericRecord> source = KafkaSource.<GenericRecord>builder()
                .setBootstrapServers(bootstrapServers)
                .setTopics(topicName)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.latest())
                .setValueOnlyDeserializer(AvroDeserializationSchema.forGeneric(avroSchema))
                .build();

        // Add the source to the environment
        DataStream<GenericRecord> stream = env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");

        // Process the stream (example transformation)
        stream.map(GenericRecord::toString).print();

        // Execute the Flink job
        env.execute("Flink Kafka Avro Consumer");
    }
}

I am using Flink version 1.20.0, and I have set up my pom.xml with all the updated dependencies and everything needed to connect to Kafka and the Schema Registry, but I still get this error. What could be the cause?

Please help.

apache-kafka apache-flink avro confluent-schema-registry
1 Answer

A java.lang.NoSuchMethodError is usually caused by a version mismatch.

flink-connector-kafka v3.3.0 was built against:

<kafka.version>3.4.0</kafka.version>
<confluent.version>7.4.4</confluent.version>

I suspect the problem lies there (but I'm only guessing).
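
If that is the cause, a minimal sketch of a fix (assuming you want to match the versions the connector was built against, rather than the 3.9.0/7.8.0 you currently declare) would be to align the properties in your pom.xml:

    <properties>
        <!-- hypothetical downgrade to the versions that
             flink-connector-kafka 3.3.0-1.20 was built against -->
        <kafka.version>3.4.0</kafka.version>
        <confluent.version>7.4.4</confluent.version>
    </properties>

You can check which versions actually end up on the classpath with mvn dependency:tree (for example, mvn dependency:tree -Dincludes=org.apache.kafka,io.confluent,org.apache.avro). It is also worth checking that the Avro and flink-avro classes bundled into your shaded jar do not clash with a flink-avro jar already sitting in the cluster's lib/ directory, since a NoSuchMethodError at job-run time typically means the class was resolved from a different jar than the one you compiled against.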
