Convert several fields of a nested JSON into a dictionary in PySpark

Problem description (votes: 0, answers: 2)

I have a huge nested JSON like the one below:

  "evaluation_parameters": {},
  "meta": {
    "active_batch_definition": {
      "batch_identifiers": {
        "pipeline_stage": "prod",
        "run_id": "run_20220224"
      },
      "data_asset_name": "STORES_DQ_SUITE",
      "data_connector_name": "stores_connector",
      "datasource_name": "stores"
    },
    "batch_markers": {
      "ge_load_time": "20220224T054318.272571Z"
    },
    "batch_spec": {
      "batch_data": "SparkDataFrame",
      "data_asset_name": "STORES_DQ_SUITE"
    },
    "expectation_suite_name": "STORES_DQ_SUITE",
    "great_expectations_version": "0.14.7",
    "run_id": {
      "run_name": "stores_template_20220224-054316",
      "run_time": "2022-02-24T05:43:16.678220+00:00"
    },
    "validation_time": "20220224T054318.389119Z"
  },
  "results": [
    {
      "exception_info": {
        "exception_message": null,
        "exception_traceback": null,
        "raised_exception": false
      },
      "expectation_config": {
        "expectation_type": "expect_column_to_exist",
        "kwargs": {
          "batch_id": "46f2769bf8c7729a40efddfa0597de22",
          "column": "country"
        },
        "meta": {}
      },
      "meta": {},
      "result": {},
      "success": true
    },
    {
      "exception_info": {
        "exception_message": null,
        "exception_traceback": null,
        "raised_exception": false
      },
      "expectation_config": {
        "expectation_type": "expect_column_values_to_not_be_null",
        "kwargs": {
          "batch_id": "46f2769bf8c7729a40efddfa0597de22",
          "column": "country"
        },
        "meta": {}
      },
      "meta": {},
      "result": {
        "element_count": 102,
        "partial_unexpected_counts": [],
        "partial_unexpected_index_list": null,
        "partial_unexpected_list": [],
        "unexpected_count": 0,
        "unexpected_percent": 0.0
      },
      "success": true
    },
    {
      "exception_info": {
        "exception_message": null,
        "exception_traceback": null,
        "raised_exception": false
      },
      "expectation_config": {
        "expectation_type": "expect_column_values_to_be_of_type",
        "kwargs": {
          "batch_id": "46f2769bf8c7729a40efddfa0597de22",
          "column": "country",
          "type_": "StringType"
        },
        "meta": {}
      },
      "meta": {},
      "result": {
        "observed_value": "StringType"
      },
      "success": true
    },
    {
      "exception_info": {
        "exception_message": null,
        "exception_traceback": null,
        "raised_exception": false
      },
      "expectation_config": {
        "expectation_type": "expect_column_to_exist",
        "kwargs": {
          "batch_id": "46f2769bf8c7729a40efddfa0597de22",
          "column": "countray"
        },
        "meta": {}
      },
      "meta": {},
      "result": {},
      "success": false
    },
    {
      "exception_info": {
        "exception_message": null,
        "exception_traceback": null,
        "raised_exception": false
      },
      "expectation_config": {
        "expectation_type": "expect_table_row_count_to_equal",
        "kwargs": {
          "batch_id": "46f2769bf8c7729a40efddfa0597de22",
          "value": 10
        },
        "meta": {}
      },
      "meta": {},
      "result": {
        "observed_value": 102
      },
      "success": false
    },
    {
      "exception_info": {
        "exception_message": null,
        "exception_traceback": null,
        "raised_exception": false
      },
      "expectation_config": {
        "expectation_type": "expect_column_sum_to_be_between",
        "kwargs": {
          "batch_id": "46f2769bf8c7729a40efddfa0597de22",
          "column": "active_stores",
          "max_value": 1000,
          "min_value": 100
        },
        "meta": {}
      },
      "meta": {},
      "result": {
        "observed_value": 22075.0
      },
      "success": false
    }
  ],
  "statistics": {
    "evaluated_expectations": 6,
    "success_percent": 50.0,
    "successful_expectations": 3,
    "unsuccessful_expectations": 3
  },
  "success": false
}

I want to extract a table whose values have the following lineage:

data_source: a hardcoded value

run_time: meta.run_id.run_time

expectation_type: results.expectation_config.expectation_type

expectations: results.expectation_config.kwargs (all values in the dictionary except batch_id)

results: results.result (everything, as a dictionary)
Expected output:

+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|data_source        |run_time                        |expectation_type                          |expectations                                                           |results                                                                                                                                                                            |success        |
+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_column_to_exist                    |{"column": "country"}                                                  |{}                                                                                                                                                                                 |true           |
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_column_values_to_not_be_null       |{"column": "country"}                                                  |{"element_count": 102, "partial_unexpected_counts": [], "partial_unexpected_index_list": null, "partial_unexpected_list": [], "unexpected_count": 0, "unexpected_percent": 0.0}    |true           |
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_column_values_to_be_of_type        |{"column": "country","type_": "StringType"}                            |{"observed_value": "StringType"}                                                                                                                                                   |true           |
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_column_to_exist                    |{"column": "countray"}                                                 |{}                                                                                                                                                                                 |false          |
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_table_row_count_to_equal           |{"value": 10}                                                          |{"observed_value": 102}                                                                                                                                                            |false          |
|hardcoded_value    |2022-02-24T05:43:16.678220+00:00|expect_column_sum_to_be_between           |{"column": "active_stores","max_value": 1000,"min_value": 100}         |{"observed_value": 22075.0}                                                                                                                                                        |false          |
+-------------------+--------------------------------+------------------------------------------+-----------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---------------+

Can someone please help me with this?

Thank you in advance.

apache-spark pyspark apache-spark-sql
2 Answers

0 votes

Use the spark.read.json function to convert the JSON into a dataframe. That gives you a df with the parent keys as separate columns.

After that, you need the explode function from pyspark.sql.functions to explode the results column; see its documentation for more details.

Then just select the fields you need from the exploded column.

from pyspark.sql.functions import explode, lit

df = spark.read.json(json_path)
# one row per entry in the results array
df = df.withColumn("exploded_results", explode(df.results))
df = df.select(
    lit("hardcoded_value").alias("data_source"),
    df.meta.run_id.run_time.alias("run_time"),
    df.exploded_results.expectation_config.expectation_type.alias("expectation_type"),
    df.exploded_results.expectation_config.kwargs.alias("expectations"),
    df.exploded_results.result.alias("results"),
    df.exploded_results.success.alias("success"),
)
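
Note that the expectations column above still contains batch_id, which the question wants dropped. On Spark 3.1+, one way to remove a single field from a struct column is Column.dropFields; a minimal sketch (not part of the original answer):

from pyspark.sql.functions import col

# sketch: strip batch_id from the kwargs struct (requires Spark >= 3.1)
df = df.withColumn("expectations", col("expectations").dropFields("batch_id"))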


0 votes

Read the JSON data:

# json_data holds the validation result from the question as a JSON string
df = spark.read.json(spark.sparkContext.parallelize([json_data]))

Extract and transform the required fields:

from pyspark.sql.functions import col, expr

results_df = (
    df.selectExpr("explode(results) as result", "meta.run_id.run_time")
    .select(
        expr("'hardcoded_value' as data_source"),
        col("run_time"),
        col("result.expectation_config.expectation_type"),
        expr(
            "map_from_arrays("
            "array_remove(map_keys(result.expectation_config.kwargs), 'batch_id'), "
            "array_remove(map_values(result.expectation_config.kwargs), "
            "result.expectation_config.kwargs.batch_id)"
            ") as expectations"
        ),
        col("result.result").alias("results"),
        col("result.success"),
    )
)

Show the results:

results_df.show(truncate=False)
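
One caveat: map_keys, map_values, and map_from_arrays operate on map columns, while spark.read.json typically infers kwargs as a struct. A minimal sketch of a workaround (my assumption, not from the original answer) is to round-trip the struct through JSON into a map<string,string> first:

from pyspark.sql.functions import col, explode, from_json, to_json
from pyspark.sql.types import MapType, StringType

exploded = df.select(explode("results").alias("result"), col("meta.run_id.run_time"))
# round-trip the inferred kwargs struct through JSON to get a real map column;
# note: all values become strings in the resulting map
exploded = exploded.withColumn(
    "kwargs_map",
    from_json(to_json(col("result.expectation_config.kwargs")),
              MapType(StringType(), StringType())),
)

After that, batch_id can be dropped with the map_from_arrays pattern above or with the SQL function map_filter (Spark 3.0+).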
