我有几个 CSV 文件,它们的数据行数不一致,没有标题行,我想将这些文件读入单个 PySpark DataFrame。 CSV 文件的结构如下:
data1,data2
data1,data2,data3
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 => data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2,data3,data4,data5,data6 data1,data2,data3,data4,data5,data6
data1,data2
data1,data2
data1,data2,data3
我想跳过每个文件的前两行和最后三行,然后将它们合并到一个 DataFrame 中。我正在使用以下方法,该方法有效,但我正在寻找更优化的解决方案,因为数据集非常大。
def concat(df_list: list):
df = df_list[0]
for i in df_list[1:]:
df = df.unionByName(i, allowMissingColumns=True)
return df
def __read_with_separators(self, spark: SparkSession, field_details: List[Dict[str, Any]], file_path_list: List[str], kwargs: dict) -> DataFrame:
df_list = []
for file_path in file_path_list:
rdd = spark.sparkContext.textFile(file_path)
total_rows = rdd.count()
start_index = kwargs.get("skiprows", 0)
end_index = total_rows - kwargs.get("skipfooter", 0)
rdd_filtered = rdd.zipWithIndex().filter(lambda x: start_index <= x[1] < end_index).map(lambda x: x[0]).map(lambda line: line.split(delimiter))
temp_df = rdd_filtered.toDF(schema)
df_list.append(temp_df)
return concat(df_list)
问题:
尝试在内置函数中使用 Spark
read.text
根据分隔符进行过滤和拆分以创建数据框。
Example:
#input
spark.read.text("<csv_path>").show()
#+-----------------------------------+
#|value |
#+-----------------------------------+
#|data1,data2 |
#|data1,data2,data3 |
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2,data3,data4,data5,data6|
#|data1,data2 |
#|data1,data2 |
#|data1,data2,data3 |
#+-----------------------------------+
#read as text and filter out the length is 6
#then traverse through array to get data from the split_value column
spark.read.text("<csv_file_path>").\
filter(size(split(col("value"),",")) == "6").\
withColumn("split_value", split(col("value"), ",")) \
.select(
col("split_value").getItem(0).alias("data1"),
col("split_value").getItem(1).alias("data2"),
col("split_value").getItem(2).alias("data3"),
col("split_value").getItem(3).alias("data4"),
col("split_value").getItem(4).alias("data5"),
col("split_value").getItem(5).alias("data6")
).\
show()
#result:
#+-----+-----+-----+-----+-----+-----+
#|data1|data2|data3|data4|data5|data6|
#+-----+-----+-----+-----+-----+-----+
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#|data1|data2|data3|data4|data5|data6|
#+-----+-----+-----+-----+-----+-----+