I have a project that exposes a GUI to clients, where they can use drag-and-drop to configure Flink sources and sinks. The GUI component supports multiple types of sources and sinks, such as Kafka, files, and databases, as well as multiple types of serializers/deserializers. We also have some predefined mapping functions on the GUI for which users can provide a JSON schema of their objects. On the backend we generate Java code from the GUI configuration, package it into a JAR, and deploy it to Flink.
This works fine on Unix machines. However, when writing integration tests with MiniClusterWithClientResource, I am running into problems related to the Java code that is generated dynamically from the GUI configuration.
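For context, the MiniClusterWithClientResource is set up in the usual JUnit rule style from the Flink testing utilities; a minimal sketch (the class name and cluster sizes below are illustrative, not my real values):

import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.junit.ClassRule;

public class GeneratedJobIT {

    // Standard Flink MiniCluster test rule; task manager and slot counts are illustrative.
    @ClassRule
    public static final MiniClusterWithClientResource FLINK_CLUSTER =
            new MiniClusterWithClientResource(
                    new MiniClusterResourceConfiguration.Builder()
                            .setNumberTaskManagers(1)
                            .setNumberSlotsPerTaskManager(2)
                            .build());
}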
I tried registering my classes and JAR with a URLClassLoader and setting it as the context class loader:

URLClassLoader classLoader = URLClassLoader.newInstance(new URL[] { new File(jarName).toURI().toURL() });
Thread.currentThread().setContextClassLoader(classLoader);
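Written out as a self-contained helper, the registration step is roughly the following (the helper name and the explicit parent class loader are illustrative, not the exact production code):

import java.io.File;
import java.net.URL;
import java.net.URLClassLoader;

final class GeneratedJarRegistration {

    // Wrap the dynamically generated JAR in a URLClassLoader and make it the
    // context class loader of the test thread before the job is submitted.
    static URLClassLoader register(String jarName) throws Exception {
        URL jarUrl = new File(jarName).toURI().toURL();
        URLClassLoader classLoader = URLClassLoader.newInstance(
                new URL[] { jarUrl },
                Thread.currentThread().getContextClassLoader());
        Thread.currentThread().setContextClassLoader(classLoader);
        return classLoader;
    }
}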
I also registered the path of the dynamically generated JAR under the maven-surefire-plugin configuration in the pom:

<additionalClasspathElements>
<additionalClasspathElement>${project.build.directory}</additionalClasspathElement>
</additionalClasspathElements>
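For completeness, that element sits inside the surefire plugin's <configuration> block; the relevant part of the pom looks roughly like this:

<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-surefire-plugin</artifactId>
  <configuration>
    <additionalClasspathElements>
      <additionalClasspathElement>${project.build.directory}</additionalClasspathElement>
    </additionalClasspathElements>
  </configuration>
</plugin>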
However, the MiniClusterWithClientResource class loader still cannot find my dynamically generated classes/JAR.
Note: this setup works on Windows, but not on the Unix CI machines.
I get an exception like the one below:
org.apache.flink.runtime.client.JobExecutionException: Job execution failed.
at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:144)
at org.apache.flink.runtime.minicluster.MiniClusterJobClient.lambda$getJobExecutionResult$3(MiniClusterJobClient.java:141)
at java.base/java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:642)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2075)
at org.apache.flink.runtime.rpc.akka.AkkaInvocationHandler.lambda$invokeRpc$1(AkkaInvocationHandler.java:268)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2075)
at org.apache.flink.util.concurrent.FutureUtils.doForward(FutureUtils.java:1300)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$null$1(ClassLoadingUtils.java:93)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.runWithContextClassLoader(ClassLoadingUtils.java:68)
at org.apache.flink.runtime.concurrent.akka.ClassLoadingUtils.lambda$guardCompletionWithContextClassLoader$2(ClassLoadingUtils.java:92)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture.complete(CompletableFuture.java:2075)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$1.onComplete(AkkaFutureUtils.java:47)
at akka.dispatch.OnComplete.internal(Future.scala:300)
at akka.dispatch.OnComplete.internal(Future.scala:297)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:224)
at akka.dispatch.japi$CallbackBridge.apply(Future.scala:221)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at org.apache.flink.runtime.concurrent.akka.AkkaFutureUtils$DirectExecutionContext.execute(AkkaFutureUtils.java:65)
at scala.concurrent.impl.CallbackRunnable.executeWithValue(Promise.scala:68)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.$anonfun$tryComplete$1$adapted(Promise.scala:284)
at scala.concurrent.impl.Promise$DefaultPromise.tryComplete(Promise.scala:284)
at akka.pattern.PromiseActorRef.$bang(AskSupport.scala:621)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:24)
at akka.pattern.PipeToSupport$PipeableFuture$$anonfun$pipeTo$1.applyOrElse(PipeToSupport.scala:23)
at scala.concurrent.Future.$anonfun$andThen$1(Future.scala:532)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:29)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:29)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:60)
at akka.dispatch.BatchingExecutor$AbstractBatch.processBatch(BatchingExecutor.scala:63)
at akka.dispatch.BatchingExecutor$BlockableBatch.$anonfun$run$1(BatchingExecutor.scala:100)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:12)
at scala.concurrent.BlockContext$.withBlockContext(BlockContext.scala:81)
at akka.dispatch.BatchingExecutor$BlockableBatch.run(BatchingExecutor.scala:100)
at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:49)
at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(ForkJoinExecutorConfigurator.scala:48)
at java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)
at java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)
at java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)
at java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)
at java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)
Caused by: org.apache.flink.runtime.client.JobInitializationException: Could not start the JobMaster.
at org.apache.flink.runtime.jobmaster.DefaultJobMasterServiceProcess.lambda$new$0(DefaultJobMasterServiceProcess.java:97)
at java.base/java.util.concurrent.CompletableFuture.uniWhenComplete(CompletableFuture.java:859)
at java.base/java.util.concurrent.CompletableFuture$UniWhenComplete.tryFire(CompletableFuture.java:837)
at java.base/java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:506)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1705)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: java.util.concurrent.CompletionException: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: Kafka_Collector -> Filter -> Kafka_Distributor: Writer -> Kafka_Distributor: Committer
at java.base/java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:314)
at java.base/java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:319)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1702)
... 3 more
Caused by: java.lang.RuntimeException: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: Kafka_Collector -> Filter -> Kafka_Distributor: Writer -> Kafka_Distributor: Committer
at org.apache.flink.util.ExceptionUtils.rethrow(ExceptionUtils.java:321)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:114)
at java.base/java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1700)
... 3 more
Caused by: org.apache.flink.runtime.JobException: Cannot instantiate the coordinator for operator Source: Kafka_Collector -> Filter -> Kafka_Distributor: Writer -> Kafka_Distributor: Committer
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:229)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertex(DefaultExecutionGraph.java:901)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.initializeJobVertices(DefaultExecutionGraph.java:891)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:848)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraph.attachJobGraph(DefaultExecutionGraph.java:830)
at org.apache.flink.runtime.executiongraph.DefaultExecutionGraphBuilder.buildGraph(DefaultExecutionGraphBuilder.java:203)
at org.apache.flink.runtime.scheduler.DefaultExecutionGraphFactory.createAndRestoreExecutionGraph(DefaultExecutionGraphFactory.java:156)
at org.apache.flink.runtime.scheduler.SchedulerBase.createAndRestoreExecutionGraph(SchedulerBase.java:361)
at org.apache.flink.runtime.scheduler.SchedulerBase.<init>(SchedulerBase.java:206)
at org.apache.flink.runtime.scheduler.DefaultScheduler.<init>(DefaultScheduler.java:134)
at org.apache.flink.runtime.scheduler.DefaultSchedulerFactory.createInstance(DefaultSchedulerFactory.java:152)
at org.apache.flink.runtime.jobmaster.DefaultSlotPoolServiceSchedulerFactory.createScheduler(DefaultSlotPoolServiceSchedulerFactory.java:119)
at org.apache.flink.runtime.jobmaster.JobMaster.createScheduler(JobMaster.java:369)
at org.apache.flink.runtime.jobmaster.JobMaster.<init>(JobMaster.java:346)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.internalCreateJobMasterService(DefaultJobMasterServiceFactory.java:123)
at org.apache.flink.runtime.jobmaster.factories.DefaultJobMasterServiceFactory.lambda$createJobMasterService$0(DefaultJobMasterServiceFactory.java:95)
at org.apache.flink.util.function.FunctionUtils.lambda$uncheckedSupplier$4(FunctionUtils.java:112)
... 4 more
Caused by: java.lang.ClassNotFoundException: com.Employee.AverageAge
at java.base/java.net.URLClassLoader.findClass(URLClassLoader.java:476)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:593)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClassWithoutExceptionHandling(FlinkUserCodeClassLoader.java:67)
at org.apache.flink.util.ChildFirstClassLoader.loadClassWithoutExceptionHandling(ChildFirstClassLoader.java:74)
at org.apache.flink.util.FlinkUserCodeClassLoader.loadClass(FlinkUserCodeClassLoader.java:51)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:526)
at org.apache.flink.util.FlinkUserCodeClassLoaders$SafetyNetWrapperClassLoader.loadClass(FlinkUserCodeClassLoaders.java:192)
at java.base/java.lang.Class.forName0(Native Method)
at java.base/java.lang.Class.forName(Class.java:398)
at org.apache.flink.util.InstantiationUtil$ClassLoaderObjectInputStream.resolveClass(InstantiationUtil.java:78)
at java.base/java.io.ObjectInputStream.readNonProxyDesc(ObjectInputStream.java:2025)
at java.base/java.io.ObjectInputStream.readClassDesc(ObjectInputStream.java:1892)
at java.base/java.io.ObjectInputStream.readClass(ObjectInputStream.java:1855)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1680)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2518)
at java.base/java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2412)
at java.base/java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2250)
at java.base/java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1709)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:500)
at java.base/java.io.ObjectInputStream.readObject(ObjectInputStream.java:458)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:617)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:602)
at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:589)
at org.apache.flink.util.SerializedValue.deserializeValue(SerializedValue.java:67)
at org.apache.flink.runtime.operators.coordination.OperatorCoordinatorHolder.create(OperatorCoordinatorHolder.java:488)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.createOperatorCoordinatorHolder(ExecutionJobVertex.java:286)
at org.apache.flink.runtime.executiongraph.ExecutionJobVertex.initialize(ExecutionJobVertex.java:223)
... 20 more
All configuration can be set in the Flink configuration file in the conf/ directory (see the Flink configuration file).
The configuration is parsed and evaluated when the Flink processes are started. Changes to the configuration file require restarting the relevant processes.
The out-of-the-box configuration will use your default Java installation. If you want to override which Java runtime is used, you can manually set the environment variable JAVA_HOME or the configuration key env.java.home in the Flink configuration file. Note that the configuration key env.java.home must be specified in the configuration file in a flat format (i.e. one-line key-value format).
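For example, in flat format that key is a single line in the configuration file (the path is a placeholder):

env.java.home: /usr/lib/jvm/java-11-openjdk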
You can specify a different configuration directory location by defining the FLINK_CONF_DIR environment variable. For resource providers that offer non-session deployments, you can specify per-job configurations this way: copy the conf directory from the Flink distribution and modify the settings on a per-job basis. Note that this is not supported in Docker or standalone Kubernetes deployments. In Docker-based deployments, you can use the FLINK_PROPERTIES environment variable to pass configuration values.
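For instance (paths and values are placeholders):

# Point the Flink scripts at a per-job copy of the conf directory
export FLINK_CONF_DIR=/path/to/per-job/conf

# Docker-based deployments: pass configuration values via FLINK_PROPERTIES instead
docker run --env FLINK_PROPERTIES="jobmanager.rpc.address: jobmanager" flink:latest jobmanager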
On session clusters, the provided configuration will only be used for configuring execution parameters, e.g. configuration parameters affecting the job, not the underlying cluster.