我试图从python中的csv数据创建一个字典,我不想使用传统的分割(',')然后使用重命名行到我想要的标题,因为我将收到不同的csv文件与不同大量的信息,我将无法始终如一地使用该方法定位我想要的行。
标题名称将是一致的,只是他们在一个文件中的标题可能比另一个文件更多
相反,我一直在尝试从CSV文件中制定一个列表,然后将第一行压缩到其余行来创建一个字典,然后我可以提取我想要的确切内容。
我可以使用csv.reader创建列表列表或:
class Split(beam.DoFn):
def process(self, element):
rows = element.splitlines()
data = []
for row in rows:
data.append([row])
return data
返回:
[u'FIRST_NAME,last_name,birthdate,voter_id,phone_number']
[u'hector,ABAD,6/15/1970,11*******,7*********']
[u'm,ABAL,6/16/1949,12********,']
[u'jorge,ABDALA,6/15/1962,21********,3********']
[u'karen,ABELLA,6/18/1988,33********,']
虽然当我尝试通过以下方式访问第一行时:
rows = element.splitlines()
data = []
for row in rows:
# f = pattern.findall(row)
data.append([row])
return data[0]
它返回:
FIRST_NAME,last_name,birthdate,voter_id,phone_number
hector,ABAD,6/15/1970,11*******,7*********
m,ABAL,6/16/1949,109055849,
jorge,ABDALA,6/15/1962,21********,3********
karen,ABELLA,6/18/1988,33********,
我也尝试过beam_utils csv阅读器,虽然这说明我修复了fileio bug后没有名为'sources'的模块。
如果有人知道更好的方式,或者可以指出我做错了什么会很好,这也是我的管道:
with beam.Pipeline(options=pipeline_options) as p:
(p
| 'Read' >> ReadFromText(known_args.input)
| 'Split Values' >> beam.ParDo(Split())
| 'WriteToText' >> beam.io.WriteToText(known_args.output))
我现在只是从我的谷歌云存储桶中读取,但将来它将来自pubsub。
我希望内容看起来像:
{"FIRST_NAME": "hector", "last_name": "ABAD", "birthdate": "6/15/1970", "voter_id": 11*******, "phone_number": 7*********}
etc.
etc.
etc.
python beam SDK似乎没有很好地支持处理csv文件的头元素(除了丢弃它)。幸运的是,有人创建了这个repo来处理这个用例:https://github.com/pabloem/beam_utils
它包含一个CSVFileSource类,它扩展了FileBasedSource(Beam的抽象类,用于创建自定义文件源),用于从带有变量头的文件创建dict。
安装:
pip install beam_utils
from beam_utils.sources import CsvFileSource
它可以像:
p | 'ReadCsvFile' >> beam.io.Read(CsvFileSource(known_args.input))
应该产生你正在寻找的输出。
编辑:要使数据包可用于数据流工作者创建一个tar并使用--extra_package标志提供给作业,如https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#local-or-nonpypi中所示
查看python库模块csv.DictReader:https://docs.python.org/2/library/csv.html#csv.DictReader
复制文档中的示例以供快速参考
>>> import csv
>>> with open('names.csv') as csvfile:
... reader = csv.DictReader(csvfile)
... for row in reader:
... print(row['first_name'], row['last_name'])