在pySpark中将csv加载到DataFrame时出现问题

问题描述 投票:0回答:1

我正在尝试将一堆CSV文件聚合为一个,并使用AWS Glue中的ETL作业以ORC格式将其输出到S3。我的聚合CSV如下所示:

header1,header2,header3
foo1,foo2,foo3
bar1,bar2,bar3

我有一个聚合CSV的字符串表示,名为aggregated_csv,内容为header1,header2,header3\nfoo1,foo2,foo3\nbar1,bar2,bar3。我已经读过pyspark有一种简单的方法可以将CSV文件转换为DataFrames(我需要这样才能利用Glue在ORC中轻松输出的能力)。这是我尝试过的一小段内容:

def f(glueContext, aggregated_csv, schema):
    with open('somefile', 'a+') as agg_file:
        agg_file.write(aggregated_csv)
        #agg_file.seek(0)
        df = glueContext.read.csv(agg_file, schema=schema, header="true")
        df.show()

无论有没有寻求,我都试过了。当我不调用seek()时,作业成功完成,但df.show()不显示除标题之外的任何数据。当我调用seek()时,我得到以下异常:

pyspark.sql.utils.AnalysisException: u'Path does not exist: hdfs://ip-172-31-48-255.us-west-2.compute.internal:8020/user/root/header1,header2,header3\n;'

由于seek似乎改变了行为,并且因为我的csv中的头文件是异常字符串的一部分,所以我假设问题在某种程度上与我将文件传递给glueContext.read.csv()时文件光标的位置有关但我不确定如何解决它。如果我取消注释seek(0)调用并添加agg_file.read()命令,我可以按预期看到文件的全部内容。我需要更改什么才能成功读取我刚写入spark数据帧的csv文件?

python csv dataframe pyspark aws-glue
1个回答
2
投票

我认为你向csv函数传递了错误的参数。我相信GlueContext.read.csv()将获得DataFrameReader.csv()的一个实例,并且它的签名将文件名作为第一个参数,而你传递的是一个类似文件的对象。

def f(glueContext, aggregated_csv, schema):
    with open('somefile', 'a+') as agg_file:
        agg_file.write(aggregated_csv)
        #agg_file.seek(0)
    df = glueContext.read.csv('somefile', schema=schema, header="true")
    df.show()

但是,如果你想要它写一个ORC文件,并且你已经将数据读作aggregated_csv,你可以直接从元组列表中创建一个DataFrame

df = spark.createDataFrame([('foo1','foo2','foo3'), ('bar1','bar2','bar3')], ['header1', 'header2', 'header3'])

然后,如果你需要一个胶水DynamicFrame使用fromDF功能

dynF = fromDF(df, glueContext, 'myFrame')

更多但是:你不需要胶水来写ORC - 它完全能够激发它。只需使用DataFrameWriter.orc()功能:

df.write.orc('s3://path')
© www.soinside.com 2019 - 2024. All rights reserved.