使用 Scala 和 Apache Spark 检索存储的 JSON 字符串作为 Delta 表的列

问题描述 投票:0回答:1

需要解决的问题

需要从delta表中提取json字符串并最终解析它。

show
函数可以用来查看数据,但需要将其提取到地图或案例类中进行处理。

数据从json文件插入到delta表中。表中的列的类型为

String
。在此过程中,仅数据存储在增量表列中,而不是 json 数组中的键值。所以
select
查询会返回
partial
数据。这是增量表的默认行为吗?

例如

要存储的数据 { "name" : "Sample"} 实际存储的内容:{“Sample”}

背景

已经参考了以下SO问题并按照那里的说明进行操作,但找不到解决方案

问题1

问题2

  • 插入步骤

数据已从 json 文件插入到增量表中

其中一个字段是 json 数组,它应该按原样插入

    val jsonData = spark.read
      .option("multiLine", true)
      .option("mode",
        "PERMISSIVE"
      ).option(
        "dropFieldIfAllNull",
        false
      ).json(FILE_PATH)
      .createOrReplaceTempView("data")

    val data = spark.sql("""SELECT * from Data""")

    val inputDataFrame = data
      .select(
        col("Id"),
        col("Version"),
        col("StartDate"),
        col("EndDate"),
        explode(col("Configuration")).alias(
          "Config"
        ) 
      )
      .withColumn("ConfigId", col("Config.Id"))
      .
      .
      .
      .toDF
      
      val deltaTable = DeltaTable.forPath(.....)

      deltaTable
      .as("Config") 
      .merge(
             inputDataFrame.as("input"),
  
      )
      .whenMatched
      .update(
        Map(
            "ConfigData" -> col("input.Config"),
        )
      )
      .whenNotMatched
      .insert(
        Map(
          "ConfigData" -> col("input.Config"),
        )
      )
      .execute()
  }
  • 选择步骤

运行

select
查询后,数据显示没有键,仅插入了 json 字符串中的值

  • 阅读步骤

方法一

val arrayOffStringSchema = ArrayType(StringType)

var configData = df.select("Config").as[String]
configData.show(false) // shows Config column and data without keys
                                         
var desiredData = configData.withColumn("Config",from_json(col("Config"), arrayofStringSchema))
desiredData.show(false) // shows null

方法2

val json_schema = spark.read
      .option("multiLine", true)
      .option(
        "mode",
        "PERMISSIVE"
      ) 
      .option(
        "dropFieldIfAllNull",
        false
      ).json(df.select("Config").as[String]).schema
var config = configData.withColumn("Config", from_json(col("Config"),json_schema))
config.show(false) // shows Config column and data without keys
config.printSchema() // Shows following

/*
root
 |-- Config: struct (nullable = true)
 |    |-- _corrupt_record: string (nullable = true)
*/

平台

Scala 2.12.18

阿帕奇火花3.5.1

这里缺少什么?插入有问题还是有其他方法可以实现此目的?

编辑1 显然,当存储数据时,会自动推断模式,并且不会将其存储在增量表中。因此,在获取数据时,不知道模式来自哪里。基本上需要一个 Map,它将能够从存储为增量表中列的 json 字符串中获取字段。

json scala apache-spark delta-lake
1个回答
0
投票

让我简单地解释一下这个问题

正在读取 json 文件中的数据并将其插入到 Delta Table 列中。

该列的类型为 String

据观察,当包含 json 数据的数据帧插入 Delta 表列时,它会自动推断架构,并且 json 结构中的所有键都会被删除,只存储值

解析数据变得困难,因为数据是稍后从增量表列中读取的。

解决方案

to_json:用于将数据帧列转换为json并存储到增量表列中

from_json:用于将数据帧列转换为结构化json

val json_schema = spark.read
      .option("multiLine", true)
      .option(
        "mode",
        "PERMISSIVE"
      ) 
      .option(
        "dropFieldIfAllNull",
        false
      ).json(df.select("Config").as[String]).schema

var struct_json = levers.withColumn("StructuredConfig", from_json(col("Config"),json_schema))

struct_json.printSchema // shows desired schema

现在可以从数据框中选择

StructuredConfig.name
StructuredConfig.id
,这在以前是不可能的

© www.soinside.com 2019 - 2024. All rights reserved.