我的任务是在数据流运行程序中比较 apache beam 中的两个数据集并输出三个阶段,一个在数据集 1 中常见,另一个仅在数据集 1 中,然后仅在数据集 2 中。
我尝试使用 CoGroupByKey 但我不确定它是否可以使用,因为我们只有一个一维列表。我们如何比较这些。我尝试的是合并两个 pcollections,如下所示
import apache_beam as beam
with beam.Pipeline() as p:
dataset1 = ['data1a', 'data1b', 'data1c', 'data1d']
dataset2 = ['data2a', 'data2b', 'data1b', 'data2d']
dataset1_pcoll = p | 'Read Dataset 1' >> beam.Create(dataset1)
dataset2_pcoll = p | 'Read Dataset 2' >> beam.Create(dataset2)
combined_data = (
{
'file1': dataset1_pcoll,
'file2': dataset2_pcoll,
}
| 'CoGroup Files' >> beam.CoGroupByKey()
)
出现如下错误
wrapper = lambda x: [fn(*x)]
TypeError: <lambda>() takes 2 positional arguments but 6 were given [while running 'CoGroup Files/CoGroupByKeyImpl/Tag[file2]']
我发现了以下作品。不确定是否还有其他更好的方法。我必须复制该元素以作为键映射,并且能够获得所需的输出。现在我可以标记组合输出并输出不同的 pcollection。
import apache_beam as beam
def make_kv_pair(x):
""" double the record. is this a good practice"""
return (x, x)
with beam.Pipeline() as p:
dataset1 = ['data1a', 'data1b', 'data1c', 'data1d']
dataset2 = ['data2a', 'data2b', 'data1b', 'data2d']
dataset1_pcoll = (p | 'Read Dataset 1' >> beam.Create(dataset1) | 'doubledata1' >> beam.Map(make_kv_pair))
dataset2_pcoll = (p | 'Read Dataset 2' >> beam.Create(dataset2) | 'doubledata2' >> beam.Map(make_kv_pair))
combined_data = (
{
'dataset1': dataset1_pcoll,
'dataset2': dataset2_pcoll,
}
| 'Group datasets' >> beam.CoGroupByKey()
| 'printing combined' >> beam.Map(print)
)