如何检查Spark Scala中的列名称和与其关联的数据是否匹配

问题描述 投票:0回答:1

假设我的几列如下:

EMP_ID, EMP_NAME, EMP_CONTACT  
1, SIDDHESH, 544949461

现在,我要验证数据是否与列名架构同步。对于EMP_NAME,该列中的数据应仅为string。我在参考this链接后尝试了以下代码,但在代码的最后一行显示了错误。

package com.sample
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.Row
class sample1 {
  val spark = SparkSession.builder().master("local[*]").getOrCreate()
  val data = spark.read.format("csv").option("header", "true").load("C:/Users/siddheshk2/Desktop/words.txt")
  val originalSchema = data.schema

  def validateColumns(row: Row): Row = {
    val emp_id = row.getAs[String]("EMP_ID")
    val emp_name = row.getAs[String]("EMP_NAME")
    val emp_contact = row.getAs[String]("EMP_CONTACT")

    // do checking here and populate (err_col,err_val,err_desc) with values if applicable

    Row.merge(row)
  }
  val validateDF = data.map { row => validateColumns(row) }

}  

因此,它不接受代码val validateDF = data.map { row => validateColumns(row) }的最后一行。我该如何解决?还是有其他解决我问题的有效方法?

scala apache-spark bigdata
1个回答
0
投票

您刚错过了将DataFrame转换为rdd以便应用.map操作的方法,请改用此方法:

import org.apache.spark.sql.Row    
val validateDF = data.rdd.map { row => validateColumns(row) }

如果要将其转换回DataFrame,只需使用sparkSession即可:

val newSchema = // specify the schema of the new dataframe
val updatedDF = spark.createDataFrame(validateDF, newSchema)
© www.soinside.com 2019 - 2024. All rights reserved.