是否可以仅对键控PCollection中的值应用有状态转换?
例如,假设此PCollection键入邮政编码。这些值是包含user_id键的字典。在这个有状态的DoFn中,我想跟踪每个邮政编码中看到的所有user_id。但是,鉴于邮政编码的绝对数量,将所有邮政编码,user_id对存储在状态中变得很棘手。但是,如果仅对每个密钥应用此有状态DoFn,则无需在状态中显式存储邮政编码。
从Python文档中看,这似乎是不可能的。最好的方法是滥用自定义的CombineFn吗?
谢谢!
我认为您想要的是CombinePerKey。它仅对值或每个键应用CombineFn。
此外,在使用合并时,请考虑减少阶段。
希望此示例对您有所帮助。 (添加了打印件,因此您可以看到“缩小面”以及为什么ifs)
with beam.Pipeline(options=pipeline_options) as p:
keyed_elements = [
(47001, {"user_id": 1, "fake_key":"fake_value"}),
(47001, {"user_id": 2, "fake_key": "fake_value"}),
(47002, {"user_id": 3, "fake_key": "fake_value"}),
(47002, {"user_id": 4, "fake_key": "fake_value"}),
(47003, {"user_id": 5, "fake_key": "fake_value"}),
(47001, {"user_id": 6, "fake_key": "fake_value"}),
(47001, {"user_id": 7, "fake_key": "fake_value"}),
(47001, {"user_id": 8, "fake_key": "fake_value"}),
(47001, {"user_id": 9, "fake_key": "fake_value"}),
(47001, {"user_id": 10, "fake_key": "fake_value"}),
(47001, {"user_id": 11, "fake_key": "fake_value"}),
]
def group_users(elements_values):
#to test paralellism in reduce phase
print(f"ELEMENT: {elements_values}")
final_output = []
for value in elements_values:
if isinstance(value, dict):
final_output.append(value['user_id'])
elif isinstance(value, list):
final_output += value
else:
pass
return final_output
(p | Create(keyed_elements)
| beam.CombinePerKey(group_users)
| Map(print)
)