列表A:2500万哈希列表B:17.5万个哈希值
我想检查列表B中的每个哈希是否存在于列表A中。为此,我有一个ParDo函数,当它不匹配时我产生。这是一个重复数据删除过程。
我如何有效地设置此ParDo,现在我在处理列表B时进行列表A的侧面输入。但是侧面输入不应该转到ParDo的setup()或start_bundle(),因此我存储了查找列表(A )在工作人员中只有一次?
class Checknewrecords(beam.DoFn):
def process(self, element, hashlist):
if element['TA_HASH'] not in hashlist:
yield element
else:
pass
如果您有答案,请提供指向文档的链接,因为我找不到适用于Python版本的良好文档。
current_data是来自BigQuery.read的PCollection。>
new_records =转换后的记录| 'Checknewrecords'>> >> beam.ParDo(Checknewrecords(),pvalue.AsList(current_data))
列表A:25M哈希列表B:175K哈希我想检查列表B中的每个哈希是否存在于列表A中。为此,我有一个ParDo函数,当它不匹配时就产生。这是一个重复数据删除...
我相信pvalue.AsDict是您所需要的,这将为您提供侧面输入的字典样式界面。您可以在Apache Beam Github search上找到一些示例。