如何将字典列表转换为Spark DataFrame

问题描述 投票:3回答:2

我想将我的词典列表转换为DataFrame。这是清单:

mylist = 
[
  {"type_activity_id":1,"type_activity_name":"xxx"},
  {"type_activity_id":2,"type_activity_name":"yyy"},
  {"type_activity_id":3,"type_activity_name":"zzz"}
]

这是我的代码:

from pyspark.sql.types import StringType

df = spark.createDataFrame(mylist, StringType())

df.show(2,False)

+-----------------------------------------+
|                                    value|
+-----------------------------------------+
|{type_activity_id=1,type_activity_id=xxx}|
|{type_activity_id=2,type_activity_id=yyy}|
|{type_activity_id=3,type_activity_id=zzz}|
+-----------------------------------------+

我假设我应该为每列提供一些映射和类型,但我不知道该怎么做。

更新:

我也试过这个:

schema = ArrayType(
    StructType([StructField("type_activity_id", IntegerType()),
                StructField("type_activity_name", StringType())
                ]))
df = spark.createDataFrame(mylist, StringType())
df = df.withColumn("value", from_json(df.value, schema))

但后来我得到null值:

+-----+
|value|
+-----+
| null|
| null|
+-----+
python pyspark apache-spark-sql
2个回答
3
投票

你可以这样做。您将获得一个包含2列的数据框。

mylist = [
  {"type_activity_id":1,"type_activity_name":"xxx"},
  {"type_activity_id":2,"type_activity_name":"yyy"},
  {"type_activity_id":3,"type_activity_name":"zzz"}
]

myJson = sc.parallelize(mylist)
myDf = sqlContext.read.json(mylist)

输出:

+----------------+------------------+
|type_activity_id|type_activity_name|
+----------------+------------------+
|               1|               xxx|
|               2|               yyy|
|               3|               zzz|
+----------------+------------------+

4
投票

在过去,您只需将字典传递给spark.createDataFrame(),但现在已弃用:

mylist = [
  {"type_activity_id":1,"type_activity_name":"xxx"},
  {"type_activity_id":2,"type_activity_name":"yyy"},
  {"type_activity_id":3,"type_activity_name":"zzz"}
]
df = spark.createDataFrame(mylist)
#UserWarning: inferring schema from dict is deprecated,please use pyspark.sql.Row instead
#  warnings.warn("inferring schema from dict is deprecated,"

正如此警告消息所示,您应该使用pyspark.sql.Row代替。

from pyspark.sql import Row
spark.createDataFrame(Row(**x) for x in mylist).show(truncate=False)
#+----------------+------------------+
#|type_activity_id|type_activity_name|
#+----------------+------------------+
#|1               |xxx               |
#|2               |yyy               |
#|3               |zzz               |
#+----------------+------------------+

在这里,我使用**keyword argument unpacking)将字典传递给Row构造函数。

© www.soinside.com 2019 - 2024. All rights reserved.