How to create a Spark SQL DataFrame from a list of Map objects

Problem description · Votes: 1 · Answers: 3

I have multiple Map[String, String] (Scala) in a List. For example:

val map1 = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
val map2 = Map("EMP_NAME" -> "Rahul", "DOB" -> "06-12-1991", "CITY" -> "Mumbai")
val map3 = Map("EMP_NAME" -> "John", "DOB" -> "11-04-1996", "CITY" -> "Toronto")
val list = List(map1, map2, map3)

Now I want to create a DataFrame that looks like this:

EMP_NAME    DOB             CITY
Ahmad       01-10-1991      Dubai
Rahul       06-12-1991      Mumbai
John        11-04-1996      Toronto

How can I achieve this?

scala apache-spark dataframe apache-spark-sql bigdata
3 Answers
0 votes

A somewhat less specific approach, e.g.:

val map1 = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
val map2 = Map("EMP_NAME" -> "John",  "DOB" -> "01-10-1992", "CITY" -> "Mumbai")
// ...
val list = List(map1, map2) // map3, ...
val RDDmap = sc.parallelize(list)

// Get cols dynamically from the keys of the first Map
val cols = RDDmap.take(1).flatMap(x => x.keys)

// One row per Map; assumes every Map yields its values in the same
// order as the first Map's keys
val df = RDDmap.map { value =>
           val list = value.values.toList
           (list(0), list(1), list(2))
         }.toDF(cols: _*) // dynamic column names assigned

df.show(false)

Yields:

+--------+----------+------+
|EMP_NAME|DOB       |CITY  |
+--------+----------+------+
|Ahmad   |01-10-1991|Dubai |
|John    |01-10-1992|Mumbai|
+--------+----------+------+

Or, to answer your sub-question, as follows - at least I think this is what you are asking, but it may not be:

val RDDmap = sc.parallelize(List(
   Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai"),
   Map("EMP_NAME" -> "John",  "DOB" -> "01-10-1992", "CITY" -> "Mumbai")))
   ...

// Get cols dynamically, as before
val cols = RDDmap.take(1).flatMap(x => x.keys)

// One row per Map, as above
val df = RDDmap.map { value =>
           val list = value.values.toList
           (list(0), list(1), list(2))
         }.toDF(cols: _*) // dynamic column names assigned

Of course, you can build the List dynamically, but you still need to populate the Map elements. See Appending Data to List or any other collection Dynamically in scala. I would just read the data in from a file and be done with it.
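
If the data does live in a file, a minimal sketch (assuming a hypothetical JSON-lines file employees.json, one object per line) lets Spark infer the columns from the keys directly, which also avoids any dependence on Map value ordering:

// Hypothetical input file employees.json, one JSON object per line:
// {"EMP_NAME":"Ahmad","DOB":"01-10-1991","CITY":"Dubai"}
// {"EMP_NAME":"Rahul","DOB":"06-12-1991","CITY":"Mumbai"}
val df = spark.read.json("employees.json") // columns inferred from the JSON keys
df.show(false)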


2 votes

You can do it like this:

import spark.implicits._

val df = list
  .map(m => (m.get("EMP_NAME"), m.get("DOB"), m.get("CITY")))
  .toDF("EMP_NAME", "DOB", "CITY")

df.show()

+--------+----------+-------+
|EMP_NAME|       DOB|   CITY|
+--------+----------+-------+
|   Ahmad|01-10-1991|  Dubai|
|   Rahul|06-12-1991| Mumbai|
|    John|11-04-1996|Toronto|
+--------+----------+-------+
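
Note that Map.get returns Option[String], so Spark encodes each column as a nullable string and a missing key simply becomes null. If you would rather substitute a default, a small variation over the same list works (the "N/A" placeholder is an arbitrary choice):

val df2 = list
  .map(m => (m.getOrElse("EMP_NAME", "N/A"), m.getOrElse("DOB", "N/A"), m.getOrElse("CITY", "N/A")))
  .toDF("EMP_NAME", "DOB", "CITY") // same schema, but no nulls

df2.show()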

0 votes

import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}

object DataFrameTest2 extends Serializable {
  var sparkSession: SparkSession = _
  var sparkContext: SparkContext = _

  def main(args: Array[String]): Unit = {
    sparkSession = SparkSession.builder().appName("TestMaster").master("local").getOrCreate()
    sparkContext = sparkSession.sparkContext

    val map1 = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
    val map2 = Map("EMP_NAME" -> "Rahul", "DOB" -> "06-12-1991", "CITY" -> "Mumbai")
    val map3 = Map("EMP_NAME" -> "John", "DOB" -> "11-04-1996", "CITY" -> "Toronto")
    val list = List(map1, map2, map3)

    // Create your rows (assumes each Map yields its values in the same order as the header keys)
    val rows = list.map(m => Row(m.values.toSeq: _*))

    // Create the schema from the header (the keys of the first Map)
    val header = list.head.keys.toList
    val schema = StructType(header.map(fieldName => StructField(fieldName, StringType, true)))

    // Create your RDD of rows
    val rdd = sparkContext.parallelize(rows)

    // Create your DataFrame from the RDD and schema
    val df = sparkSession.createDataFrame(rdd, schema)
    df.show()
  }
}
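
One caveat with this approach: Row(m.values.toSeq: _*) relies on each Map iterating its values in the same order as the header keys. That happens to hold here because small immutable Scala Maps (up to four entries) preserve insertion order, but it is not guaranteed in general. A safer sketch, reusing list, header, and schema from the code above, looks each key up explicitly:

// Build each Row by explicit key lookup so column order always matches the schema
val safeRows = list.map(m => Row(header.map(m(_)): _*))
val safeDf = sparkSession.createDataFrame(sparkContext.parallelize(safeRows), schema)
safeDf.show()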