如何在 PySpark 中高效读取多个跳过行和页脚的 CSV 文件?

问题描述 投票:0回答:1

我有几个 CSV 文件,它们的数据行数不一致,没有标题行,我想将这些文件读入单个 PySpark DataFrame。 CSV 文件的结构如下:

data1,data2
data1,data2,data3
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6       =>        data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6                 data1,data2,data3,data4,data5,data6
data1,data2
data1,data2
data1,data2,data3

我想跳过每个文件的前两行和最后三行,然后将它们合并到一个 DataFrame 中。我正在使用以下方法,该方法有效,但我正在寻找更优化的解决方案,因为数据集非常大。

def concat(df_list: list):
    df = df_list[0]
    for i in df_list[1:]:
        df = df.unionByName(i, allowMissingColumns=True)
    return df


def __read_with_separators(self, spark: SparkSession, field_details: List[Dict[str, Any]], file_path_list: List[str], kwargs: dict) -> DataFrame:
    df_list = []
    for file_path in file_path_list:
        rdd = spark.sparkContext.textFile(file_path)

        total_rows = rdd.count()
        start_index = kwargs.get("skiprows", 0)
        end_index = total_rows - kwargs.get("skipfooter", 0)

        rdd_filtered = rdd.zipWithIndex().filter(lambda x: start_index <= x[1] < end_index).map(lambda x: x[0]).map(lambda line: line.split(delimiter))
        temp_df = rdd_filtered.toDF(schema)
        df_list.append(temp_df)

    return concat(df_list)

问题:

  1. 有没有办法一次读取多个 CSV 文件,同时更有效地跳过行和页脚行?
  2. 当前的方法是否有任何优化或改进来更有效地处理大型数据集?
python python-3.x apache-spark pyspark apache-spark-sql
1个回答
0
投票

尝试在内置函数中使用 Spark

read.text
根据分隔符进行过滤和拆分以创建数据框。

Example:

#input
spark.read.text("<csv_path>").show()
#+-----------------------------------+
#|value                              |
#+-----------------------------------+
#|data1,data2                        |
#|data1,data2,data3                  |
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2                        |
#|data1,data2                        |
#|data1,data2,data3                  |
#+-----------------------------------+

#read as text and filter out the length is 6
#then traverse through array to get data from the split_value column
spark.read.text("<csv_file_path>").\
  filter(size(split(col("value"),",")) == "6").\
    withColumn("split_value", split(col("value"), ",")) \
    .select(
        col("split_value").getItem(0).alias("data1"),
        col("split_value").getItem(1).alias("data2"),
        col("split_value").getItem(2).alias("data3"),
        col("split_value").getItem(3).alias("data4"),
        col("split_value").getItem(4).alias("data5"),
        col("split_value").getItem(5).alias("data6")
    ).\
    show()

#result:
#+-----+-----+-----+-----+-----+-----+
#|data1|data2|data3|data4|data5|data6|
#+-----+-----+-----+-----+-----+-----+
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#+-----+-----+-----+-----+-----+-----+
© www.soinside.com 2019 - 2024. All rights reserved.