假设我的几列如下:
EMP_ID, EMP_NAME, EMP_CONTACT
1, SIDDHESH, 544949461
现在,我要验证数据是否与列名架构同步。对于EMP_NAME
,该列中的数据应仅为string
。我在参考this链接后尝试了以下代码,但在代码的最后一行显示了错误。
package com.sample
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
class sample1 {
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val data = spark.read.format("csv").option("header", "true").load("C:/Users/siddheshk2/Desktop/words.txt")
val originalSchema = data.schema
def validateColumns(row: Row): Row = {
val emp_id = row.getAs[String]("EMP_ID")
val emp_name = row.getAs[String]("EMP_NAME")
val emp_contact = row.getAs[String]("EMP_CONTACT")
// do checking here and populate (err_col,err_val,err_desc) with values if applicable
Row.merge(row)
}
val validateDF = data.map { row => validateColumns(row) }
}
因此,它不接受代码val validateDF = data.map { row => validateColumns(row) }
的最后一行。我该如何解决?还是有其他解决我问题的有效方法?
您刚错过了将DataFrame转换为rdd以便应用.map
操作的方法,请改用此方法:
import org.apache.spark.sql.Row
val validateDF = data.rdd.map { row => validateColumns(row) }
如果要将其转换回DataFrame,只需使用sparkSession即可:
val newSchema = // specify the schema of the new dataframe
val updatedDF = spark.createDataFrame(validateDF, newSchema)