我正在尝试使用Cloud Dataflow(Beam Python SDK)对其进行读写。
读取和写入2000万条记录(约80 MB)需要将近30分钟。
查看数据流DAG,我发现大部分时间都在将每个CSV行转换为BQ行。
下面是执行相同操作的代码段:
beam.Map(lambda s: data_ingestion.parse_record_string(s,data_ingestion.stg_schema_dict))
def parse_record_string(self, string_input,schema_dict):
for idx,(x,key) in enumerate(zip(imm_input,schema_dict)):
key = key.strip()
datatype = schema_dict[key].strip()
if key == 'HASH_ID' and datatype != 'STRING':
hash_id = hash(''.join(imm_input[1:idx]))
row_dict[key] = hash_id
else:
if x:
x = x.decode('utf-8').strip()
row_dict[key] = x
else:
row_dict[key] = None
#row_dict[key] = ''
return row_dict
除地图变换外,我还使用了ParDo和Flatmap。它们都产生相同的结果。
[请提出任何可能的调整方法以减少时间。
提前感谢
您的代码在查看时会占用大量计算资源。对于您的每20M条生产线,您执行:
Python很棒,在许多帮助程序中非常方便。但是,请注意这种简便性的陷阱,并正确评估算法的复杂性。
如果您了解Java,请尝试一下。它可能会更有效。