我有简单的数据流管道并试图从云外壳执行,
码:
from __future__ import print_function
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
with beam.Pipeline(options=PipelineOptions()) as p:
lines = p | 'Read' >> beam.io.ReadFromText('test.csv')
lines | 'Write' >> beam.io.WriteToText('gs://bucket/output_20193003', file_name_suffix='.csv')
result = p.run()
result.wait_until_finish()
用于执行的命令:
python -m simple_pipeline --runner DataflowRunner --project myproject --staging_location gs://bucket/staging --temp_location gs://bucket/temp
观察:从Cloud shell执行时我遇到以下错误,
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 859, in run_stages
pcoll_buffers, safe_coders).process_bundle.metrics
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 970, in run_stage
self._progress_frequency).process_bundle(data_input, data_output)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1174, in process_bundle
result_future = self._controller.control_handler.push(process_bundle)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/portability/fn_api_runner.py", line 1054, in push
response = self.worker.do_instruction(request)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 208, in do_instruction
request.instruction_id)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/worker/sdk_worker.py", line 230, in process_bundle
processor.process_bundle(instruction_id)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/runners/worker/bundle_processor.py", line 301, in process_bundle
op.finish()
File "apache_beam/runners/worker/operations.py", line 398, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 399, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/worker/operations.py", line 400, in apache_beam.runners.worker.operations.DoOperation.finish
File "apache_beam/runners/common.py", line 598, in apache_beam.runners.common.DoFnRunner.finish
File "apache_beam/runners/common.py", line 589, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 618, in apache_beam.runners.common.DoFnRunner._reraise_augmented
File "apache_beam/runners/common.py", line 587, in apache_beam.runners.common.DoFnRunner._invoke_bundle_method
File "apache_beam/runners/common.py", line 299, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 302, in apache_beam.runners.common.DoFnInvoker.invoke_finish_bundle
File "apache_beam/runners/common.py", line 693, in apache_beam.runners.common._OutputProcessor.finish_bundle_outputs
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/io/iobase.py", line 1005, in finish_bundle
yield WindowedValue(self.writer.close(), window.MAX_TIMESTAMP,
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/io/filebasedsink.py", line 388, in close
self.sink.close(self.temp_handle)
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/io/filebasedsink.py", line 148, in close
file_handle.close()
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/io/filesystemio.py", line 201, in close
self._uploader.finish()
File "/home/user/beam-env/local/lib/python2.7/site-packages/apache_beam/io/gcp/gcsio.py", line 553, in finish
raise self._upload_thread.last_error # pylint: disable=raising-bad-type
RuntimeError: SSLHandshakeError: [SSL: CERTIFICATE_VERIFY_FAILED] certificate verify failed (_ssl.c:661) [while running 'Write/Write/WriteImpl/WriteBundles']
注意:我只从Cloud Shell看到此错误,从本地计算机执行时,相同的代码和命令工作正常 - 使用SDK或PyCharm IDE。
我的云外壳有什么问题?怎么解决?请建议。
或者在云外壳中试试这个?
virtualenv --python=/usr/bin/python2 run_pipeline
. ./run_pipeline/bin/activate
pip install apache-beam[gcp]
python -m simple_pipeline --runner DataflowRunner --project myproject --staging_location gs://bucket/staging --temp_location gs://bucket/temp
升级到最新版本后,我可以启动数据流作业。
pip install apache-beam[gcp]