我有一个庞大的数据库,其中包含约240万个JSON文件,它们本身包含多条记录。我创建了一个简单的apache-beam数据管道(如下所示),该管道遵循以下步骤:
# Pipeline
pipeline_options = PipelineOptions(pipeline_args)
pipeline_options.view_as(SetupOptions).save_main_session = save_main_session
p = beam.Pipeline(options=pipeline_options)
# Read
files = p | 'get_data' >> ReadFromText(files_pattern)
# Transform
output = (files
| 'extract_records' >> beam.ParDo(ExtractRecordsFn())
| 'transform_data' >> beam.ParDo(TransformDataFn()))
# Write
output | 'write_data' >> WriteToBigQuery(table=known_args.table,
create_disposition=beam.io.BigQueryDisposition.CREATE_NEVER,
write_disposition=beam.io.BigQueryDisposition.WRITE_EMPTY,
insert_retry_strategy='RETRY_ON_TRANSIENT_ERROR',
temp_file_format='NEWLINE_DELIMITED_JSON')
# Run
result = p.run()
result.wait_until_finish()
我已经用最少的样本数据集测试了该管道,并且按预期工作。但是我对BigQuery资源和配额的最佳使用感到怀疑。 batch load quotas are very restrictive,并且由于要分析和加载大量文件,我想知道是否缺少一些可以保证管道将遵守配额并以最佳方式运行的设置。我不想超出配额,因为我正在同一项目中向BigQuery运行其他负载。
我还没有完全理解WriteToBigQuery()
转换的某些参数,特别是batch_size
,max_file_size
和max_files_per_bundle
,或者它们是否可以帮助优化BigQuery的加载作业。您能帮我吗?
如果您担心加载作业的配额,可以尝试将数据流传输到restrictive quota policy较少的bigquery中。
要实现您想做的事情,您可以尝试Google提供的模板或仅参考其代码。
最后但并非最不重要的是,可以在Google BigQuery I/O connector上找到更详细的信息。