Apache Beam: fixing the "'PBegin' object has no attribute 'windowing'" error in a Dataflow pipeline


I am working on an Apache Beam Dataflow job that enriches streaming data and stores it in Firestore. If any event data is missing during processing, I push those records to a BigQuery table. The relevant code snippets are below. The EnrichEventsWithFirestore class fetches the data needed for Firestore storage; I use a tagged output to capture failed records and push them to BigQuery, as shown in the pipeline. However, I keep hitting this error: AttributeError: 'PBegin' object has no attribute 'windowing'. How can I resolve it?

import apache_beam as beam
from google.cloud import firestore

class EnrichEventsWithFirestore(beam.DoFn):
    """Enrich batched events with Firestore docs; tag lookup misses as 'discarded'."""
    def __init__(self, database):
        self.database = database
        self.client = None

    def setup(self):
        if not self.client:
            self.client = firestore.Client(project='temp_myself', database=self.database)

    def process(self, element):
        list_ids = []
        for ele in element:
            Id = ele[0]['Id']
            list_ids.append(self.client.document('cust_add', str(Id)))
        docs = self.client.get_all(references=list_ids)
        Id_map = {}
        for doc in docs:
            doc_dict = doc.to_dict()
            if doc_dict:
                Id_map[doc_dict['Id']] = {
                    "c_num": doc_dict.get('c_num', "NA"),
                    "ct_num": doc_dict.get("ct_num", "NA")
                }
        enriched_data = []
        for ele in element:
            tran_data, item_data = ele
            Id = int(tran_data['Id'])
            filename = str(tran_data['filename'])
            tdatetime = tran_data['tdatetime']
           
            if Id in Id_map:
                ct_num = Id_map[Id]['ct_num']
                c_num = Id_map[Id]['c_num']
            else:
                yield beam.pvalue.TaggedOutput('discarded',
                    [{'Id': Id, 'filename': filename, 'tdatetime': tdatetime}]
                )
                continue

            tran_data["ct_num"] = ct_num
            tran_data["c_num"] = c_num
            enriched_data.append((tran_data, item_data))

        yield enriched_data

# Pipeline Code
def run(pipeline_args):
    pipeline_options = PipelineOptions(pipeline_args, save_main_session=True)
    pipeline_options.view_as(StandardOptions).streaming = True
    pipeline = beam.Pipeline(options=pipeline_options)

    if pipeline:
        results = (
            pipeline
            | "Process File Firestore" >> beam.FlatMap(parseForFirestore, campaign__data)
            | "Window and Batch Data Firestore" >> beam.WindowInto(
                beam.window.FixedWindows(5),
                trigger=Repeatedly(AfterAny(AfterCount(100), AfterProcessingTime(5))),
                accumulation_mode=AccumulationMode.DISCARDING,
                allowed_lateness=600
            )
            | "Group Data Firestore" >> beam.GroupBy(lambda s: assign_random_int())
            | "Extract Values Firestore" >> beam.Values()
            | "Enrich Events with Firestore Data" >> beam.ParDo(
                EnrichEventsWithFirestore('temp_myself')
            ).with_outputs('discarded', main='not_discarded')
        )

        results.not_discarded | "Push Data to Firestore" >> beam.ParDo(InsertToFireStoreInBatches('temp_myself'))
        results.discarded | "Write to BQ" >> beam.io.WriteToBigQuery(
            'bq_table',
            create_disposition="CREATE_IF_NEEDED",
            write_disposition="WRITE_APPEND",
            ignore_unknown_columns=True,
            method="STREAMING_INSERTS",
            schema=tab_schema
        )
    
    pipeline.run()


python-3.x google-cloud-firestore google-cloud-dataflow apache-beam apache-beam-io
1 Answer

Your pipeline never defines a starting point.

The first transform in any Apache Beam pipeline is special: it receives a PBegin object, which represents the start of the pipeline, rather than a PCollection like every other transform. You have not defined a source (e.g. beam.Create, beam.ReadFrom*, etc.), so your FlatMap receives a PBegin where it expects a PCollection, and the AttributeError follows.

Try adding a source between pipeline and the beam.FlatMap transform.
