Flink 升级到 1.18 时单元测试失败

问题描述 投票:0回答:1

我正在将应用程序从 Flink 1.15.2 升级到 1.20,但在此过程中遇到了障碍。我有一个简单的单元测试:它创建一个数据流(长整型值),应用映射函数将每个值加 1,并把结果收集到一个长整型列表中。这个单元测试是我从 Flink 文档中复制的,我尝试逐个版本地进行增量测试:最初是 1.15.2,接下来是 1.16.2,依此类推,结果发现它在 Flink 1.18.1 中出现了问题。

我通过 sbt 使用 Scala 2.12 进行构建,并使用 “sbt test” 命令运行测试。

/**
 * Runs a small "add one" pipeline on an embedded Flink mini cluster and checks the
 * values that end up in the shared [[CollectSink]] list.
 */
class MappingTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  // Embedded cluster used by the tests in this class: one task manager with two slots.
  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(2)
    .setNumberTaskManagers(1)
    .build)

  // Invoke the cluster resource's own setup hook before every test ...
  before {
    flinkCluster.before()
  }

  // ... and its teardown hook after every test.
  after {
    flinkCluster.after()
  }

  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // configure your test environment
    env.setParallelism(2)

    // values are collected in a static variable, so reset it before running the job
    CollectSink.values.clear()

    // Long literals are used from source to assertion: the sink is a SinkFunction[Long],
    // so the element type stays Long throughout instead of being widened from Int in the map.
    env.fromElements(1L, 21L, 22L)
      .map(_ + 1L)
      .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results; expectations are Long so they have the same type as the collected
    // elements and do not depend on cross-type numeric equality.
    CollectSink.values should contain allOf(2L, 22L, 23L)
  }
}

/** Sink that appends every record it receives to the list held by the `CollectSink` companion object. */
class CollectSink extends SinkFunction[Long] {
  override def invoke(value: Long, context: SinkFunction.Context): Unit = {
    val collected = CollectSink.values
    // `add` returns a Boolean; it is deliberately discarded because `invoke` returns Unit.
    collected.add(value)
    ()
  }
}

object CollectSink {
  // Must be static: kept on the companion object so there is a single list rather than one per
  // sink instance. The list is wrapped with Collections.synchronizedList.
  // NOTE(review): presumably needed because several parallel sink instances write to it - confirm.
  val values: util.List[Long] = {
    val backing = new util.ArrayList[Long]()
    Collections.synchronizedList(backing)
  }
}

我收到的错误消息:

should incrementValues *** FAILED *** 
[info] org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
[info] at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144) 
[info] at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141) 
[info] at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:646) 
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
[info] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) 
[info] at org.apache.flink.runtime.rpc.pekko.PekkoInvocationHandler.lambda$invokeRpc$1(PekkoInvocationHandler.java:268) 
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) 
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) 
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
[info] at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2147) 
[info] ... 
[info] Cause: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster. 
[info] at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97) 
[info] at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:863) 
[info] at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:841) 
[info] at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:510) 
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1773) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 
[info] at java.base/java.lang.Thread.run(Thread.java:840) 
[info] ... 
[info] Cause: java.util.concurrent.CompletionException: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig 
[info] at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:315) 
[info] at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:320) 
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1770) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 
[info] at java.base/java.lang.Thread.run(Thread.java:840) 
[info] ... 
[info] Cause: java.lang.RuntimeException: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig 
[info] at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321) 
[info] at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114) 
[info] at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1768) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) 
[info] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) 
[info] at java.base/java.lang.Thread.run(Thread.java:840) 
[info] ... 
[info] Cause: java.lang.ClassNotFoundException: org.apache.flink.api.common.ExecutionConfig 
[info] at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:641) 
[info] at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:188) 
[info] at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:525) 
[info] at java.base/java.lang.Class.forName0(Native Method) 
[info] at java.base/java.lang.Class.forName(Class.java:467) 
[info] at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78) 
[info] at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2034) 
[info] at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1898) 
[info] at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2224) 
[info] at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1733)

根据我所做的研究,ClassNotFoundException 错误通常与版本不匹配有关。我在 build.sbt 文件中简化了依赖项:

// Single version string shared by all Flink artifacts below.
val flinkVersion = "1.18.1"

// `%%` appends the Scala binary version to the artifact name; `%` uses the name as written.
// Entries ending in `% Test` are only on the test classpath.
val flinkDependencies: Seq[ModuleID] = Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
  "org.scalatest" %% "scalatest" % "3.2.19" % Test,
  "com.typesafe" % "config" % "1.4.3"
)

此外,在查看发行说明时,提到 1.18 是使用 Java 17 进行测试的。我尝试在 Java 11 和 Java 17 环境中运行测试,两者都收到相同的错误。

其他信息:这是我正在参考的 Apache 教程:https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/testing/

scala apache-flink flink-streaming
1个回答
0
投票

来自 mail-archive - 在测试中获取 java.lang.ClassNotFoundException (Flink 1.18.0) - 2023 年 12 月 3 日

在 sbt 中启用 fork(分叉)解决了问题:

(Test / fork := true)

因此,将该行添加到您的 `build.sbt` 配置文件中即可解决问题。


我能够使用您提供的代码在本地重现该问题。一旦我添加了该行,测试就成功了。这同样适用于 `1.19.1` 和 `1.20.0`。

  • build.sbt
// Scala version applied build-wide.
ThisBuild / scalaVersion := "2.12.20"

// Single version string shared by all Flink artifacts below.
val flinkVersion = "1.20.0"

// `%%` appends the Scala binary version to the artifact name; `%` uses the name as written.
val flinkDependencies: Seq[ModuleID] = Seq(
  "org.apache.flink" %% "flink-streaming-scala" % flinkVersion,
  "org.apache.flink" % "flink-test-utils" % flinkVersion % Test,
  "org.scalatest" %% "scalatest" % "3.2.19" % Test,
  "com.typesafe" % "config" % "1.4.3"
)

lazy val root = project
  .in(file("."))
  .settings(
    name := "stackoverflow-poc-apache-flink",
    libraryDependencies ++= flinkDependencies,
    // Run tests in a forked JVM; without this setting the test fails with the issue you reported.
    Test / fork := true
  )

  • MappingTest.scala
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.test.util.MiniClusterWithClientResource
import org.scalatest.BeforeAndAfter
import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers

import java.util.Collections

/**
 * Runs a small "add one" pipeline on an embedded Flink mini cluster and checks the
 * values that end up in the shared [[CollectSink]] list.
 */
class MappingTest extends AnyFlatSpec with Matchers with BeforeAndAfter {

  // Embedded cluster used by the tests in this class: one task manager with two slots.
  val flinkCluster = new MiniClusterWithClientResource(new MiniClusterResourceConfiguration.Builder()
    .setNumberSlotsPerTaskManager(2)
    .setNumberTaskManagers(1)
    .build)

  // Invoke the cluster resource's own setup hook before every test ...
  before {
    flinkCluster.before()
  }

  // ... and its teardown hook after every test.
  after {
    flinkCluster.after()
  }

  "IncrementFlatMapFunction pipeline" should "incrementValues" in {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // configure your test environment
    env.setParallelism(2)

    // values are collected in a static variable, so reset it before running the job
    CollectSink.values.clear()

    // Long literals are used from source to assertion: the sink is a SinkFunction[Long],
    // so the element type stays Long throughout instead of being widened from Int in the map.
    env.fromElements(1L, 21L, 22L)
      .map(_ + 1L)
      .addSink(new CollectSink())

    // execute
    env.execute()

    // verify your results; expectations are Long so they have the same type as the collected
    // elements and do not depend on cross-type numeric equality.
    CollectSink.values should contain allOf(2L, 22L, 23L)
  }
}

/** Sink that appends every record it receives to the list held by the `CollectSink` companion object. */
class CollectSink extends SinkFunction[Long] {
  override def invoke(value: Long, context: SinkFunction.Context): Unit = {
    val collected = CollectSink.values
    // `add` returns a Boolean; it is deliberately discarded because `invoke` returns Unit.
    collected.add(value)
    ()
  }
}

object CollectSink {
  // Must be static: kept on the companion object so there is a single list rather than one per
  // sink instance. The list is wrapped with Collections.synchronizedList.
  // NOTE(review): presumably needed because several parallel sink instances write to it - confirm.
  val values: java.util.List[Long] = {
    val backing = new java.util.ArrayList[Long]()
    Collections.synchronizedList(backing)
  }
}
© www.soinside.com 2019 - 2024. All rights reserved.