输入文件未从pd.read_csv读取

问题描述 投票:0回答:1

我正在尝试使用pandas从apache beam读取存储在google存储中的文件但是收到错误

def Panda_a(self):
    import pandas as pd
    data = 'gs://tegclorox/Input/merge1.csv'
    df1 = pd.read_csv(data, names = ['first_name', 'last_name', 'age', 
         'preTestScore', 'postTestScore'])
    return df1
ip2 = p |'Split WeeklyDueto' >> beam.Map(Panda_a)
ip7 = ip2 | 'print' >> beam.io.WriteToText('gs://tegclorox/Output/merge1234')

当我执行上面的代码时,错误表明该路径不存在。知道为什么吗?

python pandas apache-beam
1个回答
2
投票

这段代码有很多问题。

  • 试图让Pandas从Google云端存储中读取文件。 Pandas不支持谷歌云存储文件系统(正如@Andrew所指出的那样 - documentation说支持的方案是httpftps3file)。但是,您可以使用Beam FileSystems.open() API来获取文件对象,并将该对象提供给Pandas而不是文件路径。
  • p | ... >> beam.Map(...) - beam.Map(f)使用给定的函数PCollection转换输入f的每个元素,它不能应用于管道本身。在您的情况下,您似乎只想在没有任何输入的情况下运行Pandas代码。您可以通过提供虚假输入来模拟,例如beam.Create(['ignored'])
  • beam.Map(f)要求f返回单个值(或者更像是:如果它返回一个列表,它会将该列表解释为单个值),但是您的代码给它一个返回Pandas数据帧的函数。我强烈怀疑你想创建一个包含单个元素的PCollection,其中这个元素是整个数据帧 - 更有可能的是,你希望数据帧的每一行都有1个元素。为此,你需要使用beam.FlatMap,你需要df.iterrows()或类似的东西。

一般来说,我不确定为什么要使用Pandas读取CSV文件。您可以使用Beam的ReadFromTextskip_header_lines=1来阅读它,然后自己解析每一行 - 如果您有大量数据,这将更有效率(如果您只有少量数据并且预计它不会变成大到足以超过单个机器的能力 - 比如,如果它永远不会超过几GB - 那么Beam就是错误的工具)。

© www.soinside.com 2019 - 2024. All rights reserved.