Unable to run Dataflow pipeline: Dataflow pipeline failed. State: FAILED, Error: Unable to open file: gs://

Problem description

I found a demo online and am trying to reproduce it. However, whenever I run

 python pipeline.py --streaming --runner DataflowRunner \
  --project <PROJECT> \
  --temp_location gs://tweeps-stream/temp \
  --staging_location gs://tweeps-stream/staging \
  --region us-west1 \
  --job_name tweeps

I get the following error:

apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Unable to open file: gs://tweeps-stream/staging/tweep.1697803634.304288/pipeline.pb.

I am working in the GCP Cloud Console, and I have granted myself admin permissions for Dataflow, BigQuery, and GCS.

This is the code I am trying to run:

from apache_beam.options.pipeline_options import PipelineOptions
from sys import argv
import apache_beam as beam
import argparse

PROJECT_ID = 'gcp-project'
SUBSCRIPTION = 'projects/' + PROJECT_ID + '/subscriptions/tweeps'
SCHEMA = 'created_at:TIMESTAMP,tweep_id:STRING,text:STRING,user:STRING,flagged:BOOLEAN'

def parse_pubsub(data):
    # Use the json library to convert the data from Pub/Sub into a Python dictionary:
    # it is much easier to manipulate, and Apache Beam can write Python dictionaries directly to BigQuery.
    import json
    return json.loads(data)


def fix_timestamp(data):
    import datetime
    d = datetime.datetime.strptime(data['created_at'], "%d/%b/%Y:%H:%M:%S")
    data['created_at'] = d.strftime("%Y-%m-%d %H:%M:%S")
    return data


def check_tweep(data):
    BAD_WORDS = ['attack', 'drug', 'gun']
    data['flagged'] = False
    for word in BAD_WORDS:
        if word in data['text']:
            data['flagged'] = True
    return data


if __name__ == '__main__':

    parser = argparse.ArgumentParser()
    # parse_known_args returns (known_args, pipeline_args); forward the
    # unparsed pipeline flags (e.g. --streaming, --runner) to Beam
    known_args, pipeline_args = parser.parse_known_args(argv[1:])

    p = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (p | 'ReadData' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION).with_output_types(bytes)
       | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
       | 'PubSubToJSON' >> beam.Map(parse_pubsub)
       | 'FixTimestamp' >> beam.Map(fix_timestamp)
       | 'CheckTweep' >> beam.Map(check_tweep)
       | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           '{0}:tweeper.tweeps'.format(PROJECT_ID),
           schema=SCHEMA,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    result = p.run()
    result.wait_until_finish()
 

My question is:

I don't understand why I get this error when I have the correct permissions and run the Python application with

python pipeline.py --streaming

Any suggestions would be greatly appreciated.

google-cloud-platform google-cloud-dataflow
1 Answer

Please check https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#permissions. Dataflow uses two service accounts to manage security and permissions: the Dataflow service agent and the worker service account. The error means one of these accounts (typically the worker service account, not your own user account) cannot read the staged files under gs://tweeps-stream/staging, even though you have admin rights yourself.
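
As a concrete check, you could grant both accounts object access on the staging/temp bucket. This is only a sketch, assuming the default Compute Engine worker service account and the Dataflow service agent derived from your project number; if the job runs with a custom worker service account, substitute that account instead, and replace <PROJECT> with your project ID as in the question:

 PROJECT_NUMBER=$(gcloud projects describe <PROJECT> --format='value(projectNumber)')
 # Default Compute Engine worker service account used by Dataflow workers
 gsutil iam ch \
   serviceAccount:${PROJECT_NUMBER}-compute@developer.gserviceaccount.com:objectAdmin \
   gs://tweeps-stream
 # Dataflow service agent for the project
 gsutil iam ch \
   serviceAccount:service-${PROJECT_NUMBER}@dataflow-service-producer-prod.iam.gserviceaccount.com:objectAdmin \
   gs://tweeps-stream

After granting access, re-run the pipeline; if the staging file gs://tweeps-stream/staging/.../pipeline.pb can now be opened, the failure was the bucket permissions rather than your own user role.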
