I have an RDD like this:
RDD[(Any, Array[(Any, Any)])]
I just want to convert it into a DataFrame, so I use this schema:
val schema = StructType(Array(
  StructField("C1", StringType, true),
  StructField("C4", ArrayType(StringType, false), false)))
val df = Seq(
  ("A", 1, "12/06/2012"),
  ("A", 2, "13/06/2012"),
  ("B", 3, "12/06/2012"),
  ("B", 4, "17/06/2012"),
  ("C", 5, "14/06/2012")).toDF("C1", "C2", "C3")
df.show(false)
val rdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.toList)
  .foreach(println)
val output_df = sqlContext.createDataFrame(rdd, schema)
My rdd looks like this:
(B,List((3,12/06/2012), (4,17/06/2012)))
(A,List((1,12/06/2012), (2,13/06/2012)))
(C,List((5,14/06/2012)))
or like this (the default toString of an Array):
(A,[Lscala.Tuple2;@3e8f27c9)
(C,[Lscala.Tuple2;@6f22defb)
(B,[Lscala.Tuple2;@1b8692ec)
if I use:
.mapValues(i => i.toArray)
I have already tried this:
val output_df = sqlContext.createDataFrame(rdd, schema)
but I get:
Error:(40, 32) overloaded method value createDataFrame with alternatives:
(data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
(rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
(rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
cannot be applied to (Unit, org.apache.spark.sql.types.StructType)
val output_df = sqlContext.createDataFrame(rdd, schema)
@Raphael Roth:
I tried the second method, but it does not work; I get:
Error:(41, 24) No TypeTag available for MySchema
val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF()
The first method works fine, but I lose the first element of my tuples with:
.mapValues(i => i.map(_._2))
Do you know if I can complete the first method to keep both elements?
I worked around the problem by converting my tuples to strings, but that is not an elegant solution for me, because I will have to split those strings apart again to read the column:
val rdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.map(w => (w._1, w._2).toString))
  .map(i => Row(i._1, i._2))
Thanks for your help.
groupByKey gives you a sequence of tuples, and you did not take that into account in your schema. Furthermore, sqlContext.createDataFrame needs an RDD[Row], which you did not provide: in your snippet, rdd is actually of type Unit, because the chain ends with .foreach(println). That is why the compiler says createDataFrame cannot be applied to (Unit, ...).
This should work using your schema:
val rdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.map(_._2))
  .map(i => Row(i._1, i._2))
val output_df = sqlContext.createDataFrame(rdd, schema)
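If you want to keep both tuple elements with the schema approach, one option should be to describe each pair as a struct inside the array, rather than dropping the first element. A sketch along those lines (the pairSchema name and the C2/C3 field names are made up here; this assumes Spark 1.x with a sqlContext):

import org.apache.spark.sql.Row
import org.apache.spark.sql.types._

// each (C2, C3) pair becomes a struct inside the array, so both
// tuple elements survive the conversion
val pairSchema = StructType(Array(
  StructField("C1", StringType, true),
  StructField("C4", ArrayType(StructType(Array(
    StructField("C2", IntegerType, false),
    StructField("C3", StringType, false))), false), false)))

val pairRdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .map { case (key, values) =>
    // nested Rows match the struct fields declared above
    Row(key.toString, values.toSeq.map { case (a, b) => Row(a, b) })
  }

val pair_df = sqlContext.createDataFrame(pairRdd, pairSchema)
pair_df.show(false)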
You can also use a case class to map the tuples (I am not sure whether tuple schemas can be created programmatically):
val df = Seq(
("A", 1, "12/06/2012"),
("A", 2, "13/06/2012"),
("B", 3, "12/06/2012"),
("B", 4, "17/06/2012"),
("C", 5, "14/06/2012")).toDF("C1", "C2", "C3")
df.show(false)
val rdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.toList)
// this should be placed outside of main()
case class MySchema(C1: String, C4: List[(Int, String)])
val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF()
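Regarding the "No TypeTag available for MySchema" error mentioned above: it typically appears when the case class is declared inside a method (or a REPL block), which is what the comment in the snippet hints at. A minimal, self-contained sketch of the intended layout (the Example object and the sample data are made up; assumes Spark 1.x):

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext

// top-level definition: declaring the case class outside main()
// gives the compiler the TypeTag that toDF() needs
case class MySchema(C1: String, C4: List[(Int, String)])

object Example {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("example").setMaster("local[*]"))
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    val rdd = sc.parallelize(Seq(("A", List((1, "12/06/2012"), (2, "13/06/2012")))))
    val newdf = rdd.map { case (k, v) => MySchema(k, v) }.toDF()
    newdf.printSchema() // C4 is inferred as array<struct<_1:int,_2:string>>
  }
}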