使用GCP Dataflow和Apache Beam Python SDK从GCS读取速度极慢

问题描述 投票:0回答:1

首先让我开始说一切,让我开始了解使用Beam的Python SDK和GCP Dataflow的方法!

问题:我的管道对于几乎所有用例都运行良好。没有我可以抱怨的错误。我只是对我可能会遇到的一些瓶颈或优化提出一些问题。我已经注意到,使用大小约为50mb的压缩文件时,我的管道执行时间几乎跳到3个多小时。不能完全确定是否有任何方法可以加快这一速度。以下是在作业最终成功完成之前我看到的一堆日志警告的屏幕截图。

Log Output details from Dataflow

以下是管道的相关摘要:

if __name__ == '__main__':

    parser = argparse.ArgumentParser(
        description='Beam Pipeline: example'
    )
    parser.add_argument('--input-glob',
                        help='Cloud Path To Raw Data',
                        required=True)
    parser.add_argument('--output-bq-table',
                        help='BQ Native {dataset}.{table}')
    known_args, pipeline_args = parser.parse_known_args()

    with beam.Pipeline(argv=pipeline_args) as p:

        json_raw = (p | 'READING RAW DATA' >> beam.io.ReadFromText(known_args.input_glob)
            | 'JSON-ing' >> beam.Map(lambda e: json.loads(e))
        )

额外信息:

  1. 我正在使用Airflow's Dataflowhook启动它。
  2. 我玩过不同类型的机器,希望为该问题提供更多的计算能力会使它消失,但到目前为止没有运气。

  3. 以下管道执行参数:

--runner=DataflowRunner \
--project=example-project \
--region=us-east4 \
--save_main_session \
--temp_location=gs://example-bucket/temp/ \
--input-glob=gs://example-bucket/raw/  \
--machine_type=n1-standard-16 \
--job_name=this-is-a-test-insert
python-3.x google-cloud-dataflow apache-beam
1个回答
0
投票

Beam具有许多优化功能,可以将其文件处理分为多个工作程序。此外,它能够拆分单个文件,以供多个工作人员并行使用。

很遗憾,gzip文件无法做到这一点。这是因为gzip文件被压缩到一个必须连续解压缩的块中。当Beam Worker读取此文件时,它必须顺序读取整个内容。

有些压缩格式允许您并行读取它们(这些通常是“多块”格式)。不幸的是,我相信Beam Python SDK仅supports serial formats 目前

如果需要使管道以这种方式工作,则可以尝试在beam.Reshuffle之后添加ReadFromText操作。这样,您的管道仍将顺序读取文件,但并行应用所有下游操作(请参见Performance section in the PTransform guide以了解为什么)。

一些其他选择:

  • 将您的数据分成多个gzip文件。
  • 在将其用于管道之前对其进行解压缩。
  • ((半开玩笑/半开玩笑)为Beam提供多块压缩支持吗? :)):))
© www.soinside.com 2019 - 2024. All rights reserved.