Spark Row.fromSeq error with a text file

Problem description · votes: 1 · answers: 1
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark._
import org.apache.spark.sql.types._
import org.apache.spark.sql._

object fixedLength {

  def main(args:Array[String]) {

    def getRow(x: String): Row = {
      val columnArray = new Array[String](4)
      columnArray(0) = x.substring(0, 3)
      columnArray(1) = x.substring(3, 13)
      columnArray(2) = x.substring(13, 18)
      columnArray(3) = x.substring(18, 22)
      Row.fromSeq(columnArray)
    }

    Logger.getLogger("org").setLevel(Level.ERROR)

    val spark = SparkSession.builder().master("local").appName("ReadingCSV").getOrCreate()


    val conf = new SparkConf().setAppName("FixedLength").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true");
    val sc = new SparkContext(conf)    
    val fruits = sc.textFile("in/fruits.txt")

    val schemaString = "id,fruitName,isAvailable,unitPrice";
    val fields = schemaString.split(",").map( field => StructField(field,StringType,nullable=true))
    val schema = StructType(fields)

    val df = spark.createDataFrame(fruits.map { x => getRow(x)} , schema)
    df.show() // Error
    println("End of the program")
  }
}

I am getting an error on the df.show() call. My file content is:

56 apple     TRUE 0.56
45 pear      FALSE1.34
34 raspberry TRUE 2.43
34 plum      TRUE 1.31
53 cherry    TRUE 1.4 
23 orange    FALSE2.34
56 persimmon FALSE23.2
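
For reference, this is how the substring offsets in getRow line up against one of these lines; the column boundaries are just my reading of the code above:

val line = "56 apple     TRUE 0.56"
val id          = line.substring(0, 3)   // "56 "
val fruitName   = line.substring(3, 13)  // "apple     "
val isAvailable = line.substring(13, 18) // "TRUE "
val unitPrice   = line.substring(18, 22) // "0.56"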

ERROR Executor: Exception in task 0.0 in stage 0.0 (TID 0) java.lang.ClassCastException: org.apache.spark.util.SerializableConfiguration cannot be cast to [B at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:81)

Can you help?

scala apache-spark
1 Answer

1 vote

You are creating the RDD the old way, via SparkContext(conf):

val conf = new SparkConf().setAppName("FixedLength").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true");
val sc = new SparkContext(conf)    
val fruits = sc.textFile("in/fruits.txt")

while you are creating the DataFrame the new way, via SparkSession:

val spark = SparkSession.builder().master("local").appName("ReadingCSV").getOrCreate()
val df = spark.createDataFrame(fruits.map { x => getRow(x)} , schema)

So in the end you are mixing an RDD produced by the old-style SparkContext with a DataFrame built from the new-style SparkSession.

I suggest you stick to a single approach.

I am guessing that is the cause of the problem.

Update

Doing the following should work for you:

// Parse one fixed-width line into a Row of four string columns
def getRow(x: String): Row = {
  val columnArray = new Array[String](4)
  columnArray(0) = x.substring(0, 3)   // id
  columnArray(1) = x.substring(3, 13)  // fruitName
  columnArray(2) = x.substring(13, 18) // isAvailable
  columnArray(3) = x.substring(18, 22) // unitPrice
  Row.fromSeq(columnArray)
}

Logger.getLogger("org").setLevel(Level.ERROR)

val spark = SparkSession.builder().master("local").appName("ReadingCSV").getOrCreate()

// Reuse the SparkContext owned by the SparkSession instead of creating a second one
val fruits = spark.sparkContext.textFile("in/fruits.txt")

val schemaString = "id,fruitName,isAvailable,unitPrice"
val fields = schemaString.split(",").map(field => StructField(field, StringType, nullable = true))
val schema = StructType(fields)

val df = spark.createDataFrame(fruits.map { x => getRow(x) }, schema)
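
If you also want typed columns instead of the raw fixed-width strings, a rough follow-up sketch (the trim and cast choices are mine, not part of the original question) could look like this:

// Strip the fixed-width padding and cast to more useful types
val typed = df
  .withColumn("id", trim(col("id")).cast("int"))
  .withColumn("fruitName", trim(col("fruitName")))
  .withColumn("isAvailable", trim(col("isAvailable")) === "TRUE")
  .withColumn("unitPrice", trim(col("unitPrice")).cast("double"))

typed.show()
typed.printSchema()

trim and col come from org.apache.spark.sql.functions._, which the question already imports.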