从CSV dataflow python创建一个Dict

问题描述 投票:2回答:2

我试图从python中的csv数据创建一个字典,我不想使用传统的分割(',')然后使用重命名行到我想要的标题,因为我将收到不同的csv文件与不同大量的信息,我将无法始终如一地使用该方法定位我想要的行。

标题名称将是一致的,只是他们在一个文件中的标题可能比另一个文件更多

相反,我一直在尝试从CSV文件中制定一个列表,然后将第一行压缩到其余行来创建一个字典,然后我可以提取我想要的确切内容。

我可以使用csv.reader创建列表列表或:

class Split(beam.DoFn):
    def process(self, element):
        rows = element.splitlines()
        data = []
        for row in rows:
            data.append([row])
        return data

返回:

[u'FIRST_NAME,last_name,birthdate,voter_id,phone_number']
[u'hector,ABAD,6/15/1970,11*******,7*********']
[u'm,ABAL,6/16/1949,12********,']
[u'jorge,ABDALA,6/15/1962,21********,3********']
[u'karen,ABELLA,6/18/1988,33********,']

虽然当我尝试通过以下方式访问第一行时:

rows = element.splitlines()
data = []
for row in rows:
    # f = pattern.findall(row)
    data.append([row])
return data[0]

它返回:

FIRST_NAME,last_name,birthdate,voter_id,phone_number
hector,ABAD,6/15/1970,11*******,7*********
m,ABAL,6/16/1949,109055849,
jorge,ABDALA,6/15/1962,21********,3********
karen,ABELLA,6/18/1988,33********,

我也尝试过beam_utils csv阅读器,虽然这说明我修复了fileio bug后没有名为'sources'的模块。

如果有人知道更好的方式,或者可以指出我做错了什么会很好,这也是我的管道:

with beam.Pipeline(options=pipeline_options) as p:
    (p
     | 'Read' >> ReadFromText(known_args.input)
     | 'Split Values' >> beam.ParDo(Split())
     | 'WriteToText' >> beam.io.WriteToText(known_args.output)) 

我现在只是从我的谷歌云存储桶中读取,但将来它将来自pubsub。

我希望内容看起来像:

{"FIRST_NAME": "hector", "last_name": "ABAD", "birthdate": "6/15/1970", "voter_id": 11*******, "phone_number": 7*********}
etc.
etc.
etc.
python list csv dictionary google-cloud-dataflow
2个回答
2
投票

python beam SDK似乎没有很好地支持处理csv文件的头元素(除了丢弃它)。幸运的是,有人创建了这个repo来处理这个用例:https://github.com/pabloem/beam_utils

它包含一个CSVFileSource类,它扩展了FileBasedSource(Beam的抽象类,用于创建自定义文件源),用于从带有变量头的文件创建dict。

安装:

pip install beam_utils
from beam_utils.sources import CsvFileSource

它可以像:

 p | 'ReadCsvFile' >> beam.io.Read(CsvFileSource(known_args.input))

应该产生你正在寻找的输出。

编辑:要使数据包可用于数据流工作者创建一个tar并使用--extra_package标志提供给作业,如https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/#local-or-nonpypi中所示


-1
投票

查看python库模块csv.DictReader:https://docs.python.org/2/library/csv.html#csv.DictReader

复制文档中的示例以供快速参考

>>> import csv
>>> with open('names.csv') as csvfile:
...     reader = csv.DictReader(csvfile)
...     for row in reader:
...         print(row['first_name'], row['last_name'])
© www.soinside.com 2019 - 2024. All rights reserved.