侧面输入的有效ParDo设置或start_bundle

问题描述 投票:0回答:1

列表A:2500万哈希列表B:17.5万个哈希值

我想检查列表B中的每个哈希是否存在于列表A中。为此,我有一个ParDo函数,当它不匹配时我产生。这是一个重复数据删除过程。

我如何有效地设置此ParDo,现在我在处理列表B时进行列表A的侧面输入。但是侧面输入不应该转到ParDo的setup()或start_bundle(),因此我存储了查找列表(A )在工作人员中只有一次?

class Checknewrecords(beam.DoFn):
    def process(self, element, hashlist):
        if element['TA_HASH'] not in hashlist:
            yield element
        else:
            pass

如果您有答案,请提供指向文档的链接,因为我找不到适用于Python版本的良好文档。

  • transformed_records是上一个转换的PCollection
  • current_data是来自BigQuery.read的PCollection。>

    new_records =转换后的记录| 'Checknewrecords'>> >> beam.ParDo(Checknewrecords(),pvalue.AsList(current_data))

列表A:25M哈希列表B:175K哈希我想检查列表B中的每个哈希是否存在于列表A中。为此,我有一个ParDo函数,当它不匹配时就产生。这是一个重复数据删除...

python google-cloud-dataflow apache-beam
1个回答
0
投票

我相信pvalue.AsDict是您所需要的,这将为您提供侧面输入的字典样式界面。您可以在Apache Beam Github search上找到一些示例。

© www.soinside.com 2019 - 2024. All rights reserved.