我正在努力在Apache光束中实现一个简单的CSV读取器,以及光束回购中的测试:https://github.com/apache/beam/blob/b85795adbd22d8b5cf9ebc684ce43e172a789587/sdks/python/apache_beam/io/fileio_test.py#L128-L148
def get_csv_reader(readable_file): import sys import csv import io if sys.version_info >= (3, 0): return csv.reader(io.TextIOWrapper(readable_file.open())) else: return csv.reader(readable_file.open()) with beam.Pipeline() as p: content_pc = (p | beam.Create([CSV_FILE]) | fileio.ReadMatches() | beam.FlatMap(get_csv_reader) | beam.Map(print))
如果未压缩CSV_FILE并且我没有收到任何错误,这将正常工作。但是,如果我将压缩文件用作输入,则会得到:
<ipython-input-114-4830c3592163> in get_csv_reader(readable_file) 6 import io 7 if sys.version_info >= (3, 0): ----> 8 return csv.reader(io.TextIOWrapper(readable_file.open())) 9 else: 10 return csv.reader(readable_file.open()) AttributeError: 'CompressedFile' object has no attribute 'writable' [while running 'FlatMap(get_csv_reader)']
我理解为什么会这样(TextIOWrapper正在寻找一个可读可写的对象)。是否有对Apache光束/数据流有更深入了解的人,他们可以建议如何最好地实现它来处理压缩和未压缩的输入?
我正在努力在Apache Beam中实现一个简单的CSV读取器,以及来自光束仓库的测试:https://github.com/apache/beam/blob/b85795adbd22d8b5cf9ebc684ce43e172a789587/sdks/python / ...] >
对不起,麻烦。不幸的是,目前这很尴尬。现在执行此操作的方法是使用ReadableFile
对象的内部,有点像这样:
filesystems.Filesystems.open(rf.metadata.path, compression=MY_COMPRESSION)