I have multiple Map[String, String] values in a List (Scala). For example:
val map1 = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
val map2 = Map("EMP_NAME" -> "Rahul", "DOB" -> "06-12-1991", "CITY" -> "Mumbai")
val map3 = Map("EMP_NAME" -> "John", "DOB" -> "11-04-1996", "CITY" -> "Toronto")
val list = List(map1, map2, map3)
Now I want to create a DataFrame from it that looks like this:
EMP_NAME DOB CITY
Ahmad 01-10-1991 Dubai
Rahul 06-12-1991 Mumbai
John 11-04-1996 Toronto
How can I achieve this?
A slightly less specific approach, for example:
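import spark.implicits._ // needed for toDF; assumes a SparkSession named spark, as in spark-shell
val map1 = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
val map2 = Map("EMP_NAME" -> "John", "DOB" -> "01-10-1992", "CITY" -> "Mumbai")
///...
val list = List(map1, map2) // map3, ...
val RDDmap = sc.parallelize(list)
// Get the column names dynamically from the keys of the first Map
val cols = RDDmap.take(1).flatMap(x => x.keys)
// Each Map is a set of key -> value entries; the values are taken positionally,
// which assumes every Map has the same three keys in the same order
val df = RDDmap.map { value =>
  val list = value.values.toList
  (list(0), list(1), list(2))
}.toDF(cols: _*) // dynamic column names assigned
df.show(false)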
This yields:
+--------+----------+------+
|EMP_NAME|DOB |CITY |
+--------+----------+------+
|Ahmad |01-10-1991|Dubai |
|John |01-10-1992|Mumbai|
+--------+----------+------+
Or, to answer your sub-question, something like the following. At least I think this is what you are asking, but I may be wrong:
val RDDmap = sc.parallelize(List(
Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai"),
Map("EMP_NAME" -> "John", "DOB" -> "01-10-1992", "CITY" -> "Mumbai")))
...
// Get cols dynamically
val cols = RDDmap.take(1).flatMap(x=> x.keys)
// Each Map is a set of key -> value entries; take the values positionally (assumes the same key order in every Map)
val df = RDDmap.map{ value=>
val list=value.values.toList
(list(0), list(1), list(2))
}.toDF(cols:_*) // dynamic column names assigned
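You can of course build the list dynamically, but you still need to assign the Map elements. See "Appending Data to List or any other collection Dynamically in scala". I would simply read the data in from a file and be done with it.
You can do it like this: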
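Both snippets above read the values positionally with value.values.toList, which only lines up with the column names if every Map iterates its keys in the same order. A minimal sketch of a safer lookup, in plain Scala without Spark, is shown here. The object name AlignByKey and the helper alignedValues are hypothetical names introduced for illustration, and using an empty string for a missing key is an assumption, not something the question specifies.

```scala
object AlignByKey {
  // Hypothetical helper: read the values of `m` in the order given by `cols`,
  // substituting an empty string (an assumption) for any key that is missing.
  def alignedValues(cols: Seq[String], m: Map[String, String]): Seq[String] =
    cols.map(c => m.getOrElse(c, ""))

  def main(args: Array[String]): Unit = {
    val a = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
    // Same keys as `a`, but declared in a different order
    val b = Map("CITY" -> "Mumbai", "EMP_NAME" -> "John", "DOB" -> "01-10-1992")

    // Take the column order from the first Map, then look every value up by key
    val cols = a.keys.toList
    println(cols.mkString(" "))
    println(alignedValues(cols, a).mkString(" "))
    println(alignedValues(cols, b).mkString(" "))
  }
}
```

Looking values up by key keeps each value under its own column even when the Maps were built in different orders, at the cost of one lookup per column.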
import spark.implicits._
val df = list
.map( m => (m.get("EMP_NAME"),m.get("DOB"),m.get("CITY")))
.toDF("EMP_NAME","DOB","CITY")
df.show()
+--------+----------+-------+
|EMP_NAME| DOB| CITY|
+--------+----------+-------+
| Ahmad|01-10-1991| Dubai|
| Rahul|06-12-1991| Mumbai|
| John|11-04-1996|Toronto|
+--------+----------+-------+
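Note that m.get returns an Option[String], so the tuple version above produces nullable columns. When the column set is fixed and known up front, a case class is another way to express the same idea. The following is only a sketch under that assumption: the Employee case class, the object name CaseClassExample, and the helper toEmployee are hypothetical and do not appear in the question, and a missing key is mapped to null by choice.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case class for this sketch; its field names become the column names.
// It is declared at the top level so that Spark can derive an encoder for it.
case class Employee(EMP_NAME: String, DOB: String, CITY: String)

object CaseClassExample {
  // Converts one Map into an Employee; a missing key becomes null (an assumption).
  def toEmployee(m: Map[String, String]): Employee =
    Employee(m.getOrElse("EMP_NAME", null), m.getOrElse("DOB", null), m.getOrElse("CITY", null))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("CaseClassExample").master("local[*]").getOrCreate()
    import spark.implicits._ // brings toDF into scope for local collections

    val list = List(
      Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai"),
      Map("EMP_NAME" -> "Rahul", "DOB" -> "06-12-1991", "CITY" -> "Mumbai"),
      Map("EMP_NAME" -> "John", "DOB" -> "11-04-1996", "CITY" -> "Toronto"))

    // Column names are taken from the case class fields, so toDF needs no arguments
    val df = list.map(toEmployee).toDF()
    df.show()

    spark.stop()
  }
}
```

This trades the dynamic column names of the earlier answer for compile-time checked field names, so it only fits when the keys are known in advance.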
import org.apache.spark.SparkContext
import org.apache.spark.sql._
import org.apache.spark.sql.types.{StringType, StructField, StructType}
object DataFrameTest2 extends Serializable {
var sparkSession: SparkSession = _
var sparkContext: SparkContext = _
var sqlContext: SQLContext = _
def main(args: Array[String]): Unit = {
sparkSession = SparkSession.builder().appName("TestMaster").master("local").getOrCreate()
sparkContext = sparkSession.sparkContext
sqlContext = sparkSession.sqlContext // reuse the session's SQLContext instead of shadowing the field with a new one
val map1 = Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991", "CITY" -> "Dubai")
val map2 = Map("EMP_NAME" -> "Rahul", "DOB" -> "06-12-1991", "CITY" -> "Mumbai")
val map3 = Map("EMP_NAME" -> "John", "DOB" -> "11-04-1996", "CITY" -> "Toronto")
val list = List(map1, map2, map3)
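//create the schema from the header (the keys of the first Map)
val header = list.head.keys.toList
val schema = StructType(header.map(fieldName => StructField(fieldName, StringType, true)))
//create your rows, reading the values in header order so they line up with the schema
val rows = list.map(m => Row(header.map(k => m.getOrElse(k, null)): _*))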
//create your rdd
val rdd = sparkContext.parallelize(rows)
//create your dataframe using rdd
val df = sparkSession.createDataFrame(rdd, schema)
df.show()
}
}
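The program above takes the column names from list.head, so a key that appears only in a later Map would be dropped. If the Maps may carry different key sets, one possible variation is to build the header from the union of all keys. The sketch below is plain Scala without Spark; the object name UnionHeader and the helpers unionHeader and toRows are hypothetical names for illustration, and representing a missing value as None is an assumption.

```scala
object UnionHeader {
  // Hypothetical helper: every key that appears in any Map, in first-seen order.
  def unionHeader(maps: Seq[Map[String, String]]): Seq[String] =
    maps.flatMap(_.keys).distinct

  // Hypothetical helper: one row per Map, with None where a key is absent.
  def toRows(header: Seq[String], maps: Seq[Map[String, String]]): Seq[Seq[Option[String]]] =
    maps.map(m => header.map(k => m.get(k)))

  def main(args: Array[String]): Unit = {
    val maps = List(
      Map("EMP_NAME" -> "Ahmad", "DOB" -> "01-10-1991"),
      Map("EMP_NAME" -> "Rahul", "DOB" -> "06-12-1991", "CITY" -> "Mumbai"))

    val header = unionHeader(maps)
    println(header.mkString(" "))
    // Print each row, showing an empty cell for a missing value
    toRows(header, maps).foreach(row => println(row.map(_.getOrElse("")).mkString(" ")))
  }
}
```

Each inner sequence can then be turned into a Row with Row(values.map(_.orNull): _*) and paired with a StructType built from the header, in the same way as the program above.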