写入BigQuery时出现Cloud Dataflow性能问题

问题描述 投票:0回答:1

我正在尝试使用Cloud Dataflow(Beam Python SDK)对其进行读写。

读取和写入2000万条记录(约80 MB)需要将近30分钟。

查看数据流DAG,我发现大部分时间都在将每个CSV行转换为BQ行。

enter image description here

下面是执行相同操作的代码段:

beam.Map(lambda s: data_ingestion.parse_record_string(s,data_ingestion.stg_schema_dict)) 

    def parse_record_string(self, string_input,schema_dict): 

        for idx,(x,key) in enumerate(zip(imm_input,schema_dict)):
            key = key.strip()
            datatype = schema_dict[key].strip()
            if key == 'HASH_ID' and datatype != 'STRING':
                hash_id = hash(''.join(imm_input[1:idx]))
                row_dict[key] = hash_id
            else:
                if x:
                    x = x.decode('utf-8').strip()
                    row_dict[key] = x
                else:
                    row_dict[key] = None
                    #row_dict[key] = ''
        return row_dict

除地图变换外,我还使用了ParDo和Flatmap。它们都产生相同的结果。

[请提出任何可能的调整方法以减少时间。

提前感谢

python-2.7 google-cloud-platform profiling google-cloud-dataflow apache-beam
1个回答
2
投票

您的代码在查看时会占用大量计算资源。对于您的每20M条生产线,您执行:

  • 一个for循环(每行多少个元素?)
  • 一个拉链并枚举
  • 在循环上的每个元素上
    • 您执行2条(在字符串上循环以删除空格)
    • 切片上的联接(两个循环)->此条件多长时间为真?
    • 其他情况下的另一条带

Python很棒,在许多帮助程序中非常方便。但是,请注意这种简便性的陷阱,并正确评估算法的复杂性。

如果您了解Java,请尝试一下。它可能会更有效。

© www.soinside.com 2019 - 2024. All rights reserved.