I have a Spark dataframe like the following:

+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
|   type|lctNbr|itmNbr|    lastUpdatedDate|lctSeqId|T7797_PRD_LCT_TYP_CD|FXT_AIL_ID|pmyVbuNbr|       upcId|vndModId|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
|prd_lct|   145|   147|2024-07-22T05:24:14|       1|                   1|        14|      126|008236686661|   35216|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+

I want to group this dataframe by type, lctNbr, itmNbr, and lastUpdatedDate, and I want each record to come out in the following JSON format:
"type": "prd_lct",
"lctNbr": 145,
"itmNbr": 147,
"lastUpdatedDate": "2024-07-22T05:24:14",
"locations": [
{
"lctSeqId": 1,
"prdLctTypCd": 1,
"fxtAilId": "14"
}
],
"itemDetails": [
{
"pmyVbuNbr": 126,
"upcId": "008236686661",
"vndModId": "35216"
]
}
I tried using the to_json, collect_list, and map_from_entries functions, but I keep getting an error when I run a show command, and I can't seem to get the format right.
You can group by the required fields and then aggregate with F.collect_list(F.create_map(...)) to build the inner fields of locations and itemDetails.
Sample data:
import pandas as pd

pandasDF = pd.DataFrame({
    "type": ["prd_lct", "prd_lct", "test"],
    "lctNbr": [145, 145, 148],
    "itmNbr": [147, 147, 150],
    "lastUpdatedDate": ["2024-07-22T05:24:14", "2024-07-22T05:24:14", "2024-07-22T05:24:15"],
    "lctSeqId": [1, 2, 3],
    "T7797_PRD_LCT_TYP_CD": [1, 2, 3],
    "FXT_AIL_ID": ["14", "15", "16"],
    "pmyVbuNbr": [126, 127, 128],
    "upcId": ["008236686661", "008236686662", "008236686663"],
    "vndModId": ["35216", "35217", "35218"]
})
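The snippets below also assume an active SparkSession and the usual pyspark.sql.functions import; a minimal sketch of turning the pandas sample into the Spark DataFrame used in the aggregation:

from pyspark.sql import SparkSession
import pyspark.sql.functions as F

# Any active SparkSession works; builder.getOrCreate() is just for illustration.
spark = SparkSession.builder.getOrCreate()

# Convert the pandas sample data into a Spark DataFrame and inspect it.
sparkDF = spark.createDataFrame(pandasDF)
sparkDF.show()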
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
| type|lctNbr|itmNbr| lastUpdatedDate|lctSeqId|T7797_PRD_LCT_TYP_CD|FXT_AIL_ID|pmyVbuNbr| upcId|vndModId|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
|prd_lct| 145| 147|2024-07-22T05:24:14| 1| 1| 14| 126|008236686661| 35216|
|prd_lct| 145| 147|2024-07-22T05:24:14| 2| 2| 15| 127|008236686662| 35217|
| test| 148| 150|2024-07-22T05:24:15| 3| 3| 16| 128|008236686663| 35218|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
Then group the resulting DataFrame, aggregate, and convert it to a list of JSON-encoded strings:
resultDF = sparkDF.groupby(
    'type', 'lctNbr', 'itmNbr', 'lastUpdatedDate'
).agg(
    # Build one map per row, collected into an array per group.
    F.collect_list(
        F.create_map(
            F.lit('lctSeqId'), F.col('lctSeqId'),
            F.lit('prdLctTypCd'), F.col('T7797_PRD_LCT_TYP_CD'),
            F.lit('fxtAilId'), F.col('FXT_AIL_ID'),
        )
    ).alias('locations'),
    F.collect_list(
        F.create_map(
            F.lit('pmyVbuNbr'), F.col('pmyVbuNbr'),
            F.lit('upcId'), F.col('upcId'),
            F.lit('vndModId'), F.col('vndModId'),
        )
    ).alias('itemDetails')
)

# toJSON() serializes each row of the result to a JSON string.
resultJSON = resultDF.toJSON().collect()
Since resultJSON is a list of JSON-encoded strings, you can convert it to a list of dictionaries with:
import ast

# Works here because these JSON strings are also valid Python literals.
result_dict = [ast.literal_eval(x) for x in resultJSON]
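Since the strings are strict JSON, json.loads is the more robust parser (ast.literal_eval would fail on JSON-only tokens such as true or null):

import json

result_dict = [json.loads(x) for x in resultJSON]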
[
  {
    "type": "prd_lct",
    "lctNbr": 145,
    "itmNbr": 147,
    "lastUpdatedDate": "2024-07-22T05:24:14",
    "locations": [
      {
        "lctSeqId": "1",
        "prdLctTypCd": "1",
        "fxtAilId": "14"
      },
      {
        "lctSeqId": "2",
        "prdLctTypCd": "2",
        "fxtAilId": "15"
      }
    ],
    "itemDetails": [
      {
        "pmyVbuNbr": "126",
        "upcId": "008236686661",
        "vndModId": "35216"
      },
      {
        "pmyVbuNbr": "127",
        "upcId": "008236686662",
        "vndModId": "35217"
      }
    ]
  },
  {
    "type": "test",
    "lctNbr": 148,
    "itmNbr": 150,
    "lastUpdatedDate": "2024-07-22T05:24:15",
    "locations": [
      {
        "lctSeqId": "3",
        "prdLctTypCd": "3",
        "fxtAilId": "16"
      }
    ],
    "itemDetails": [
      {
        "pmyVbuNbr": "128",
        "upcId": "008236686663",
        "vndModId": "35218"
      }
    ]
  }
]
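Note that the inner values all come out as strings: create_map requires every value to share a single type, so Spark casts the mixed integer/string columns to string. If the numeric fields (lctSeqId, prdLctTypCd, pmyVbuNbr) must stay integers as in your desired format, here is a sketch of the same aggregation using F.struct, which preserves each column's original type:

resultDF = sparkDF.groupby(
    'type', 'lctNbr', 'itmNbr', 'lastUpdatedDate'
).agg(
    # Structs keep each field's original type, unlike maps.
    F.collect_list(
        F.struct(
            F.col('lctSeqId'),
            F.col('T7797_PRD_LCT_TYP_CD').alias('prdLctTypCd'),
            F.col('FXT_AIL_ID').alias('fxtAilId'),
        )
    ).alias('locations'),
    F.collect_list(
        F.struct('pmyVbuNbr', 'upcId', 'vndModId')
    ).alias('itemDetails')
)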