我正在尝试在apache_beam中实现CDC。
[这里,我已经卸载了主数据和新数据,预计每天都会加载。
该联接无法正常工作。有点不对劲。
任何人都可以协助纠正我的错误。我是否缺少任何步骤。
master_data = (
p
| 'Read base from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(query=master_data, use_standard_sql=True))
|
'Map id in master' >> beam.Map(
lambda master: (
master['id'], master
)))
new_data = (
p
| 'Read Delta from BigQuery ' >> beam.io.Read(
beam.io.BigQuerySource(query=new_data, use_standard_sql=True))
|
'Map id in new' >> beam.Map(
lambda new: (
new['id'], new
)))
joined_dicts = (
{'master_data' :master_data, 'new_data' : new_data }
| beam.CoGroupByKey()
| beam.FlatMap(join_lists)
| 'mergeddicts' >> beam.Map(lambda (masterdict, newdict): newdict.update(masterdict))
)
def join_lists((k,v)):
itertools.product(v['master_data'], v['new_data'])
观察(在样本数据上)-
主数据:
1, 'A',3232
2, 'B',234
新数据:
1,'A' ,44
4,'D',45
预期在主表中,发布代码实现:
1, 'A',44
2, 'B',234
4,'D',45
但是我在主表中得到什么:
1,'A' ,44
4,'D',45
您不必在分组之后进行拼合,因为它再次分隔了元素。
这里是示例代码。
def join_lists(e):
(k,v)=e
return (k, v['new_data']) if v['new_data'] != v['master_data'] else (k, None)
with Pipeline(options=PipelineOptions()) as p:
master_data = (
p
| 'Read base from BigQuery ' >> beam.Create([('A', [3232]),('B', [234])])
)
new_data = (
p
| 'Read Delta from BigQuery ' >> beam.Create([('A',[44]),('D',[45])])
)
joined_dicts = (
{'master_data' :master_data, 'new_data' : new_data }
| beam.CoGroupByKey()
| 'mergeddicts' >> beam.Map(join_lists)
)
result = p.run()
result.wait_until_finish()