Convert an RDD to a DataFrame


I have an RDD like this:

RDD[(Any, Array[(Any, Any)])]
and I simply want to convert it to a DataFrame, so I use this schema:

val schema = StructType(Array (StructField("C1", StringType, true), StructField("C4", ArrayType(StringType, false), false)))

val df = Seq(
  ("A",1,"12/06/2012"),
  ("A",2,"13/06/2012"),
  ("B",3,"12/06/2012"),
  ("B",4,"17/06/2012"),
  ("C",5,"14/06/2012")).toDF("C1", "C2","C3")
df.show(false)

val rdd = df.map( line => ( line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.toList).foreach(println)

val output_df = sqlContext.createDataFrame(rdd, schema)

My RDD looks like this:

(B,List((3,12/06/2012), (4,17/06/2012)))    
(A,List((1,12/06/2012), (2,13/06/2012)))    
(C,List((5,14/06/2012)))

or like this

(A,[Lscala.Tuple2;@3e8f27c9)
(C,[Lscala.Tuple2;@6f22defb)
(B,[Lscala.Tuple2;@1b8692ec)

if I use:

.mapValues(i => i.toArray)
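(As a side note, the two outputs above differ only in how they print: `List` has a readable `toString`, while `Array` is a JVM array and prints as `[Lscala.Tuple2;@<hash>`. A minimal plain-Scala sketch, with no Spark involved:)

```scala
// List prints its elements; Array prints only its JVM type and hash code.
val pairs = Seq((3, "12/06/2012"), (4, "17/06/2012"))

val asList  = pairs.toList.toString   // "List((3,12/06/2012), (4,17/06/2012))"
val asArray = pairs.toArray.toString  // e.g. "[Lscala.Tuple2;@1b8692ec"
```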

I have tried this:

val output_df = sqlContext.createDataFrame(rdd, schema)

but I get:

Error:(40, 32) overloaded method value createDataFrame with alternatives:
  (data: java.util.List[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.api.java.JavaRDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rdd: org.apache.spark.rdd.RDD[_],beanClass: Class[_])org.apache.spark.sql.DataFrame <and>
  (rows: java.util.List[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.api.java.JavaRDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame <and>
  (rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row],schema: org.apache.spark.sql.types.StructType)org.apache.spark.sql.DataFrame
 cannot be applied to (Unit, org.apache.spark.sql.types.StructType)
    val output_df = sqlContext.createDataFrame(rdd, schema)

To Raphael Roth:
I tried the second approach, but it does not work; I get:

Error:(41, 24) No TypeTag available for MySchema
    val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF()

The first approach works fine, but I lose the first element of each tuple with

.mapValues(i => i.map(_._2))

Do you know whether I can adapt the first approach to keep both elements?

I worked around it by converting my tuples to strings, but that is not an elegant solution, because I then have to split the string tuples again to read the column:

val rdd = df.map(line => ( line(0), (line(1), line(2)))).groupByKey()
      .mapValues(i => i.map(w => (w._1,w._2).toString))
      .map(i=>Row(i._1,i._2))
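(Reading such a column back means undoing the tuple's `toString`. A minimal sketch of a hypothetical `parseTuple` helper, assuming the first field never contains a comma:)

```scala
// Parse a stringified pair like "(3,12/06/2012)" back into its two fields.
// Hypothetical helper, not from the original post.
def parseTuple(s: String): (Int, String) = {
  val parts = s.stripPrefix("(").stripSuffix(")").split(",", 2)
  (parts(0).toInt, parts(1))
}
```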

Thanks for your help.

scala apache-spark dataframe rdd
1 Answer

groupByKey gives you a sequence of tuples, which you did not take into account in your schema. Furthermore, sqlContext.createDataFrame expects an RDD[Row], which you are not providing.

This should work with your schema:
val rdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.map(_._2))
  .map(i=>Row(i._1,i._2))

val output_df = sqlContext.createDataFrame(rdd, schema)
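(The `.mapValues(i => i.map(_._2))` step keeps only the date, i.e. the second element of each pair, which is what makes the values fit the `ArrayType(StringType)` column. A plain-Scala sketch of that value mapping, using sample data from the question:)

```scala
// Keep only the second element of each (id, date) pair per key,
// mirroring .mapValues(i => i.map(_._2)) on plain collections.
val grouped = Map(
  "A" -> List((1, "12/06/2012"), (2, "13/06/2012")),
  "C" -> List((5, "14/06/2012")))

val datesOnly = grouped.map { case (k, vs) => (k, vs.map(_._2)) }
```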

You can also use a case class to map the tuples (I am not sure whether a schema for tuples can be created programmatically):

val df = Seq(
  ("A", 1, "12/06/2012"),
  ("A", 2, "13/06/2012"),
  ("B", 3, "12/06/2012"),
  ("B", 4, "17/06/2012"),
  ("C", 5, "14/06/2012")).toDF("C1", "C2", "C3")
df.show(false)

val rdd = df.map(line => (line(0), (line(1), line(2))))
  .groupByKey()
  .mapValues(i => i.toList)

// this should be placed outside of main()
case class MySchema(C1: String, C4: List[(Int, String)])

val newdf = rdd.map(line => MySchema(line._1.toString, line._2.asInstanceOf[List[(Int, String)]])).toDF()
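(The shape of that mapping can be checked without Spark. A plain-Scala sketch, re-declaring the same MySchema case class and building instances from grouped (key, pairs) data as above:)

```scala
// MySchema mirrors the case class used in the answer.
case class MySchema(C1: String, C4: List[(Int, String)])

val grouped = List(
  ("A", List((1, "12/06/2012"), (2, "13/06/2012"))),
  ("B", List((3, "12/06/2012"), (4, "17/06/2012"))))

// One MySchema per key, keeping both elements of every pair.
val mapped = grouped.map { case (k, vs) => MySchema(k, vs) }
```

Note that the "No TypeTag available" error from the question disappears once the case class is declared at top level (outside the enclosing method), as the comment in the code above indicates.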