使用Google Dataflow优化使用BigQuery资源从GCS加载200万个JSON文件

问题描述 投票:0回答:1

我有一个庞大的数据库,其中包含约240万个JSON文件,它们本身包含多条记录。我创建了一个简单的apache-beam数据管道(如下所示),该管道遵循以下步骤:

  • 使用全局模式从GCS存储桶读取数据。
  • 从JSON数据中提取记录。
  • 转换数据:将字典转换为JSON字符串,解析时间戳等。
  • 写入BigQuery。
# Pipeline
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
p = beam.Pipeline(options=pipeline_options)

# Read
files = p | 'get_data' >> ReadFromText(files_pattern)

# Transform
output = (files
          | 'extract_records' >> beam.ParDo(ExtractRecordsFn())
          | 'transform_data' >> beam.ParDo(TransformDataFn()))

# Write
output | 'write_data' >> WriteToBigQuery(table=known_args.table,
                                         create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
                                         write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
                                         insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
                                         temp_file_format='NEWLINE_DELIMITED_JSON')

# Run
result = p.run()
result.wait_until_finish()

我已经用最少的样本数据集测试了该管道,并且按预期工作。但是我对BigQuery资源和配额的最佳使用感到怀疑。 batch load quotas are very restrictive,并且由于要分析和加载大量文件,我想知道是否缺少一些可以保证管道将遵守配额并以最佳方式运行的设置。我不想超出配额,因为我正在同一项目中向BigQuery运行其他负载。

我还没有完全理解WriteToBigQuery()转换的某些参数,特别是batch_sizemax_file_sizemax_files_per_bundle,或者它们是否可以帮助优化BigQuery的加载作业。您能帮我吗?

python-3.x google-bigquery google-cloud-dataflow apache-beam
1个回答
0
投票

如果您担心加载作业的配额,可以尝试将数据流传输到restrictive quota policy较少的bigquery中。

要实现您想做的事情,您可以尝试Google提供的模板或仅参考其代码。

最后但并非最不重要的是,可以在Google BigQuery I/O connector上找到更详细的信息。

© www.soinside.com 2019 - 2024. All rights reserved.