I'm trying to run a simple Beam pipeline on a locally hosted Flink cluster, but I get an error when I do. I've tried everything I could find online.

    import apache_beam as beam
    from apache_beam.io import ReadFromText
    from apache_beam.io import WriteToText
    from apache_beam.options.pipeline_options import PipelineOptions
    import apache_beam.transforms.window as window


    def run():
        options = PipelineOptions([
            "--runner=FlinkRunner",
            "--flink_master=localhost:8081",
            "--environment_type=EXTERNAL",
            "--environment_config=localhost:50000"
        ])
        with beam.Pipeline(options=options) as p:
            (p
             | 'Create words' >> beam.Create(['to be or not to be'])
             | 'Split words' >> beam.FlatMap(lambda words: words.split(' '))
             | 'Write to file' >> WriteToText('testproject.txt')
             )


    if __name__ == "__main__":
        run()

This is the code I'm trying to run, but I get these errors:

    (env) shivakhatri@Shivas-Air beam-starter-python % python3.8 app.py
    WARNING:root:Waiting for grpc channel to be ready at localhost:55999.
    WARNING:root:Waiting for grpc channel to be ready at localhost:55999.
    WARNING:root:Waiting for grpc channel to be ready at localhost:55999.
    WARNING:root:Waiting for grpc channel to be ready at localhost:55999.
    ERROR:root:java.lang.NullPointerException
    Traceback (most recent call last):
      File "app.py", line 23, in <module>
        run()
      File "app.py", line 16, in run
        (p
      File "/Users/shivakhatri/Documents/work/beam-starter-python/env/lib/python3.8/site-packages/apache_beam/pipeline.py", line 601, in __exit__
        self.result.wait_until_finish()
      File "/Users/shivakhatri/Documents/work/beam-starter-python/env/lib/python3.8/site-packages/apache_beam/runners/portability/portable_runner.py", line 614, in wait_until_finish
        raise self._runtime_exception
    RuntimeError: Pipeline BeamApp-shivakhatri-0405201752-c62cb18a_fd177a56-ec3e-448a-a46a-4530de13702b failed in state FAILED: java.lang.NullPointerException
    zsh: segmentation fault  python3.8 app.py
    (env) shivakhatri@Shivas-Air beam-starter-python %

Did you find a solution to this problem?
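
Not a confirmed fix, but one thing that may be worth ruling out first: as far as I understand, `--environment_type=EXTERNAL` expects a worker pool service to already be listening at the `--environment_config` address, and `--flink_master` has to point at a running Flink REST endpoint. The sketch below only checks whether those two addresses from the question accept TCP connections. The helper names `is_listening` and `check_endpoints` are made up for illustration and are not part of Beam or Flink, and a reachable port does not prove the right service is behind it.

```python
import socket

# Endpoints copied from the pipeline options in the question.
ENDPOINTS = {
    "flink_master": ("localhost", 8081),
    "environment_config": ("localhost", 50000),
}


def is_listening(host, port, timeout=1.0):
    """Hypothetical helper: True if a TCP connection to host:port succeeds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers connection refused, timeouts, and name resolution failures.
        return False


def check_endpoints(endpoints=ENDPOINTS):
    """Hypothetical helper: map each option name to its reachability."""
    return {
        name: is_listening(host, port)
        for name, (host, port) in endpoints.items()
    }


if __name__ == "__main__":
    for name, ok in check_endpoints().items():
        print(f"{name}: {'reachable' if ok else 'NOT reachable'}")
```

If either endpoint shows up as not reachable, that would at least narrow things down before digging into the `NullPointerException` itself; if both are reachable, the cause is probably elsewhere (the Flink job manager logs may say more).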