使用apache光束读取gzip压缩文件,包裹在TextIOWrapper中,结果“ CompressedFile”对象没有属性“可写”]]

问题描述 投票:0回答:1

我正在努力在Apache光束中实现一个简单的CSV读取器,以及光束回购中的测试:https://github.com/apache/beam/blob/b85795adbd22d8b5cf9ebc684ce43e172a789587/sdks/python/apache_beam/io/fileio_test.py#L128-L148

def get_csv_reader(readable_file):
  import sys
  import csv
  import io
  if sys.version_info >= (3, 0):
    return csv.reader(io.TextIOWrapper(readable_file.open()))
  else:
    return csv.reader(readable_file.open())

with beam.Pipeline() as p:
  content_pc = (p
                | beam.Create([CSV_FILE])
                | fileio.ReadMatches()
                | beam.FlatMap(get_csv_reader)
                | beam.Map(print))

如果未压缩CSV_FILE并且我没有收到任何错误,这将正常工作。但是,如果我将压缩文件用作输入,则会得到:

<ipython-input-114-4830c3592163> in get_csv_reader(readable_file)
      6   import io
      7   if sys.version_info >= (3, 0):
----> 8     return csv.reader(io.TextIOWrapper(readable_file.open()))
      9   else:
     10     return csv.reader(readable_file.open())

AttributeError: 'CompressedFile' object has no attribute 'writable' [while running 'FlatMap(get_csv_reader)']

我理解为什么会这样(TextIOWrapper正在寻找一个可读可写的对象)。是否有对Apache光束/数据流有更深入了解的人,他们可以建议如何最好地实现它来处理压缩和未压缩的输入?

我正在努力在Apache Beam中实现一个简单的CSV读取器,以及来自光束仓库的测试:https://github.com/apache/beam/blob/b85795adbd22d8b5cf9ebc684ce43e172a789587/sdks/python / ...] >

python google-cloud-dataflow apache-beam
1个回答
0
投票

对不起,麻烦。不幸的是,目前这很尴尬。现在执行此操作的方法是使用ReadableFile对象的内部,有点像这样:

filesystems.Filesystems.open(rf.metadata.path, compression=MY_COMPRESSION)

© www.soinside.com 2019 - 2024. All rights reserved.