在Apache Beam中为每个键应用有状态DoFn

问题描述 投票:0回答:1

是否可以仅对键控PCollection中的值应用有状态转换?

例如,假设此PCollection键入邮政编码。这些值是包含user_id键的字典。在这个有状态的DoFn中,我想跟踪每个邮政编码中看到的所有user_id。但是,鉴于邮政编码的绝对数量,将所有邮政编码,user_id对存储在状态中变得很棘手。但是,如果仅对每个密钥应用此有状态DoFn,则无需在状态中显式存储邮政编码。

从Python文档中看,这似乎是不可能的。最好的方法是滥用自定义的CombineFn吗?

谢谢!

python google-cloud-dataflow apache-beam
1个回答
0
投票

我认为您想要的是CombinePerKey。它仅对值或每个键应用CombineFn。

此外,在使用合并时,请考虑减少阶段。

希望此示例对您有所帮助。 (添加了打印件,因此您可以看到“缩小面”以及为什么ifs

with beam.Pipeline(options=pipeline_options) as p:

    keyed_elements = [
        (47001, {"user_id": 1, "fake_key":"fake_value"}),
        (47001, {"user_id": 2, "fake_key": "fake_value"}),
        (47002, {"user_id": 3, "fake_key": "fake_value"}),
        (47002, {"user_id": 4, "fake_key": "fake_value"}),
        (47003, {"user_id": 5, "fake_key": "fake_value"}),
        (47001, {"user_id": 6, "fake_key": "fake_value"}),
        (47001, {"user_id": 7, "fake_key": "fake_value"}),
        (47001, {"user_id": 8, "fake_key": "fake_value"}),
        (47001, {"user_id": 9, "fake_key": "fake_value"}),
        (47001, {"user_id": 10, "fake_key": "fake_value"}),
        (47001, {"user_id": 11, "fake_key": "fake_value"}),
    ]

    def group_users(elements_values):

        #to test paralellism in reduce phase
        print(f"ELEMENT: {elements_values}")


        final_output = []
        for value in elements_values:
            if isinstance(value, dict):
                final_output.append(value['user_id'])
            elif isinstance(value, list):
                final_output += value
            else:
                pass

        return final_output

    (p | Create(keyed_elements)
       | beam.CombinePerKey(group_users)
       | Map(print)
     )
© www.soinside.com 2019 - 2024. All rights reserved.