我有一个数据流作业,它:
尽管我的工作“成功”,但在ReadAllFromParquet步骤之后,基本上没有输出集合。我成功读取了列表中的文件,例如:['gs://my_bucket/my_file1.snappy.parquet','gs://my_bucket/my_file2.snappy.parquet','gs://my_bucket/my_file3.snappy.parquet']
我还确认此列表正确,并且在ReadAllFromParquet之前的步骤中使用记录器,文件的GCS路径正确。
这就是我的管道的样子(为简洁起见,省略了完整的代码,但是我相信它通常可以正常工作,因为我使用ReadAllFromText具有与.csv完全相同的管道,并且可以正常工作:]]]
with beam.Pipeline(options=pipeline_options_batch) as pipeline_2: try: final_data = ( pipeline_2 |'Create empty PCollection' >> beam.Create([None]) |'Get accepted batch file: {}'.format(runtime_options.complete_batch) >> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch)) |'Read all filenames into a list'>> beam.ParDo(FileIterator(runtime_options.files_bucket)) |'Read all files' >> beam.io.ReadAllFromParquet(columns=['locationItemId','deviceId','timestamp']) |'Process all files' >> beam.ParDo(ProcessSch2()) |'Transform to rows' >> beam.ParDo(BlisDictSch2()) |'Write to BigQuery' >> beam.io.WriteToBigQuery( table = runtime_options.comp_table, schema = SCHEMA_2, project = pipeline_options_batch.view_as(GoogleCloudOptions).project, #options.display_data()['project'], create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED, #'CREATE_IF_NEEDED',#create if does not exist. write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND #'WRITE_APPEND' #add to existing rows,partitoning ) ) except Exception as exception: logging.error(exception) pass
这就是我的工作图如下所示:
有人知道这里可能出什么问题了,什么是调试的最佳方法?目前我的想法:
存储桶权限问题。我注意到我正在读取的存储桶很奇怪,因为尽管我是项目所有者,但我仍然无法下载文件。项目的所有者只有“存储旧版存储桶所有者”。我添加了“ Storage Admin”,然后使用我自己的帐户手动下载文件时,它可以正常工作。根据数据流文档,我已确保默认计算服务帐户以及数据流帐户在此存储桶上均具有“存储管理员”。但是,也许这全是红鲱鱼,因为最终如果出现权限问题,我应该在日志中看到它,并且作业将失败?
ReadAllFromParquet是否要求文件格式为其他格式?我已经显示了列表的示例(在上面的图表中,我可以看到输入集合正确显示了列表中48个文件的添加元素= 48)。我知道这种格式适用于ReadAllFromText,所以我认为它们是等效的,应该可以使用。
==========
编辑:注意到其他可能导致的后果。与我的其他使用ReadAllFromText且工作正常的工作相比,我注意到命名中的轻微不匹配令人担忧。
特别注意
Read all files/ReadAllFiles/ReadRange.out0
vs
Read all files/Read all files/ReadRange.out0
路径的第一部分是这两个作业的步骤名称。但我认为第二个是apache_beam.io.filebasedsource(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py)中的ReadAllFiles类,该类都同时调用ReadAllFromText和ReadAllFromParquet。
似乎是潜在的错误,但似乎无法在源代码中对其进行跟踪。
=============编辑2
经过更多挖掘后,似乎ReadAllFromParquet仍无法正常工作。 ReadFromParquet调用apache_beam.io.parquetio._ParquetSource,而ReadAllFromParquet只是调用apache_beam.io.filebasedsource._ReadRange。
我想知道是否有一种方法可以将其启用为实验功能?
我有一个数据流作业,该作业:从GCS读取文本文件中的其他文件名,将文件名传递给ReadAllFromParquet以读取.parquet文件,尽管我的工作'成功',但仍写入BigQuery ...