I'm updating an application from Flink 1.15.2 to 1.20 and have hit a roadblock along the way. I have a simple unit test that creates a data stream of Long values, applies a map function that adds 1 to each value, and collects the results in a list of Longs. I copied this unit test from the Flink documentation and tested it incrementally: first on 1.15.2, then 1.16.2, and so on, and found that it breaks on Flink 1.18.1.
I build with sbt using Scala 2.12 and run the tests with the "sbt test" command.
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.{StreamExecutionEnvironment, createTypeInformation}
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util
import java.util.Collections

class MappingTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(2)
    .setNumberTaskManagers(1)
    .build)

  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }

  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // configure your test environment
    env.setParallelism(2)

    // values are collected in a static variable
    CollectSink.values.clear()

    // create a stream of custom elements and apply transformations
    env.fromElements(1, 21, 22)
      .map(x => x + 1L)
      .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results
    CollectSink.values should contain allOf(2, 22, 23)
  }
}

class CollectSink extends SinkFunction[Long] {
  override def invoke(value: Long, context: SinkFunction.Context): Unit = {
    CollectSink.values.add(value)
  }
}

object CollectSink {
  // must be static
  val values: util.List[Long] = Collections.synchronizedList(new util.ArrayList())
}
The error message I get:
should incrementValues *** FAILED ***
[info] org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
[info] at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
[info] at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
[info] at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646)
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
[info] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
[info] at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268)
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
[info] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147)
[info] ...
[info] Cause: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
[info] at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863)
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841)
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510)
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] ...
[info] Cause: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
[info] at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315)
[info] at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320)
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] ...
[info] Cause: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
[info] at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
[info] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
[info] at java.base/java.lang.Thread.run(Thread.java:840)
[info] ...
[info] Cause: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig
[info] at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641)
[info] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188)
[info] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525)
[info] at java.base/java.lang.Class.forName0(Native Method)
[info] at java.base/java.lang.Class.forName(Class.java:467)
[info] at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
[info] at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2034)
[info] at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1898)
[info] at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2224)
[info] at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)
From the research I've done, ClassNotFoundException errors are usually related to version mismatches. I have simplified the dependencies in my build.sbt file:
val flinkVersion = "1.18.1"

val flinkDependencies = Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
  "org.scalatest" %% "scalatest" % "3.2.19" % Test,
  "com.typesafe" % "config" % "1.4.3"
)
Also, while going through the release notes, I saw that 1.18 is tested with Java 17. I tried running the tests in both Java 11 and Java 17 environments and got the same error in both.
Additional information: this is the Apache tutorial I'm following: https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/testing/
From the mail-archive thread "Getting java.lang.ClassNotFoundException in tests (Flink 1.18.0)", December 3, 2023: forking in sbt solves the problem (Test / fork := true).

So adding that line to your build.sbt configuration file fixes the issue. It also fits your stack trace: the missing class is resolved through the JVM's AppClassLoader, which does not see your project's dependencies when sbt runs tests inside its own JVM; with forking, the tests run in a fresh JVM whose application classpath contains Flink. I was able to reproduce the problem locally with the code you provided, and once I added that line the test passed. It also works with 1.19.1 and 1.20.0.
build.sbt
ThisBuild / scalaVersion := "2.12.20"

val flinkVersion = "1.20.0"

val flinkDependencies: Seq[ModuleID] = Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
  "org.scalatest" %% "scalatest" % "3.2.19" % Test,
  "com.typesafe" % "config" % "1.4.3"
)

lazy val root = (project in file("."))
  .settings(
    name := "stackoverflow-poc-apache-flink",
    libraryDependencies ++= flinkDependencies,
    Test / fork := true // without this line, the test fails with the issue you reported
  )
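
A side note on the Java 17 point from your question: sbt only applies javaOptions to forked JVMs, so with Test / fork := true in place you can also hand the forked test JVM the module-opens flags that Flink's Java 17 support relies on, should you run into reflective-access errors. A minimal sketch, assuming the --add-opens list from Flink's default configuration applies to your setup:

// Hedged sketch: only relevant when the forked test JVM runs on Java 17.
// The flags below mirror a subset of the --add-opens entries Flink ships
// in its default configuration; extend the list as your job requires.
Test / javaOptions ++= Seq(
  "--add-opens=java.base/java.util=ALL-UNNAMED",
  "--add-opens=java.base/java.lang=ALL-UNNAMED"
)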
MappingTest.scala
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util.Collections

class MappingTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(2)
    .setNumberTaskManagers(1)
    .build)

  before {
    flinkCluster.before()
  }

  after {
    flinkCluster.after()
  }

  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // configure your test environment
    env.setParallelism(2)

    // values are collected in a static variable
    CollectSink.values.clear()

    // create a stream of custom elements and apply transformations
    env.fromElements(1, 21, 22)
      .map(x => x + 1L)
      .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results
    CollectSink.values should contain allOf(2, 22, 23)
  }
}

class CollectSink extends SinkFunction[Long] {
  override def invoke(value: Long, context: SinkFunction.Context): Unit = {
    CollectSink.values.add(value)
  }
}

object CollectSink {
  // must be static
  val values: java.util.List[Long] = Collections.synchronizedList(new java.util.ArrayList())
}
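
One more side note: since the test only needs to look at the emitted values, you could drop the static CollectSink entirely and use DataStream.executeAndCollect, which has been part of the DataStream API since Flink 1.12 and runs the job while handing the results back directly. A minimal sketch of the assertion, assuming the same env as in the test above:

import scala.collection.JavaConverters._

// Run the pipeline and collect its output directly instead of routing
// the values through a static sink.
val iterator = env.fromElements(1, 21, 22)
  .map(x => x + 1L)
  .executeAndCollect()
val results = try iterator.asScala.toList finally iterator.close()
results should contain allOf(2L, 22L, 23L)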