我正在尝试使用以下命令运行数据流管道:
!python3 ~/pipelines/Beam/pipeline.py \
--project='project_id' \
--region='region' \
--dataset_id='dataset_id' \
--ingest_table_name='titanic_train' \
--bucket='bucket_id' \
--temp_location='gs://bucket_id/dataflow/temp' \
--runner=DataflowRunner
问题在于:
虽然管道成功地将文件“pipeline.pb”写入temp_location,例如:
gs://bucket_id/dataflow/temp/my-bq-pipeline-1697443447056594634.1697443451.965390/my-bq-pipeline-1697443447056594634.1697443458.757779/pipeline.pb
,
当它随后尝试从写入它的同一位置读取它时,它会抛出以下错误:
INFO:apache_beam.runners.dataflow.dataflow_runner: JOB_MESSAGE_ERROR: Unable to open file: gs://bucketId/dataflow/temp/my-bq-pipeline-1697443447056594634.1697443451.965390/my-bq-pipeline-1697443447056594634.1697443458.757779/pipeline.pb.
我认为相同的数据流管道应该能够读取它最初写入的文件。
我测试过我可以从笔记本访问该文件:
!gsutil cp gs://bucket_id/dataflow/temp/my-bq-pipeline-1697443447056594634.1697443451.965390/my-bq-pipeline-1697443447056594634.1697443458.757779/pipeline.pb ./pipeline.pb
此外,我可以使用 DirrectRunner 运行相同的管道,不会出现任何问题。 知道可能是什么问题吗?
假设您能够写入,您也应该能够读取,您可以检查一下 Dataflow 服务帐户是否具有该云存储桶所需的权限。默认情况下,Dataflow 使用计算引擎服务帐户,在启动管道时显式传递服务帐户非常重要,请在启动时使用以下标志
--service_account_email=my-service-account-name@<project-id>.iam.gserviceaccount.com