首先让我开始说一切,让我开始了解使用Beam的Python SDK和GCP Dataflow的方法!
问题:我的管道对于几乎所有用例都运行良好。没有我可以抱怨的错误。我只是对我可能会遇到的一些瓶颈或优化提出一些问题。我已经注意到,使用大小约为50mb的压缩文件时,我的管道执行时间几乎跳到3个多小时。不能完全确定是否有任何方法可以加快这一速度。以下是在作业最终成功完成之前我看到的一堆日志警告的屏幕截图。
Log Output details from Dataflow
以下是管道的相关摘要:
if __name__ == '__main__':
parser = argparse.ArgumentParser(
description='Beam Pipeline: example'
)
parser.add_argument('--input-glob',
help='Cloud Path To Raw Data',
required=True)
parser.add_argument('--output-bq-table',
help='BQ Native {dataset}.{table}')
known_args, pipeline_args = parser.parse_known_args()
with beam.Pipeline(argv=pipeline_args) as p:
json_raw = (p | 'READING RAW DATA' >> beam.io.ReadFromText(known_args.input_glob)
| 'JSON-ing' >> beam.Map(lambda e: json.loads(e))
)
额外信息:
我玩过不同类型的机器,希望为该问题提供更多的计算能力会使它消失,但到目前为止没有运气。
以下管道执行参数:
--runner=DataflowRunner \
--project=example-project \
--region=us-east4 \
--save_main_session \
--temp_location=gs://example-bucket/temp/ \
--input-glob=gs://example-bucket/raw/ \
--machine_type=n1-standard-16 \
--job_name=this-is-a-test-insert
Beam具有许多优化功能,可以将其文件处理分为多个工作程序。此外,它能够拆分单个文件,以供多个工作人员并行使用。
很遗憾,gzip文件无法做到这一点。这是因为gzip文件被压缩到一个必须连续解压缩的块中。当Beam Worker读取此文件时,它必须顺序读取整个内容。
有些压缩格式允许您并行读取它们(这些通常是“多块”格式)。不幸的是,我相信Beam Python SDK仅supports serial formats 目前。
如果需要使管道以这种方式工作,则可以尝试在beam.Reshuffle
之后添加ReadFromText
操作。这样,您的管道仍将顺序读取文件,但并行应用所有下游操作(请参见Performance section in the PTransform guide以了解为什么)。
一些其他选择: