I have the following code and data:
df_renamed = df.withColumnRenamed("id","steps.id").withColumnRenamed("status_1","steps.status").withColumnRenamed("severity","steps.error.severity")
df_renamed.show(truncate=False)
+----------+-------+------+-----------------------+------------+--------------------+
|apiVersion|expired|status|steps.id               |steps.status|steps.error.severity|
+----------+-------+------+-----------------------+------------+--------------------+
|2         |false  |200   |mexican-curp-validation|200         |null                |
+----------+-------+------+-----------------------+------------+--------------------+
Now I want to transform this data as follows:
+----------+-------+------+-----------------------------------------------------------------------+
|apiVersion|expired|status|steps                                                                  |
+----------+-------+------+-----------------------------------------------------------------------+
|2         |false  |200   |{"id":"mexican-curp-validation","status":200,"error":{"severity":null}}|
+----------+-------+------+-----------------------------------------------------------------------+
As you can see, the dot notation in the column names determines the JSON structure formed in the data. So I used the following code:
cols_list = [name for name in df_renamed.columns if "." in name]
df_new = df_renamed.withColumn("steps",F.to_json(F.struct(*cols_list)))
df_new.show()
But even though the column exists, it gives the following error:
df_new = df_renamed.withColumn("steps",F.to_json(F.struct(*cols_list)))
File "/Users/../IdeaProjects/pocs/venvsd/lib/python3.9/site-packages/pyspark/sql/dataframe.py", line 3036, in withColumn
return DataFrame(self._jdf.withColumn(colName, col._jc), self.sparkSession)
File "/Users/../IdeaProjects/pocs/venvsd/lib/python3.9/site-packages/py4j/java_gateway.py", line 1321, in __call__
return_value = get_return_value(
File "/Users/../IdeaProjects/pocs/venvsd/lib/python3.9/site-packages/pyspark/sql/utils.py", line 196, in deco
raise converted from None
pyspark.sql.utils.AnalysisException: Column 'steps.id' does not exist. Did you mean one of the following? [steps.id, expired, status, steps.status, apiVersion, steps.error.severity];
'Project [apiVersion#17, expired#18, status#19, steps.id#29, steps.status#37, steps.error.severity#44, to_json(struct(id, 'steps.id, status, 'steps.status, severity, 'steps.error.severity), Some(GMT+05:30)) AS steps#82]
+- Project [apiVersion#17, expired#18, status#19, steps.id#29, steps.status#37, severity#22 AS steps.error.severity#44]
   +- Project [apiVersion#17, expired#18, status#19, steps.id#29, status_1#21 AS steps.status#37, severity#22]
      +- Project [apiVersion#17, expired#18, status#19, id#20 AS steps.id#29, status_1#21, severity#22]
         +- Relation [apiVersion#17,expired#18,status#19,id#20,status_1#21,severity#22] csv
Where did I go wrong? Any help is much appreciated.
Special characters in column names can be escaped by wrapping the name in backticks (`), as follows:
cols_list = ["`" + name + "`" for name in df_renamed.columns if "." in name]
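This way, when you reference the column in the to_json function, the name is passed as `steps.id`, which escapes the "." character. Without the backticks, Spark reads steps.id as the field id of a column named steps, which is why it reports that the column does not exist.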
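To make the escaping step concrete, here is a minimal sketch of it in isolation. The helper name escape_dotted is my own, and the column list is copied by hand from the question's output, so the snippet runs without a Spark session. The commented line at the end shows how the result would plug into the original call.

```python
def escape_dotted(columns):
    """Return only the dotted column names, each wrapped in backticks."""
    return [f"`{name}`" for name in columns if "." in name]


# Column names copied from the question's df_renamed (stand-in for df_renamed.columns).
columns = ["apiVersion", "expired", "status",
           "steps.id", "steps.status", "steps.error.severity"]

cols_list = escape_dotted(columns)
print(cols_list)
# ['`steps.id`', '`steps.status`', '`steps.error.severity`']

# With the question's DataFrame and `from pyspark.sql import functions as F`
# (not executed here because it needs a live Spark session):
# df_new = df_renamed.withColumn("steps", F.to_json(F.struct(*cols_list)))
```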
Be careful when handling column names that contain special characters; I suggest avoiding them whenever possible.
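One caveat: as far as I can tell, escaping alone keeps the dotted names as flat JSON keys (for example "steps.id") rather than producing the nested object shown in the question. The sketch below is one possible way to get the nesting, not the only one. The helpers build_tree and to_struct are hypothetical names of mine. The first is plain Python and runs as is, while the second imports pyspark lazily and is only exercised in the commented usage. I also believe to_json omits null fields by default and that the ignoreNullFields option (Spark 3.0 and later) keeps them, but please verify that against your Spark version.

```python
def build_tree(columns):
    """Group dotted column names into a nested dict; each leaf holds the full column name."""
    tree = {}
    for name in columns:
        parts = name.split(".")
        node = tree
        for part in parts[:-1]:
            node = node.setdefault(part, {})
        node[parts[-1]] = name
    return tree


def to_struct(node):
    """Turn a nested dict from build_tree into a Spark struct column."""
    from pyspark.sql import functions as F  # lazy import: requires pyspark

    fields = []
    for key, value in node.items():
        if isinstance(value, dict):
            # Nested group such as "error": recurse and name the sub-struct.
            fields.append(to_struct(value).alias(key))
        else:
            # Backticks make Spark treat the dotted name as a single column.
            fields.append(F.col(f"`{value}`").alias(key))
    return F.struct(*fields)


dotted = ["steps.id", "steps.status", "steps.error.severity"]
tree = build_tree(dotted)
print(tree)
# {'steps': {'id': 'steps.id', 'status': 'steps.status', 'error': {'severity': 'steps.error.severity'}}}

# Usage with the question's DataFrame (not executed here; needs a Spark session):
# from pyspark.sql import functions as F
# plain_cols = [c for c in df_renamed.columns if "." not in c]
# steps_json = F.to_json(to_struct(tree["steps"]), {"ignoreNullFields": "false"})
# df_new = df_renamed.select(*plain_cols, steps_json.alias("steps"))
```

Building the tree from the column names keeps the approach generic: any further steps.* column would be picked up without hand-writing the struct.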