我有一个没有标题的管道分隔文本文件,行有不同数量的列(有些行是类型A
有400列,其他行类型B
有200,所以我需要先将它们分开):
val textFileRaw = sc.textFile("./data.txt")
val textFile = textFileRaw.map(line => line.split("\\|", -1))
val dataA = textFile.filter(line => line(0) == "A")
val dataB = textFile.filter(line => line(0) == "B")
现在我想将这些RDD转换为Spark DataFrames,但是拆分返回单个数组,而不是400或200个不同的值。这会导致以下错误:
# ANames are my column names, length=400
val ANames = Array("Row ID", "City", "State", ...)
val dataADF = dataA.toDF(ANames: _*)
Name: java.lang.IllegalArgumentException
Message: requirement failed: The number of columns doesn't match.
Old column names (1): value
New column names (400): Row ID, City, State ...
This question面临同样的问题,但所有答案建议手动指定从数组到元组的映射,这在数百列的情况下并不是很好。
如果我使用Spark's csv loader,我想我可以让它工作,但这对我的数据不起作用,因为行有不同的字段数(它不是真正的csv文件)。解决方法是首先拆分文件,编写格式良好的csv的新文件,然后使用csv加载器,但我想尽可能避免这种情况。如何将这些RDD转换为具有命名列的DataFrame?
您应该创建一个模式并使用SQLContext.createDataFrame api as
val dataA = textFile.filter(line => line(0) == "A")
val ANames = Array("Row ID", "City", "State", "kjl")
val dataADF = sqlContext.createDataFrame(dataA.map(Row.fromSeq(_)), StructType(ANames.map(StructField(_, StringType, true))))
它应该工作。但请注意,我已将所有数据类型用作StringType()
。您可以根据需要进行更改。