在Scala Spark Dataframe中扁平化嵌套的json。

问题描述 投票:0回答:1

我有多个来自任何restapi's的jsons,但我不知道它的模式,我无法使用dataframes的explode功能,因为我不知道列名,这是由spark api创建的。我无法使用dataframes的explode函数,因为我不知道列名,这是由spark api创建。

1.我们是否可以通过解码的方式来存储嵌套数组元素的键值?dataframe.schema.fields由于spark只提供数据框架的行中的值部分,并取顶层键作为列名。

数据框架---------------------。

+--------------------+
|       stackoverflow|
+--------------------+
|[[[Martin Odersky...|
+--------------------+

有没有什么最佳的方法,通过在运行时确定模式,使用数据框架方法来扁平化json。

示例Json-。

{
  "stackoverflow": [{
    "tag": {
      "id": 1,
      "name": "scala",
      "author": "Martin Odersky",
      "frameworks": [
        {
          "id": 1,
          "name": "Play Framework"
        },
        {
          "id": 2,
          "name": "Akka Framework"
        }
      ]
    }
  },
    {
      "tag": {
        "id": 2,
        "name": "java",
        "author": "James Gosling",
        "frameworks": [
          {
            "id": 1,
            "name": "Apache Tomcat"
          },
          {
            "id": 2,
            "name": "Spring Boot"
          }
        ]
      }
    }
  ]
}

注意--我们需要在数据框架中进行所有的操作,因为有大量的数据,我们无法解析每一个json。

scala apache-spark multidimensional-array apache-spark-sql
1个回答
1
投票

创建了帮助函数& 你可以直接调用。df.explodeColumns 在DataFrame上。

下面的代码将扁平化多级数组& 结构类型的列。

scala> :paste
// Entering paste mode (ctrl-D to finish)

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try

implicit class DFHelpers(df: DataFrame) {
    def columns = {
      val dfColumns = df.columns.map(_.toLowerCase)
      df.schema.fields.flatMap { data =>
        data match {
          case column if column.dataType.isInstanceOf[StructType] => {
            column.dataType.asInstanceOf[StructType].fields.map { field =>
              val columnName = column.name
              val fieldName = field.name
              col(s"${columnName}.${fieldName}").as(s"${columnName}_${fieldName}")
            }.toList
          }
          case column => List(col(s"${column.name}"))
        }
      }
    }

    def flatten: DataFrame = {
      val empty = df.schema.filter(_.dataType.isInstanceOf[StructType]).isEmpty
      empty match {
        case false =>
          df.select(columns: _*).flatten
        case _ => df
      }
    }
    def explodeColumns = {
      @tailrec
      def columns(cdf: DataFrame):DataFrame = cdf.schema.fields.filter(_.dataType.typeName == "array") match {
        case c if !c.isEmpty => columns(c.foldLeft(cdf)((dfa,field) => {
          dfa.withColumn(field.name,explode_outer(col(s"${field.name}"))).flatten
        }))
        case _ => cdf
      }
      columns(df.flatten)
    }
}

// Exiting paste mode, now interpreting.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import scala.annotation.tailrec
import scala.util.Try
defined class DFHelpers

扁平化的列

scala> df.printSchema
root
 |-- stackoverflow: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- tag: struct (nullable = true)
 |    |    |    |-- author: string (nullable = true)
 |    |    |    |-- frameworks: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- id: long (nullable = true)
 |    |    |    |    |    |-- name: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- name: string (nullable = true)


scala> df.explodeColumns.printSchema
root
 |-- author: string (nullable = true)
 |-- frameworks_id: long (nullable = true)
 |-- frameworks_name: string (nullable = true)
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

scala>

© www.soinside.com 2019 - 2024. All rights reserved.