import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._
import org.apache.spark._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
object fixedLength {
def main(args:Array[String]) {
def getRow(x : String) : Row={
val columnArray = new Array[String](4)
columnArray(0)=x.substring(0,3)
columnArray(1)=x.substring(3,13)
columnArray(2)=x.substring(13,18)
columnArray(3)=x.substring(18,22)
Row.fromSeq(columnArray)
}
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder().master("local").appName("ReadingCSV").getOrCreate()
val conf = new SparkConf().setAppName("FixedLength").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true");
val sc = new SparkContext(conf)
val fruits = sc.textFile("in/fruits.txt")
val schemaString = "id,fruitName,isAvailable,unitPrice";
val fields = schemaString.split(",").map( field => StructField(field,StringType,nullable=true))
val schema = StructType(fields)
val df = spark.createDataFrame(fruits.map { x => getRow(x)} , schema)
df.show() // Error
println("End of the program")
}
}
我在df.show()命令中遇到错误。我的文件内容是
56 apple TRUE 0.56 45 pear FALSE1.34 34 raspberry TRUE 2.43 34 plum TRUE 1.31 53 cherry TRUE 1.4 23 orange FALSE2.34 56 persimmon FALSE23.2
错误执行程序:阶段0.0(TID 0)中任务0.0的异常java.lang.ClassCastException:org.apache.spark.util.SerializableConfiguration无法强制转换为[B at org.apache.spark.scheduler.ResultTask.runTask(ResultTask。斯卡拉:81)
你能帮忙吗?
你用古老的方式创建rdd
SparkContext(conf)
val conf = new SparkConf().setAppName("FixedLength").setMaster("local[*]").set("spark.driver.allowMultipleContexts", "true");
val sc = new SparkContext(conf)
val fruits = sc.textFile("in/fruits.txt")
而你正在使用dataframe
以新的方式创建SparkSession
val spark = SparkSession.builder().master("local").appName("ReadingCSV").getOrCreate()
val df = spark.createDataFrame(fruits.map { x => getRow(x)} , schema)
最终你将使用旧的rdd
函数创建的sparkContext
与使用新的dataframe
创建的sparkSession
混合。
我建议你只使用一种方法。
我猜这就是问题的原因
更新
做以下事情应该适合你
def getRow(x : String) : Row={
val columnArray = new Array[String](4)
columnArray(0)=x.substring(0,3)
columnArray(1)=x.substring(3,13)
columnArray(2)=x.substring(13,18)
columnArray(3)=x.substring(18,22)
Row.fromSeq(columnArray)
}
Logger.getLogger("org").setLevel(Level.ERROR)
val spark = SparkSession.builder().master("local").appName("ReadingCSV").getOrCreate()
val fruits = spark.sparkContext.textFile("in/fruits.txt")
val schemaString = "id,fruitName,isAvailable,unitPrice";
val fields = schemaString.split(",").map( field => StructField(field,StringType,nullable=true))
val schema = StructType(fields)
val df = spark.createDataFrame(fruits.map { x => getRow(x)} , schema)