我正在将 CSV 文件加载到数据框中,我可以做到这一点,但我需要跳过文件中的前三行。
我尝试了
.option()
命令,将 header 设置为 true,但它忽略了唯一的第一行。
val df = spark.sqlContext.read
.schema(Myschema)
.option("header",true)
.option("delimiter", "|")
.csv(path)
我想将标题设置为 3 行,但我找不到方法。
替代想法:从数据框中跳过这 3 行
请帮我解决这个问题。预先感谢。
处理问题的通用方法是对数据帧建立索引并过滤大于 2 的索引。
直接的方法:
正如另一个答案中所建议的,您可以尝试使用
monotonically_increasing_id
添加索引。
df.withColumn("Index",monotonically_increasing_id)
.filter('Index > 2)
.drop("Index")
但是,只有前 3 行位于第一个分区中时,这才有效。此外,正如评论中提到的,今天的情况就是这样,但是这段代码可能会随着进一步的版本或火花而完全破坏,这将很难调试。事实上,API 中的约定只是“生成的 ID 保证单调递增且唯一,但不连续”。因此,假设它们总是从零开始是不太明智的。在当前版本中甚至可能有其他情况不起作用(但我不确定)。
为了说明我的第一个担忧,请看一下:
scala> spark.range(4).withColumn("Index",monotonically_increasing_id()).show()
+---+----------+
| id| Index|
+---+----------+
| 0| 0|
| 1| 1|
| 2|8589934592|
| 3|8589934593|
+---+----------+
我们只会删除两行...
安全方法:
之前的方法在大多数情况下都有效,但为了安全起见,您可以使用 RDD API 中的
zipWithIndex
来获取连续索引。
def zipWithIndex(df : DataFrame, name : String) : DataFrame = {
val rdd = df.rdd.zipWithIndex
.map{ case (row, i) => Row.fromSeq(row.toSeq :+ i) }
val newSchema = df.schema
.add(StructField(name, LongType, false))
df.sparkSession.createDataFrame(rdd, newSchema)
}
zipWithIndex(df, "index").where('index > 2).drop("index")
我们可以检查它是否更安全:
scala> zipWithIndex(spark.range(4).toDF("id"), "index").show()
+---+-----+
| id|index|
+---+-----+
| 0| 0|
| 1| 1|
| 2| 2|
| 3| 3|
+---+-----+
df1 = spark.read.format("csv")\ .option("header","false")\ .option("skiprows","1")\ .schema(my_schema)\ .load("/FileStore/tables/2010_summary.csv") df1.display()
您可以尝试这个选项
df.withColumn("Index",monotonically_increasing_id())
.filter(col("Index") > 2)
.drop("Index")
您可以尝试将 wrt 更改为您的架构。
import org.apache.spark.sql.Row
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
//Read CSV
val file = sc.textFile("csvfilelocation")
//Remove first 3 lines
val data = file.mapPartitionsWithIndex{ (idx, iter) => if (idx == 0) iter.drop(3) else iter }
//Create RowRDD by mapping each line to the required fields
val rowRdd = data.map(x=>Row(x(0), x(1)))
//create dataframe by calling sqlcontext.createDataframe with rowRdd and your schema
val df = sqlContext.createDataFrame(rowRdd, schema)