Merging PCollections with apache_beam

Question · 0 votes · 1 answer

I'm trying to run a pipeline with apache_beam (it will eventually run on Dataflow).

The pipeline should look like this:

[pipeline diagram]

I format the data coming from PubSub, write the raw results to Firestore, and run an ML model. Once I have the ML model's results, I want to update Firestore using the ID I got from the first write to FS.

The pipeline code looks roughly like this:

import apache_beam as beam

with beam.Pipeline(options=options) as p:
    # read and format
    formated_msgs = (
        p
        | "Read from PubSub" >> LoadPubSubData(known_args.topic)
    )

    # write the raw results to firestore
    write_results = (
        formated_msgs
        | "Write to FS" >> beam.ParDo(WriteToFS())
        | "Key FS" >> beam.Map(lambda fs: (fs["record_uuid"], fs))
    )

    # Run the ML model
    ml_results = (
        formated_msgs
        | "ML" >> ML()
        | "Key ML" >> beam.Map(lambda row: (row["record_uuid"], row))
    )

    # Merge by key and update - HERE IS THE PROBLEM
    (
        (write_results, ml_results) # I want to have the data from both merged by the key at this point
        | "group" >> beam.CoGroupByKey()
        | "log" >> beam.ParDo(LogFn())
    )

I've tried many approaches, but I can't seem to find the right one. Any ideas?

--- Update 1 ---

The problem is that nothing ever reaches the log step. Sometimes the operation even times out. It may be worth noting that I'm streaming the data from PubSub from the start.


python-3.x google-cloud-dataflow apache-beam
1 Answer

OK, so I finally figured it out. I think the only thing missing was windowing, since I'm streaming the data.
