Collect values into a dictionary under a parent column using PySpark

Question · votes: 0 · answers: 1

I have the following code and data:

df_renamed = (
    df.withColumnRenamed("id", "steps.id")
      .withColumnRenamed("status_1", "steps.status")
      .withColumnRenamed("severity", "steps.error.severity")
)

df_renamed.show(truncate=False)

+----------+-------+------+-----------------------+------------+--------------------+
|apiVersion|expired|status|steps.id               |steps.status|steps.error.severity|
+----------+-------+------+-----------------------+------------+--------------------+
|2         |false  |200   |mexican-curp-validation|200         |null                |
+----------+-------+------+-----------------------+------------+--------------------+  

Now I want to transform this data as follows:

+----------+-------+------+-----------------------------------------------------------------------+
|apiVersion|expired|status|steps                                                                  |
+----------+-------+------+-----------------------------------------------------------------------+
|2         |false  |200   |{"id":"mexican-curp-validation","status":200,"error":{"severity":null}}|
+----------+-------+------+-----------------------------------------------------------------------+

As you can see, a JSON structure should be formed in the data based on the dot notation of the column names. So I used the following code:

cols_list = [name for name in df_renamed.columns if "." in name]
df_new = df_renamed.withColumn("steps",F.to_json(F.struct(*cols_list)))
df_new.show()   

But it gives the following error, even though the column exists:

 df_new = df_renamed.withColumn("steps",F.to_json(F.struct(*cols_list)))
  File "/Users/../IdeaProjects/pocs/venvsd/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 3036, in withColumn
    return DataFrame(self._jdf.withColumn(colName, col._jc), self.sparkSession)
  File "/Users/../IdeaProjects/pocs/venvsd/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
    return_value = get_return_value(
  File "/Users/../IdeaProjects/pocs/venvsd/lib/python3.9/site-packages/pyspark/sql/utils.py", line 196, in deco
    raise converted from None
pyspark.sql.utils.AnalysisException: Column 'steps.id' does not exist. Did you mean one of the following? [steps.id, expired, status, steps.status, apiVersion, steps.error.severity];
'Project [apiVersion#17, expired#18, status#19, steps.id#29, steps.status#37, steps.error.severity#44, to_json(struct(id, 'steps.id, status, 'steps.status, severity, 'steps.error.severity), Some(GMT+05:30)) AS steps#82]
+- Project [apiVersion#17, expired#18, status#19, steps.id#29, steps.status#37, severity#22 AS steps.error.severity#44]
   +- Project [apiVersion#17, expired#18, status#19, steps.id#29, status_1#21 AS steps.status#37, severity#22]
      +- Project [apiVersion#17, expired#18, status#19, id#20 AS steps.id#29, status_1#21, severity#22]
         +- Relation [apiVersion#17,expired#18,status#19,id#20,status_1#21,severity#22] csv

Where am I going wrong? Any help is much appreciated.

python python-3.x apache-spark pyspark
1 Answer

0 votes

Special characters in column names can be escaped by wrapping the name in backticks (`). Without the backticks, Spark parses a name like steps.id as a reference to a field id inside a column called steps, which is why the AnalysisException is raised even though the column exists. Escape the names like so:

cols_list = ["`" + name + "`" for name in df_renamed.columns if "." in name]

This way, when you reference it in the to_json call it is resolved as `steps.id`, with the "." character successfully escaped.
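For completeness, a minimal sketch of the corrected code under that fix, assuming the df_renamed DataFrame from the question:

from pyspark.sql import functions as F

# Backtick-quote every dotted column name so Spark resolves each one as a
# single column instead of a nested field path.
cols_list = ["`" + name + "`" for name in df_renamed.columns if "." in name]

df_new = df_renamed.withColumn("steps", F.to_json(F.struct(*cols_list)))
df_new.show(truncate=False)

Note that the resulting JSON keys keep the full dotted names (e.g. "steps.id"): the backticks only change how the column reference is resolved, not what the struct field is called.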

Be careful when dealing with column names that contain any special characters; I would recommend avoiding them altogether where possible, as sketched below.
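For example, here is a hedged sketch that follows that advice, building the nested structure directly from the original columns (id, status_1, severity, as they appear in the question's query plan) instead of renaming them to dotted names. It assumes Spark 3.x, where to_json accepts the ignoreNullFields option:

from pyspark.sql import functions as F

# Build the nested struct straight from the original columns, so dotted
# names (and backtick escaping) are never needed.
df_new = df.withColumn(
    "steps",
    F.to_json(
        F.struct(
            F.col("id").alias("id"),
            F.col("status_1").alias("status"),
            F.struct(F.col("severity").alias("severity")).alias("error"),
        ),
        # Keep null fields (e.g. "severity": null) in the JSON output;
        # to_json drops them by default.
        {"ignoreNullFields": "false"},
    ),
).drop("id", "status_1", "severity")

df_new.show(truncate=False)

This produces the nested {"id": ..., "status": ..., "error": {"severity": ...}} shape the question asked for, without ever creating a column name that needs escaping.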
