代码逻辑无法正常工作。我在Google Cloud上的Apache Beam中的逻辑错误

问题描述 投票:0回答:1

我正在尝试在apache_beam中实现CDC。

[这里,我已经卸载了主数据和新数据,预计每天都会加载。

该联接无法正常工作。有点不对劲。

任何人都可以协助纠正我的错误。我是否缺少任何步骤。

     master_data = (
            p
            | 'Read base from BigQuery ' >> beam.io.Read(
        beam.io.BigQuerySource(query=master_data, use_standard_sql=True))
            |
            'Map id in master' >> beam.Map(
        lambda master: (
            master['id'], master
        )))
    new_data = (
            p
            | 'Read Delta from BigQuery ' >> beam.io.Read(
        beam.io.BigQuerySource(query=new_data, use_standard_sql=True))
            |
            'Map id in new' >> beam.Map(
        lambda new: (
            new['id'], new
        )))

joined_dicts = (
    {'master_data' :master_data, 'new_data' : new_data }
    | beam.CoGroupByKey()
    | beam.FlatMap(join_lists)
    | 'mergeddicts' >> beam.Map(lambda (masterdict, newdict): newdict.update(masterdict))
) 



def join_lists((k,v)):
        itertools.product(v['master_data'], v['new_data'])

观察(在样本数据上)-

主数据:

1, 'A',3232

2, 'B',234

新数据:

1,'A' ,44

4,'D',45

预期在主表中,发布代码实现:

1, 'A',44

2, 'B',234

4,'D',45

但是我在主表中得到什么:

1,'A' ,44

4,'D',45
python google-cloud-platform google-cloud-dataflow apache-beam
1个回答
0
投票

您不必在分组之后进行拼合,因为它再次分隔了元素。

这里是示例代码。

    def join_lists(e):
    (k,v)=e
    return (k, v['new_data']) if v['new_data'] != v['master_data'] else (k, None)

with Pipeline(options=PipelineOptions()) as p:
    master_data = (
        p
        | 'Read base from BigQuery ' >> beam.Create([('A', [3232]),('B', [234])])
    )
    new_data = (
        p
        | 'Read Delta from BigQuery ' >> beam.Create([('A',[44]),('D',[45])])
    )

    joined_dicts = (
        {'master_data' :master_data, 'new_data' : new_data }
        | beam.CoGroupByKey()
        | 'mergeddicts' >> beam.Map(join_lists)
    )



    result = p.run()
    result.wait_until_finish()
© www.soinside.com 2019 - 2024. All rights reserved.