I found a demo online and I'm trying to reproduce it. However, whenever I run
python pipeline.py --streaming --runner DataflowRunner \
--project <PROJECT> \
--temp_location gs://tweeps-stream/temp \
--staging_location gs://tweeps-stream/staging \
--region us-west1 \
--job_name tweeps
I get the following error:
apache_beam.runners.dataflow.dataflow_runner.DataflowRuntimeException: Dataflow pipeline failed. State: FAILED, Error:
Unable to open file: gs://tweeps-stream/staging/tweep.1697803634.304288/pipeline.pb.
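The file the job cannot open is `pipeline.pb`, the serialized pipeline that the Beam SDK uploads to `--staging_location` when the job is submitted. So the thing to check is which identity can read objects in that bucket. A small stdlib-only sketch that pulls the bucket name out of such a URI. The helper `split_gcs_uri` is hypothetical and not part of any Google library.

```python
def split_gcs_uri(uri):
    """Split a gs://bucket/object URI into (bucket, object_name)."""
    prefix = "gs://"
    if not uri.startswith(prefix):
        raise ValueError(f"not a Cloud Storage URI: {uri!r}")
    # Everything up to the first '/' after the scheme is the bucket name.
    bucket, _, object_name = uri[len(prefix):].partition("/")
    if not bucket:
        raise ValueError(f"missing bucket name: {uri!r}")
    return bucket, object_name


# The object named in the error message above.
failed = "gs://tweeps-stream/staging/tweep.1697803634.304288/pipeline.pb"
bucket, object_name = split_gcs_uri(failed)
print(bucket)       # tweeps-stream
print(object_name)  # staging/tweep.1697803634.304288/pipeline.pb
```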
I'm using the GCP Cloud Console, and I've granted myself admin roles for Dataflow, BigQuery, and GCS.
Here is the code I'm trying to run:
from apache_beam.options.pipeline_options import PipelineOptions
from sys import argv
import apache_beam as beam
import argparse

# Placeholder: this should be the same project that is passed via --project.
PROJECT_ID = 'gcp-project'
SUBSCRIPTION = 'projects/' + PROJECT_ID + '/subscriptions/tweeps'
SCHEMA = 'created_at:TIMESTAMP,tweep_id:STRING,text:STRING,user:STRING,flagged:BOOLEAN'


def parse_pubsub(data):
    # Use the json library to convert the data from Pub/Sub into a Python dictionary.
    # This makes it much easier to manipulate, and Apache Beam can write Python dictionaries directly to BigQuery.
    import json
    return json.loads(data)


def fix_timestamp(data):
    # Convert e.g. '20/Oct/2023:12:07:14' into 'YYYY-MM-DD HH:MM:SS', which BigQuery accepts for TIMESTAMP.
    import datetime
    d = datetime.datetime.strptime(data['created_at'], "%d/%b/%Y:%H:%M:%S")
    data['created_at'] = d.strftime("%Y-%m-%d %H:%M:%S")
    return data


def check_tweep(data):
    # Flag the tweep if its text contains any of the listed words (case-sensitive).
    BAD_WORDS = ['attack', 'drug', 'gun']
    data['flagged'] = False
    for word in BAD_WORDS:
        if word in data['text']:
            data['flagged'] = True
    return data


if __name__ == '__main__':
    parser = argparse.ArgumentParser()
    # parse_known_args returns (known_args, remaining_args). The remaining flags
    # (--streaming, --runner, --project, ...) are handed to Beam explicitly.
    known_args, pipeline_args = parser.parse_known_args(argv[1:])
    p = beam.Pipeline(options=PipelineOptions(pipeline_args))
    (p
     | 'ReadData' >> beam.io.ReadFromPubSub(subscription=SUBSCRIPTION).with_output_types(bytes)
     | 'Decode' >> beam.Map(lambda x: x.decode('utf-8'))
     | 'PubSubToJSON' >> beam.Map(parse_pubsub)
     | 'FixTimestamp' >> beam.Map(fix_timestamp)
     | 'CheckTweep' >> beam.Map(check_tweep)
     | 'WriteToBigQuery' >> beam.io.WriteToBigQuery(
           '{0}:tweeper.tweeps'.format(PROJECT_ID),
           schema=SCHEMA,
           write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND))
    result = p.run()
    result.wait_until_finish()
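Before launching on Dataflow, the three transform functions can be exercised in plain Python, which separates logic errors in the pipeline from the staging/permission failure. A minimal sketch, assuming a Pub/Sub message shaped like `SCHEMA` above; the sample payload is made up for illustration. `check_tweep` is written with `any(...)` here, which is equivalent to the loop in the script.

```python
import datetime
import json


def parse_pubsub(data):
    # Decoded Pub/Sub payload (str) -> Python dict.
    return json.loads(data)


def fix_timestamp(data):
    # '20/Oct/2023:12:07:14' -> '2023-10-20 12:07:14' (BigQuery TIMESTAMP format).
    d = datetime.datetime.strptime(data['created_at'], "%d/%b/%Y:%H:%M:%S")
    data['created_at'] = d.strftime("%Y-%m-%d %H:%M:%S")
    return data


def check_tweep(data):
    # flagged is True if any listed word occurs in the text (case-sensitive).
    BAD_WORDS = ['attack', 'drug', 'gun']
    data['flagged'] = any(word in data['text'] for word in BAD_WORDS)
    return data


# Hypothetical sample message as bytes, the type ReadFromPubSub emits here.
raw = b'{"created_at": "20/Oct/2023:12:07:14", "tweep_id": "1", "text": "water gun fight", "user": "alice"}'

# Same order as the pipeline: Decode -> PubSubToJSON -> FixTimestamp -> CheckTweep.
row = check_tweep(fix_timestamp(parse_pubsub(raw.decode('utf-8'))))
print(row)
```

If this prints the expected dictionary, the Python logic is fine and the failure is on the Dataflow/IAM side.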
My question is:
I don't understand why this error occurs even though I have the correct permissions. I run the Python application with:
python pipeline.py --streaming
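A side note on the script's argument handling: `parse_known_args` returns a tuple `(known_args, remaining_args)` rather than just the namespace, and `sys.argv` includes the script name as its first element. Unless the leftover list is passed explicitly as `PipelineOptions(pipeline_args)`, `PipelineOptions()` with no arguments falls back to parsing `sys.argv` on its own. A stdlib-only sketch of the usual split; the helper `split_args` is hypothetical.

```python
import argparse


def split_args(argv):
    """Separate this script's own flags from flags meant for Beam/Dataflow."""
    parser = argparse.ArgumentParser()
    # No custom flags are defined, mirroring the original script, so every
    # flag ends up in the leftover list.
    known_args, pipeline_args = parser.parse_known_args(argv)
    return known_args, pipeline_args


# Equivalent of sys.argv[1:] for a Dataflow launch (script name already dropped).
known, pipeline_args = split_args(
    ['--streaming', '--runner=DataflowRunner', '--region=us-west1'])
print(pipeline_args)
```

The leftover list is what would then be handed to `PipelineOptions(pipeline_args)`.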
Any suggestions would be greatly appreciated.
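Please check https://cloud.google.com/dataflow/docs/concepts/security-and-permissions#permissions. Dataflow uses two service accounts to manage security and permissions: the Dataflow service account (service agent) and the worker service account. Both are separate from your user account, so the admin roles you granted yourself do not apply to them. The worker service account also needs access to the Cloud Storage buckets the job uses, such as the one given in --staging_location.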
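Since the roles on your own user account do not carry over, the grants have to target the service accounts themselves. A sketch that derives their default email addresses from the project number (the number, not the project ID; `gcloud projects describe PROJECT_ID --format="value(projectNumber)"` shows it) and prints, without running, `gcloud` commands giving the worker account the Dataflow Worker role and object access on the staging bucket. The project ID and number are placeholders, `roles/storage.objectAdmin` is one reasonable choice rather than the only one, and if the job sets `--service_account_email`, that account is the one to grant instead of the default.

```python
def dataflow_service_accounts(project_number):
    """Return the default emails of the two service accounts Dataflow uses."""
    return {
        # Dataflow service agent: creates and manages the job's resources.
        "service_agent": f"service-{project_number}@dataflow-service-producer-prod.iam.gserviceaccount.com",
        # Worker service account: by default the Compute Engine default
        # service account; the worker VMs run as this identity.
        "worker": f"{project_number}-compute@developer.gserviceaccount.com",
    }


def grant_commands(project_id, project_number, bucket):
    """Build (not run) gcloud commands granting the worker account access."""
    worker = dataflow_service_accounts(project_number)["worker"]
    member = f"serviceAccount:{worker}"
    return [
        f"gcloud projects add-iam-policy-binding {project_id} "
        f"--member={member} --role=roles/dataflow.worker",
        f"gcloud storage buckets add-iam-policy-binding gs://{bucket} "
        f"--member={member} --role=roles/storage.objectAdmin",
    ]


# Placeholder project ID and number; the bucket is the one from the question.
for cmd in grant_commands("my-project", "123456789012", "tweeps-stream"):
    print(cmd)
```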