apache beam 仅用一列比较两个数据集

问题描述 投票:0回答:1

我的任务是在数据流运行程序中比较 apache beam 中的两个数据集并输出三个阶段,一个在数据集 1 中常见,另一个仅在数据集 1 中,然后仅在数据集 2 中。

我尝试使用 CoGroupByKey 但我不确定它是否可以使用,因为我们只有一个一维列表。我们如何比较这些。我尝试的是合并两个 pcollections,如下所示

import apache_beam as beam


with beam.Pipeline() as p:

  dataset1 = ['data1a', 'data1b', 'data1c', 'data1d']
  dataset2 = ['data2a', 'data2b', 'data1b', 'data2d']
  dataset1_pcoll = p | 'Read Dataset 1' >> beam.Create(dataset1)
  dataset2_pcoll = p | 'Read Dataset 2' >> beam.Create(dataset2)
  

  combined_data = (
            {
                'file1': dataset1_pcoll,
                'file2': dataset2_pcoll,
            }
            | 'CoGroup Files' >> beam.CoGroupByKey()          
        )      
   
   

出现如下错误

 wrapper = lambda x: [fn(*x)]
TypeError: <lambda>() takes 2 positional arguments but 6 were given [while running 'CoGroup Files/CoGroupByKeyImpl/Tag[file2]']

   
python google-cloud-dataflow apache-beam
1个回答
0
投票

我发现了以下作品。不确定是否还有其他更好的方法。我必须复制该元素以作为键映射,并且能够获得所需的输出。现在我可以标记组合输出并输出不同的 pcollection。

import apache_beam as beam


def make_kv_pair(x):
  """ double the record. is this a good practice"""
  return (x, x)
with beam.Pipeline() as p:

  dataset1 = ['data1a', 'data1b', 'data1c', 'data1d']
  dataset2 = ['data2a', 'data2b', 'data1b', 'data2d']
  dataset1_pcoll = (p | 'Read Dataset 1' >> beam.Create(dataset1) |  'doubledata1' >> beam.Map(make_kv_pair))
  dataset2_pcoll = (p | 'Read Dataset 2' >> beam.Create(dataset2)  |  'doubledata2' >> beam.Map(make_kv_pair))
  

  combined_data = (
            {
                'dataset1': dataset1_pcoll,
                'dataset2': dataset2_pcoll,
            }
            | 'Group datasets' >> beam.CoGroupByKey() 
            | 'printing combined' >> beam.Map(print)                     
        )      
   
© www.soinside.com 2019 - 2024. All rights reserved.