Convert a DataFrame to nested JSON records

Problem description

I have a Spark DataFrame like the following:


+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
|   type|lctNbr|itmNbr|    lastUpdatedDate|lctSeqId|T7797_PRD_LCT_TYP_CD|FXT_AIL_ID|pmyVbuNbr|       upcId|vndModId|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
|prd_lct|   145|   147|2024-07-22T05:24:14|       1|                   1|        14|      126|008236686661|   35216|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+

I want to group this DataFrame by type, lctNbr, itmNbr, and lastUpdatedDate, so that each record ends up in the following JSON format:

  "type": "prd_lct",
  "lctNbr": 145,
  "itmNbr": 147,
  "lastUpdatedDate": "2024-07-22T05:24:14",
  "locations": [
    {
      "lctSeqId": 1,
      "prdLctTypCd": 1,
      "fxtAilId": "14"
    }
  ],
  "itemDetails": [
    {
      "pmyVbuNbr": 126,
      "upcId": "008236686661",
      "vndModId": "35216"
  ]
}

I tried using the to_json, collect_list, and map_from_entries functions, but I always get an error when I run show, and I can't seem to get the format right.

Tags: python, json, apache-spark, pyspark, apache-spark-sql
1 Answer

You can group by the desired fields and then aggregate with F.collect_list(F.create_map(...)) to build the inner fields of locations and itemDetails.

Sample data:

import pandas as pd
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

pandasDF = pd.DataFrame({
    "type": ["prd_lct","prd_lct","test"],
    "lctNbr": [145, 145, 148],
    "itmNbr": [147, 147, 150],
    "lastUpdatedDate": ["2024-07-22T05:24:14", "2024-07-22T05:24:14", "2024-07-22T05:24:15"],
    "lctSeqId": [1,2,3],
    "T7797_PRD_LCT_TYP_CD": [1,2,3],
    "FXT_AIL_ID": ["14","15","16"],
    "pmyVbuNbr": [126, 127, 128],
    "upcId": ["008236686661","008236686662","008236686663"],
    "vndModId": ["35216","35217","35218"]
})

# Convert to a Spark DataFrame for the aggregation below
sparkDF = spark.createDataFrame(pandasDF)
sparkDF.show()

+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
|   type|lctNbr|itmNbr|    lastUpdatedDate|lctSeqId|T7797_PRD_LCT_TYP_CD|FXT_AIL_ID|pmyVbuNbr|       upcId|vndModId|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+
|prd_lct|   145|   147|2024-07-22T05:24:14|       1|                   1|        14|      126|008236686661|   35216|
|prd_lct|   145|   147|2024-07-22T05:24:14|       2|                   2|        15|      127|008236686662|   35217|
|   test|   148|   150|2024-07-22T05:24:15|       3|                   3|        16|      128|008236686663|   35218|
+-------+------+------+-------------------+--------+--------------------+----------+---------+------------+--------+

Build the grouped DataFrame, then convert it to a list of JSON-encoded strings:

# create_map requires all map values to share one type, so Spark casts
# the numeric columns to string here (visible in the JSON output below)
resultDF = sparkDF.groupby(
    'type', 'lctNbr', 'itmNbr', 'lastUpdatedDate'
).agg(
    F.collect_list(
        F.create_map(
            F.lit('lctSeqId'), F.col('lctSeqId'),
            F.lit('prdLctTypCd'), F.col('T7797_PRD_LCT_TYP_CD'),
            F.lit('fxtAilId'), F.col('FXT_AIL_ID'),
        )
    ).alias('locations'),
    F.collect_list(
        F.create_map(
            F.lit('pmyVbuNbr'), F.col('pmyVbuNbr'),
            F.lit('upcId'), F.col('upcId'),
            F.lit('vndModId'), F.col('vndModId'),
        )
    ).alias('itemDetails')
)

resultJSON = resultDF.toJSON().collect()

Since resultJSON is a list of JSON-encoded strings, you can convert it to a list of dictionaries with:

import ast
result_dict = [ast.literal_eval(x) for x in resultJSON]
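Note that ast.literal_eval only understands Python literals, so it will fail on JSON-specific tokens such as true, false, or null. As an alternative (not part of the original answer), json.loads from the standard library parses any valid JSON string:

import json

# json.loads handles all JSON literals, including true/false/null
result_dict = [json.loads(x) for x in resultJSON]

Either way, the result looks like this: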

[
  {
    "type": "prd_lct",
    "lctNbr": 145,
    "itmNbr": 147,
    "lastUpdatedDate": "2024-07-22T05:24:14",
    "locations": [
      {
        "lctSeqId": "1",
        "prdLctTypCd": "1",
        "fxtAilId": "14"
      },
      {
        "lctSeqId": "2",
        "prdLctTypCd": "2",
        "fxtAilId": "15"
      }
    ],
    "itemDetails": [
      {
        "pmyVbuNbr": "126",
        "upcId": "008236686661",
        "vndModId": "35216"
      },
      {
        "pmyVbuNbr": "127",
        "upcId": "008236686662",
        "vndModId": "35217"
      }
    ]
  },
  {
    "type": "test",
    "lctNbr": 148,
    "itmNbr": 150,
    "lastUpdatedDate": "2024-07-22T05:24:15",
    "locations": [
      {
        "lctSeqId": "3",
        "prdLctTypCd": "3",
        "fxtAilId": "16"
      }
    ],
    "itemDetails": [
      {
        "pmyVbuNbr": "128",
        "upcId": "008236686663",
        "vndModId": "35218"
      }
    ]
  }
]
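Because create_map coerces every value to a common type, the numeric fields (lctSeqId, prdLctTypCd, pmyVbuNbr) come back as strings in the output above. If you need them to stay numbers, one option (a sketch, not part of the original answer) is to collect F.struct instead of F.create_map, since a struct keeps each field's own type:

import pyspark.sql.functions as F

# F.struct preserves per-field types, unlike F.create_map,
# which coerces all values to one common type (string here)
resultDF = sparkDF.groupby(
    'type', 'lctNbr', 'itmNbr', 'lastUpdatedDate'
).agg(
    F.collect_list(
        F.struct(
            F.col('lctSeqId'),
            F.col('T7797_PRD_LCT_TYP_CD').alias('prdLctTypCd'),
            F.col('FXT_AIL_ID').alias('fxtAilId'),
        )
    ).alias('locations'),
    F.collect_list(
        F.struct('pmyVbuNbr', 'upcId', 'vndModId')
    ).alias('itemDetails')
)

Calling resultDF.toJSON().collect() on this variant yields the same nested shape, with lctSeqId, prdLctTypCd, and pmyVbuNbr serialized as numbers.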