I am trying to read from Kafka using KafkaIO on the Flink Runner with Beam version 2.45.0, and I am seeing the same problem shown below:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: No translator known for org.apache.beam.runners.core.construction.SplittableParDo$PrimitiveUnboundedRead
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:372)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:222)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:114)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:841)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:240)
at org.apache.flink.client.cli.CliFrontend.parseAndRun(CliFrontend.java:1085)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:1163)
at org.apache.flink.runtime.security.contexts.NoOpSecurityContext.runSecured(NoOpSecurityContext.java:28)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:1163)
Caused by: java.lang.IllegalStateException: No translator known for org.apache.beam.runners.core.construction.SplittableParDo$PrimitiveUnboundedRead
at org.apache.beam.runners.core.construction.PTransformTranslation.urnForTransform(PTransformTranslation.java:283)
at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.visitPrimitiveTransform(FlinkStreamingPipelineTranslator.java:135)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:593)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.visit(TransformHierarchy.java:585)
at org.apache.beam.sdk.runners.TransformHierarchy$Node.access$500(TransformHierarchy.java:240)
at org.apache.beam.sdk.runners.TransformHierarchy.visit(TransformHierarchy.java:214)
at org.apache.beam.sdk.Pipeline.traverseTopologically(Pipeline.java:469)
at org.apache.beam.runners.flink.FlinkPipelineTranslator.translate(FlinkPipelineTranslator.java:38)
at org.apache.beam.runners.flink.FlinkStreamingPipelineTranslator.translate(FlinkStreamingPipelineTranslator.java:92)
at org.apache.beam.runners.flink.FlinkPipelineExecutionEnvironment.translate(FlinkPipelineExecutionEnvironment.java:115)
at org.apache.beam.runners.flink.FlinkRunner.run(FlinkRunner.java:104)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:323)
at org.apache.beam.sdk.Pipeline.run(Pipeline.java:309)
at BeamPipelineKafka.main(BeamPipelineKafka.java:51)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:355)
... 8 more
Am I missing something? Or is there some configuration that avoids using this SplittableParDo?
I am reading as follows:
PipelineOptions options = PipelineOptionsFactory.create();
options.setRunner(FlinkRunner.class);
Pipeline pipeline = Pipeline.create(options);
pipeline
    // Read from the input Kafka topic
    .apply("Read from Kafka", KafkaIO.<String, String>read()
        .withBootstrapServers("localhost:9092")
        .withTopic("input-topic")
        .withKeyDeserializer(StringDeserializer.class)
        .withValueDeserializer(StringDeserializer.class))
    .apply ....
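The error message indicates that the Flink runner has no translator registered for this transform (SplittableParDo$PrimitiveUnboundedRead), so Beam cannot translate it into a form that Flink can execute. One possible solution is to use a different runner that does have a translator for this transform, such as Dataflow.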
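Before switching runners, it may be worth checking whether any translator registrars are visible on the classpath of the submitted jar. The sketch below is only a diagnostic and uses nothing but the JDK. It assumes that Beam discovers its translators through service files under META-INF/services, and that the registrar interface is named org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar (inferred from the package in the stack trace, not confirmed here). The class name ServiceFileCheck is made up for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.UncheckedIOException;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;

public class ServiceFileCheck {

    // ASSUMPTION: registrar interface name inferred from the stack trace package.
    static final String REGISTRAR =
        "org.apache.beam.runners.core.construction.TransformPayloadTranslatorRegistrar";

    /** Collects every provider class name listed in META-INF/services/<serviceName>. */
    static List<String> listProviders(String serviceName) {
        List<String> providers = new ArrayList<>();
        try {
            Enumeration<URL> files = ServiceFileCheck.class.getClassLoader()
                .getResources("META-INF/services/" + serviceName);
            while (files.hasMoreElements()) {
                URL file = files.nextElement();
                try (BufferedReader in = new BufferedReader(
                        new InputStreamReader(file.openStream(), StandardCharsets.UTF_8))) {
                    String line;
                    while ((line = in.readLine()) != null) {
                        // Service files allow '#' comments; strip them.
                        int hash = line.indexOf('#');
                        if (hash >= 0) {
                            line = line.substring(0, hash);
                        }
                        line = line.trim();
                        if (!line.isEmpty()) {
                            providers.add(line);
                        }
                    }
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return providers;
    }

    public static void main(String[] args) {
        List<String> providers = listProviders(REGISTRAR);
        System.out.println(providers.size() + " registrar(s) found");
        providers.forEach(System.out::println);
    }
}
```

If this is run from inside the same fat jar and the list is empty or much shorter than expected, one possible cause is that the build overwrote the service files instead of merging them; with the Maven Shade plugin, adding ServicesResourceTransformer is the usual way to merge them. Whether that applies to this particular setup is an assumption, since the question does not show the build configuration.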
Is there any solution for the same problem when using PubsubIO?