I am using Apache Beam on Google Dataflow, and I am calling three functions:
| "Unnest 1" >> beam.Map(lambda record: dict_level1(record))
| "Unnest 2" >> beam.Map(lambda record: unnest_dict(record))
| "Unnest 3" >> beam.Map(lambda record: dict_level0(record))
But when I run the job on Dataflow, I get a "name is not defined" error.
Here is my code:
import apache_beam as beam
import os
from apache_beam.options.pipeline_options import PipelineOptions

# this creates the output and generates the template for me
pipeline_options = {
    'project': 'c3t-tango-dev',
    'runner': 'DataflowRunner',
    'region': 'us-central1',  # make sure to specify the region correctly
    'staging_location': 'gs://dario-dev-gcs/dataflow-course/staging',
    'template_location': 'gs://dario-dev-gcs/dataflow-course/templates/batch_job_df_gcs_flights4'
}
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)

table_schema = 'airport:STRING, list_delayed_num:INTEGER, list_delayed_time:INTEGER'
table = 'c3t-tango-dev:dataflow.flights_aggr'

class Filter(beam.DoFn):
    def process(self, record):
        # keep only rows whose delay column is positive
        if int(record[8]) > 0:
            return [record]

def dict_level1(record):
    # record is (airport, {'Delayed_num': [...], 'Delayed_time': [...]}) from CoGroupByKey
    dict_ = {}
    dict_['airport'] = record[0]
    dict_['list'] = record[1]
    return dict_

def unnest_dict(record):
    # flatten nested dicts, joining parent and child keys with '_'
    def expand(key, value):
        if isinstance(value, dict):
            return [(key + '_' + k, v) for k, v in unnest_dict(value).items()]
        else:
            return [(key, value)]
    items = [item for k, v in record.items() for item in expand(k, v)]
    return dict(items)

def dict_level0(record):
    # print("Record in dict_level0:", record)
    dict_ = {}
    dict_['airport'] = record['airport']
    dict_['list_Delayed_num'] = record['list_Delayed_num'][0]
    dict_['list_Delayed_time'] = record['list_Delayed_time'][0]
    return dict_

with beam.Pipeline(options=pipeline_options) as p1:
    serviceAccount = "./composer/dags/c3t-tango-dev-591728f351ee.json"
    os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = serviceAccount

    Delayed_time = (
        p1
        | "Import Data time" >> beam.io.ReadFromText("gs://dario-dev-gcs/dataflow-course/input/voos_sample.csv",
                                                     skip_header_lines=1)
        | "Split by comma time" >> beam.Map(lambda record: record.split(','))
        | "Filter Delays time" >> beam.ParDo(Filter())
        | "Create a key-value time" >> beam.Map(lambda record: (record[4], int(record[8])))
        | "Sum by key time" >> beam.CombinePerKey(sum)
    )

    Delayed_num = (
        p1
        | "Import Data" >> beam.io.ReadFromText("gs://dario-dev-gcs/dataflow-course/input/voos_sample.csv",
                                                skip_header_lines=1)
        | "Split by comma" >> beam.Map(lambda record: record.split(','))
        | "Filter Delays" >> beam.ParDo(Filter())
        | "Create a key-value" >> beam.Map(lambda record: (record[4], int(record[8])))
        | "Count by key" >> beam.combiners.Count.PerKey()
    )

    Delay_table = (
        {'Delayed_num': Delayed_num, 'Delayed_time': Delayed_time}
        | "Group By" >> beam.CoGroupByKey()
        | "Unnest 1" >> beam.Map(lambda record: dict_level1(record))
        | "Unnest 2" >> beam.Map(lambda record: unnest_dict(record))
        | "Unnest 3" >> beam.Map(lambda record: dict_level0(record))
        # | beam.Map(print)
        | "Write to BQ" >> beam.io.WriteToBigQuery(
            table,
            schema=table_schema,
            write_disposition=beam.io.BigQueryDisposition.WRITE_APPEND,
            create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
            custom_gcs_temp_location="gs://dario-dev-gcs/dataflow-course/staging")
    )

# p1.run() removed: the with-block already runs the pipeline and waits for it on exit
I run this code and it generates a template in GCS; I then launch a Dataflow job from a custom template, pointing it at that template, but at runtime I get this error:
File "/Users/dario/Repo-c3tech/c3t-tango/./composer/dags/gcp_to_bq_table.py", line 76, in
NameError: name 'dict_level1' is not defined
The error above was resolved after setting the --save_main_session pipeline option to True.
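For reference, since the options in the question are built with PipelineOptions.from_dictionary, one way to apply the fix is an extra dictionary entry. A minimal sketch, reusing the options dict from the code above; only the save_main_session line is new:

pipeline_options = {
    'project': 'c3t-tango-dev',
    'runner': 'DataflowRunner',
    'region': 'us-central1',
    'staging_location': 'gs://dario-dev-gcs/dataflow-course/staging',
    'template_location': 'gs://dario-dev-gcs/dataflow-course/templates/batch_job_df_gcs_flights4',
    'save_main_session': True  # becomes the --save_main_session flag
}
pipeline_options = PipelineOptions.from_dictionary(pipeline_options)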
Error:
File "/Users/dario/Repo-c3tech/c3t-tango/./composer/dags/gcp_to_bq_table.py", line 76, in
NameError: name 'dict_level1' is not defined
According to this documentation, this error does not occur when you execute locally, for example with the DirectRunner. It occurs when your DoFns use values from the global namespace that are not available on the Dataflow workers. To resolve it, set the --save_main_session pipeline option to True.
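Equivalently, when constructing options programmatically, the same flag can be set through SetupOptions. A minimal sketch using the standard Beam API; the trivial Create/Map pipeline is just a placeholder:

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions()  # or PipelineOptions.from_dictionary(...) as in the question
# Pickle the __main__ session so module-level names (here dict_level1,
# unnest_dict, dict_level0) are recreated on each Dataflow worker.
options.view_as(SetupOptions).save_main_session = True

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create([1, 2, 3]) | beam.Map(lambda x: x + 1)

With save_main_session enabled, the worker unpickles the main module's globals before running each DoFn, so functions referenced from lambdas inside beam.Map resolve on the workers just as they do under the DirectRunner.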