Beam / Dataflow ReadAllFromParquet不读取任何内容,但我的工作仍然成功吗?

问题描述 投票:0回答:1

我有一个数据流作业,它:

  1. 从GCS读取文本文件,其中包含其他文件名
  2. 将文件名传递给ReadAllFromParquet以读取.parquet文件
  3. 写入BigQuery

尽管我的工作“成功”,但在ReadAllFromParquet步骤之后,基本上没有输出集合。我成功读取了列表中的文件,例如:['gs://my_bucket/my_file1.snappy.parquet','gs://my_bucket/my_file2.snappy.parquet','gs://my_bucket/my_file3.snappy.parquet']我还确认此列表正确,并且在ReadAllFromParquet之前的步骤中使用记录器,文件的GCS路径正确。

这就是我的管道的样子(为简洁起见,省略了完整的代码,但是我相信它通常可以正常工作,因为我使用ReadAllFromText具有与.csv完全相同的管道,并且可以正常工作:]]]

 with beam.Pipeline(options=pipeline_options_batch) as pipeline_2:

    try:

        final_data = (
        pipeline_2
        |'Create empty PCollection' >> beam.Create([None])
        |'Get accepted batch file: {}'.format(runtime_options.complete_batch) >> beam.ParDo(OutputValueProviderFn(runtime_options.complete_batch))
        |'Read all filenames into a list'>> beam.ParDo(FileIterator(runtime_options.files_bucket))
        |'Read all files' >> beam.io.ReadAllFromParquet(columns=['locationItemId','deviceId','timestamp'])
        |'Process all files' >> beam.ParDo(ProcessSch2())
        |'Transform to rows' >> beam.ParDo(BlisDictSch2())
        |'Write to BigQuery' >> beam.io.WriteToBigQuery(
                table = runtime_options.comp_table, 
                schema = SCHEMA_2,
                project = pipeline_options_batch.view_as(GoogleCloudOptions).project, #options.display_data()['project'],
                create_disposition = beam.io.BigQueryDisposition.CREATE_IF_NEEDED,  #'CREATE_IF_NEEDED',#create if does not exist.
                write_disposition = beam.io.BigQueryDisposition.WRITE_APPEND    #'WRITE_APPEND' #add to existing rows,partitoning
                )
        )
    except Exception as exception:
        logging.error(exception)
        pass

这就是我的工作图如下所示:

enter image description here

有人知道这里可能出什么问题了,什么是调试的最佳方法?目前我的想法:

  1. 存储桶权限问题。我注意到我正在读取的存储桶很奇怪,因为尽管我是项目所有者,但我仍然无法下载文件。项目的所有者只有“存储旧版存储桶所有者”。我添加了“ Storage Admin”,然后使用我自己的帐户手动下载文件时,它可以正常工作。根据数据流文档,我已确保默认计算服务帐户以及数据流帐户在此存储桶上均具有“存储管理员”。但是,也许这全是红鲱鱼,因为最终如果出现权限问题,我应该在日志中看到它,并且作业将失败?

  2. ReadAllFromParquet是否要求文件格式为其他格式?我已经显示了列表的示例(在上面的图表中,我可以看到输入集合正确显示了列表中48个文件的添加元素= 48)。我知道这种格式适用于ReadAllFromText,所以我认为它们是等效的,应该可以使用。

  3. ==========

编辑:注意到其他可能导致的后果。与我的其他使用ReadAllFromText且工作正常的工作相比,我注意到命名中的轻微不匹配令人担忧。

这是我的工作输出集合的名称:enter image description here

这是我的实木复合地板工作上的名字,实际上什么都没读:enter image description here

特别注意

Read all files/ReadAllFiles/ReadRange.out0

vs

Read all files/Read all files/ReadRange.out0

路径的第一部分是这两个作业的步骤名称。但我认为第二个是apache_beam.io.filebasedsource(https://github.com/apache/beam/blob/master/sdks/python/apache_beam/io/filebasedsource.py)中的ReadAllFiles类,该类都同时调用ReadAllFromText和ReadAllFromParquet。

似乎是潜在的错误,但似乎无法在源代码中对其进行跟踪。

=============编辑2

经过更多挖掘后,似乎ReadAllFromParquet仍无法正常工作。 ReadFromParquet调用apache_beam.io.parquetio._ParquetSource,而ReadAllFromParquet只是调用apache_beam.io.filebasedsource._ReadRange。

我想知道是否有一种方法可以将其启用为实验功能?

我有一个数据流作业,该作业:从GCS读取文本文件中的其他文件名,将文件名传递给ReadAllFromParquet以读取.parquet文件,尽管我的工作'成功',但仍写入BigQuery ...

google-cloud-platform google-cloud-dataflow apache-beam google-cloud-iam
1个回答
0
投票
© www.soinside.com 2019 - 2024. All rights reserved.