我有一个巨大的嵌套 json,如下所示
{
"evaluation_parameters": {},
"meta": {
"active_batch_definition": {
"batch_identifiers": {
"pipeline_stage": "prod",
"run_id": "run_20220224"
},
"data_asset_name": "STORES_DQ_SUITE",
"data_connector_name": "stores_connector",
"datasource_name": "stores"
},
"batch_markers": {
"ge_load_time": "20220224T054318.272571Z"
},
"batch_spec": {
"batch_data": "SparkDataFrame",
"data_asset_name": "STORES_DQ_SUITE"
},
"expectation_suite_name": "STORES_DQ_SUITE",
"great_expectations_version": "0.14.7",
"run_id": {
"run_name": "stores_template_20220224-054316",
"run_time": "2022-02-24T05:43:16.678220+00:00"
},
"validation_time": "20220224T054318.389119Z"
},
"results": [
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_to_exist",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "country"
},
"meta": {}
},
"meta": {},
"result": {},
"success": true
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_values_to_not_be_null",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "country"
},
"meta": {}
},
"meta": {},
"result": {
"element_count": 102,
"partial_unexpected_counts": [],
"partial_unexpected_index_list": null,
"partial_unexpected_list": [],
"unexpected_count": 0,
"unexpected_percent": 0.0
},
"success": true
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_values_to_be_of_type",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "country",
"type_": "StringType"
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": "StringType"
},
"success": true
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_to_exist",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "countray"
},
"meta": {}
},
"meta": {},
"result": {},
"success": false
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_table_row_count_to_equal",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"value": 10
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": 102
},
"success": false
},
{
"exception_info": {
"exception_message": null,
"exception_traceback": null,
"raised_exception": false
},
"expectation_config": {
"expectation_type": "expect_column_sum_to_be_between",
"kwargs": {
"batch_id": "46f2769bf8c7729a40efddfa0597de22",
"column": "active_stores",
"max_value": 1000,
"min_value": 100
},
"meta": {}
},
"meta": {},
"result": {
"observed_value": 22075.0
},
"success": false
}
],
"statistics": {
"evaluated_expectations": 6,
"success_percent": 50.0,
"successful_expectations": 3,
"unsuccessful_expectations": 3
},
"success": false
}
我想导出一个表,其值具有以下谱系 -
data_source:硬编码值
run_time:meta.run_id.run_time
expectation_type:results.expectation_config.expectation_type
expectations:results.expectation_config.kwargs(字典中除 batch_id 之外的所有值)
results:results.result(整个字典)
预期结果
+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|data_source |run_time |expectation_type |expectations |results |success |
+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_column_to_exist |{"column": "country"} |{} |true |
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_column_values_to_not_be_null |{"column": "country"} |{"element_count": 102, "partial_unexpected_counts": [], "partial_unexpected_index_list": null, "partial_unexpected_list": [], "unexpected_count": 0, "unexpected_percent": 0.0} |true |
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_column_values_to_be_of_type |{"column": "country","type_": "StringType"} |{"observed_value": "StringType"} |true |
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_column_to_exist |{"column": "countray"} |{} |false |
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_table_row_count_to_equal |{"value": 10} |{"observed_value": 102} |false |
|hardcoded_value |2022-02-24T05:43:16.678220+00:00|expect_column_sum_to_be_between |{"column": "active_stores","max_value": 1000,"min_value": 100} |{"observed_value": 22075.0} |false |
+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
有人可以帮我解决这个问题吗?
提前谢谢您。
使用
spark.read.json
函数将 json 转换为 dataframe。
之后,它为您提供带有父键的 df 作为单独的列。 之后,您需要使用
explode
的 spark.sql.functions
函数来分解结果列。欲了解更多详情,请阅读 explode 函数的官方文档。
然后只需从分解列中选择您需要的字段即可。
from pyspark.sql.functions import explode, col

# Read the validation-result JSON into a dataframe; top-level keys become columns.
df = spark.read.json(json_path)

# Keep only what we need. NOTE: after this select the nested field is exposed
# under its leaf name, and the original `meta` struct column is GONE - the
# original code kept referencing df.meta afterwards, which fails at runtime.
df = df.select(col("meta.run_id.run_time").alias("run_time"), col("results"))

# `results` is an array of structs; explode it into one row per expectation.
df = df.withColumn("exploded_results", explode(col("results")))

# Pull the individual fields out of the exploded struct, with readable aliases.
df = df.select(
    col("run_time"),
    col("exploded_results.expectation_config.expectation_type").alias("expectation_type"),
    col("exploded_results.expectation_config.kwargs").alias("expectations"),
    col("exploded_results.result").alias("results"),
    col("exploded_results.success").alias("success"),
)
from pyspark.sql.functions import col, expr

# Parse the in-memory JSON string into a dataframe (original had `Spark`,
# a NameError - the session object is lowercase `spark`).
df = spark.read.json(spark.sparkContext.parallelize([json_data]))

results_df = (
    # Explode the results array into one row per expectation, keeping run_time.
    df.selectExpr("explode(results) as result", "meta.run_id.run_time")
    .select(
        # Hard-coded lineage value for the data_source column.
        expr("'hardcoded_value' as data_source"),
        col("run_time"),
        col("result.expectation_config.expectation_type"),
        # Rebuild kwargs as a map with the batch_id entry stripped out.
        # NOTE(review): this assumes kwargs was parsed as a MapType (e.g. via an
        # explicit schema); with plain spark.read.json it infers a struct, on
        # which map_keys/map_values do not apply - confirm against your schema.
        expr(
            "map_from_arrays("
            "array_remove(map_keys(result.expectation_config.kwargs), 'batch_id'), "
            "array_remove(map_values(result.expectation_config.kwargs), "
            "result.expectation_config.kwargs.batch_id)) as expectations"
        ),
        col("result.result").alias("results"),
        col("result.success"),
    )
)

results_df.show(truncate=False)