# Serialise every record to a JSON string. Despite the name `df`, the result
# of parallelize(...).map(...) is an RDD of strings, not a DataFrame.
df = sc.parallelize(data).map(json.dumps)
这给了我:
STUDENT_ID | ROOM_ID | ENROLLED | ENROLLMENT | SPORTS
---|---|---|---|---
1234 | abc | false | null | null
4321 | def | true | {home,01-01-2020} | null
678 | htf | null | null | {hockey,forward}
room_id | type |
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, BooleanType
# Explicit schema for the student records: two id columns declared
# non-nullable, an optional boolean flag, and two optional nested structs.
# Fields without an explicit third argument use StructField's default
# nullable=True.
schema = StructType([
    StructField("student_id", IntegerType(), False),
    StructField("room_id", StringType(), False),
    StructField("enrolled", BooleanType()),
    StructField("enrollment", StructType([
        StructField("type", StringType()),
        StructField("date", StringType()),
    ])),
    StructField("sports", StructType([
        StructField("team", StringType()),
        StructField("position", StringType()),
    ])),
])
# Sample payload: a JSON array of three student records. Each record carries
# only some of the optional fields ("enrolled", "enrollment", "sports").
# The string is bound to `students_json` rather than `json` so that it does
# not shadow the standard-library `json` module.
students_json = '''
[
{ "student_id": 1234,
"room_id": "abc",
"enrolled": false
},
{ "student_id": 4321,
"room_id": "def",
"enrolled": true,
"enrollment": {
"type": "home",
"date": "01-01-2020"
}
},
{ "student_id": 678,
"room_id": "htf",
"sports": {
"team": "hockey",
"position": "forward"
}
}
]
'''
# Wrap the whole document in a one-element RDD and parse it with the explicit
# schema instead of letting Spark infer one.
df = spark.read.schema(schema).json(
    spark.sparkContext.parallelize([students_json])
)
# Flatten the nested structs: every "parent.child as child" expression lifts
# a struct field to a top-level column; the scalar columns pass through as-is.
flattened_columns = [
    "student_id",
    "room_id",
    "enrolled",
    "enrollment.type as type",
    "enrollment.date as date",
    "sports.team as team",
    "sports.position as position",
]
df.selectExpr(*flattened_columns).show()
输出:
+----------+-------+--------+----+----------+------+--------+
|student_id|room_id|enrolled|type|      date|  team|position|
+----------+-------+--------+----+----------+------+--------+
|      1234|    abc|   false|null|      null|  null|    null|
|      4321|    def|    true|home|01-01-2020|  null|    null|
|       678|    htf|    null|null|      null|hockey| forward|
+----------+-------+--------+----+----------+------+--------+
false | null | null | 4321 | def | ||
---|---|---|---|---|---|---|
01-01-2020 | null | 678 | htf | null | NULL. | |
forward | Pyspark.sparkcontext.parallize | |||||