我有一个 Java 21 应用程序,它使用 Apache Flink(版本 1.20.0)依赖项来过滤 kafka 流。
当我尝试执行我的程序时,出现以下错误:
[flink-pekko.actor.default-dispatcher-5] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: Wikimedia Recent Change Events -> Filter -> Sink: Writer -> Sink: Committer (12/20) (b06737fae3a29dd6d5031f24d0e01683_cbc357ccb763df2852fee8c4fc7d55f2_11_0) switched from INITIALIZING to FAILED on 114b44f3-53af-41a6-bc97-a040192436ae @ localhost (dataPort=-1).
java.lang.NoClassDefFoundError: org/apache/flink/shaded/guava30/com/google/common/io/Closer
at org.apache.flink.connector.kafka.sink.KafkaWriter.<init>(KafkaWriter.java:109)
at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:111)
at org.apache.flink.connector.kafka.sink.KafkaSink.createWriter(KafkaSink.java:57)
at org.apache.flink.api.connector.sink2.Sink.createWriter(Sink.java:78)
at org.apache.flink.streaming.runtime.operators.sink.StatefulSinkWriterStateHandler.createWriter(StatefulSinkWriterStateHandler.java:122)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.initializeState(SinkWriterOperator.java:148)
at org.apache.flink.streaming.api.operators.StreamOperatorStateHandler.initializeOperatorState(StreamOperatorStateHandler.java:147)
at org.apache.flink.streaming.api.operators.AbstractStreamOperator.initializeState(AbstractStreamOperator.java:294)
at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:106)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreStateAndGates(StreamTask.java:858)
at org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$restoreInternal$5(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:812)
at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:771)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:970)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:939)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:763)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:575)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.ClassNotFoundException: org.apache.flink.shaded.guava30.com.google.common.io.Closer
... 19 more
我尝试将以下依赖项添加到我的 pom.xml 中作为解决此问题的方法:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-17.0</version>
<scope>compile</scope>
</dependency>
但我仍然遇到同样的错误。
我能够通过使用兼容版本的 flink for flink 依赖项和 flink-connector-kafka 来修复它。为此,不需要使用 flink-shaded-guava。
这是我之前的依赖项:
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.class>org.taulin.WikimediaStreamFilterApp</main.class>
<flink.version>1.20.0</flink.version>
<flink.connector.version>1.17.2</flink.connector.version>
<avro.version>1.11.1</avro.version>
<slf4j.version>2.0.16</slf4j.version>
<guice.version>7.0.0</guice.version>
<lombok.version>1.18.34</lombok.version>
</properties>
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.connector.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-guava</artifactId>
<version>31.1-jre-17.0</version>
<scope>compile</scope>
</dependency>
这些是我现在的依赖项:
<properties>
<maven.compiler.source>21</maven.compiler.source>
<maven.compiler.target>21</maven.compiler.target>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<main.class>org.taulin.WikimediaStreamFilterApp</main.class>
<flink.version>1.17.2</flink.version>
<avro.version>1.11.1</avro.version>
<slf4j.version>2.0.16</slf4j.version>
<guice.version>7.0.0</guice.version>
<lombok.version>1.18.34</lombok.version>
</properties>
<dependencies>
<!-- Flink -->
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro-confluent-registry</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-avro</artifactId>
<version>${flink.version}</version>
</dependency>