I'm using Apache Beam with Python on Dataflow (GCP) to load and transform data.
I have a pipeline that is split into two parts. The first part writes to BigQuery; the second part reads the data written by the first step.
Because both parts are built on the same pipeline object `p`, they run concurrently, but I need them to run sequentially within a single pipeline. I tried adding a flag to trigger the second part, but since WriteToBigQuery returns nothing I can iterate over, I couldn't make that work.
import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery

p = beam.Pipeline(options=opts)

part_1 = (
    p
    | "F1: Read data 1"
    >> beam.io.ReadFromText(entrada, skip_header_lines=True)
    | "F1: Transform 1" >> beam.Map(format_date)
    | "F1: Transform 2" >> beam.Map(make_row)
    | "F1: Write into BQ"
    >> WriteToBigQuery(
        output_table,
        schema=table_schema,
        write_disposition=BigQueryDisposition.WRITE_APPEND,
        create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
        additional_bq_parameters={
            "timePartitioning": {"type": "DAY"},
            "clustering": {"fields": ["programcode"]},
        },
        custom_gcs_temp_location=temp_location,
    )
)
After that runs, I read the data that was just written (the query I use also joins in other tables besides this one), like this:
part_2 = (
    p
    | "F2: Read from BigQuery from first step"
    >> beam.io.ReadFromBigQuery(
        query=query_raw,
        use_standard_sql=True,
        gcs_location=temp_location,
        project=project_id,
    )
)
Finally, I run the pipeline defined above:
result = p.run()
result.wait_until_finish()
If I feed part_1 into part_2 as its input, like this:
part_2 = (
    part_1
    | "F2: Read from BigQuery"...
it fails with the error below, because of what part_1 produces:
AttributeError: Error trying to access nonexistent attribute `0` in write result. Please see __documentation__ for available attributes.
This makes sense, since WriteToBigQuery returns a WriteResult object rather than a PCollection, so it can't be used as the input to another transform. Any ideas or examples of how to achieve sequential execution are welcome.
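One way to guarantee sequential execution is to split the work into two separate pipeline objects and run them one after the other. In the Beam Python SDK, leaving a pipeline's `with` block implicitly runs the pipeline and waits for it to finish, so the second pipeline is not even constructed until the first one has completed: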
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

def run_pipeline(project, region):
    options = PipelineOptions(
        runner="DataflowRunner",
        project=project,
        region=region,
    )

    # First pipeline: runs and blocks until completion when the `with` block exits.
    with beam.Pipeline(options=options) as p1:
        (
            p1
            | "Create Pipeline" >> beam.Create(["This", "is", "my", "main", "Apache Beam", "pipeline"])
            | "Printing Elements" >> beam.Map(print)
        )

    # Second pipeline: only built and run after p1 has finished.
    with beam.Pipeline(options=options) as p2:
        (
            p2
            | "SecondPipeline" >> beam.Create([])  # placeholder source
            | "Print" >> beam.Map(customFunction)  # customFunction: your own transform
        )
Once p1 finishes, p2 will run.
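Applied to your case, a minimal sketch could look like the following. It reuses the names from your question (opts, entrada, format_date, make_row, output_table, table_schema, temp_location, query_raw, project_id), all assumed to be defined as in your snippets:

import apache_beam as beam
from apache_beam.io.gcp.bigquery import BigQueryDisposition, WriteToBigQuery

# Pipeline 1: write to BigQuery; blocks here until the write has completed.
with beam.Pipeline(options=opts) as p1:
    (
        p1
        | "F1: Read data 1" >> beam.io.ReadFromText(entrada, skip_header_lines=True)
        | "F1: Transform 1" >> beam.Map(format_date)
        | "F1: Transform 2" >> beam.Map(make_row)
        | "F1: Write into BQ"
        >> WriteToBigQuery(
            output_table,
            schema=table_schema,
            write_disposition=BigQueryDisposition.WRITE_APPEND,
            create_disposition=BigQueryDisposition.CREATE_IF_NEEDED,
            additional_bq_parameters={
                "timePartitioning": {"type": "DAY"},
                "clustering": {"fields": ["programcode"]},
            },
            custom_gcs_temp_location=temp_location,
        )
    )

# Pipeline 2: starts only after pipeline 1 has finished writing.
with beam.Pipeline(options=opts) as p2:
    (
        p2
        | "F2: Read from BigQuery"
        >> beam.io.ReadFromBigQuery(
            query=query_raw,
            use_standard_sql=True,
            gcs_location=temp_location,
            project=project_id,
        )
    )

The trade-off is that this launches two separate Dataflow jobs, so you pay worker startup twice; in exchange, the BigQuery load from the first job is guaranteed to be committed before the second job's query runs.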