Spark:替换嵌套列中的 Null 值

问题描述 投票:0回答:4

我想将以下数据框中的所有

n/a
值替换为
unknown
。 它可以是
scalar
complex nested column
。 如果它是
StructField column
,我可以循环遍历列并使用
n\a
替换
WithColumn
。 但我希望在
generic way
中完成此操作,尽管该列的
type
因为我不想明确指定列名,因为我的例子中有 100 个列名?

case class Bar(x: Int, y: String, z: String)
case class Foo(id: Int, name: String, status: String, bar: Seq[Bar])

val df = spark.sparkContext.parallelize(
Seq(
  Foo(123, "Amy", "Active", Seq(Bar(1, "first", "n/a"))),
  Foo(234, "Rick", "n/a", Seq(Bar(2, "second", "fifth"),Bar(22, "second", "n/a"))),
  Foo(567, "Tom", "null", Seq(Bar(3, "second", "sixth")))
)).toDF

df.printSchema
df.show(20, false)

结果:

+---+----+------+---------------------------------------+
|id |name|status|bar                                    |
+---+----+------+---------------------------------------+
|123|Amy |Active|[[1, first, n/a]]                      |
|234|Rick|n/a   |[[2, second, fifth], [22, second, n/a]]|
|567|Tom |null  |[[3, second, sixth]]                   |
+---+----+------+---------------------------------------+   

预期输出:

+---+----+----------+---------------------------------------------------+
|id |name|status    |bar                                                |
+---+----+----------+---------------------------------------------------+
|123|Amy |Active    |[[1, first, unknown]]                              |
|234|Rick|unknown   |[[2, second, fifth], [22, second, unknown]]        |
|567|Tom |null      |[[3, second, sixth]]                               |
+---+----+----------+---------------------------------------------------+

对此有什么建议吗?

scala apache-spark apache-spark-sql
4个回答
5
投票

如果您喜欢使用 RDD,这里有一个简单、通用且进化的解决方案:

  val naToUnknown = {r: Row =>
    def rec(r: Any): Any = {
      r match {
        case row: Row => Row.fromSeq(row.toSeq.map(rec))
        case seq: Seq[Any] => seq.map(rec)
        case s: String if s == "n/a" => "unknown"
        case _ => r
      }
    }
    Row.fromSeq(r.toSeq.map(rec))
  }

  val newDF = spark.createDataFrame(df.rdd.map{naToUnknown}, df.schema)
  newDF.show(false)

输出:

+---+----+-------+-------------------------------------------+
|id |name|status |bar                                        |
+---+----+-------+-------------------------------------------+
|123|Amy |Active |[[1, first, unknown]]                      |
|234|Rick|unknown|[[2, second, fifth], [22, second, unknown]]|
|567|Tom |null   |[[3, second, sixth]]                       |
+---+----+-------+-------------------------------------------+

1
投票

当您只有简单的列和结构时,替换嵌套值在某种程度上很容易。 对于数组字段,您必须在替换或使用 UDF/高阶函数之前分解结构,请参阅我的其他答案here

您可以定义一个循环遍历 DataFrame 模式的通用函数 并应用 lambda 函数

func
来替换你想要的:

def replaceNestedValues(schema: StructType, func: Column => Column, path: Option[String] = None): Seq[Column] = {
  schema.fields.map(f => {
    val p = path.fold(s"`${f.name}`")(c => s"$c.`${f.name}`")
    f.dataType match {
      case s: StructType => struct(replaceNestedValues(s, func, Some(p)): _*).alias(f.name)
      case _ => func(col(p)).alias(f.name)
    }
  })
}

在使用此函数之前,请像这样分解数组结构

bar

val df2 = df.select($"id", $"name", $"status", explode($"bar").alias("bar"))

然后,定义一个 lambda 函数,该函数接受一个列,并在它等于

unknown
时使用
n/a
函数将其替换为
when/otherwise
,并使用上述函数对列应用转换:

val replaceNaFunc: Column => Column = c => when(c === lit("n/a"), lit("unknown")).otherwise(c)
val replacedCols = replaceNestedValues(df2.schema, replaceNaFunc)

选择新列和 groupBy 以获取

bar
数组:

df2.select(replacedCols: _*).groupBy($"id", $"name", $"status").agg(collect_list($"bar").alias("bar")).show(false)

给予:

+---+----+-------+-------------------------------------------+                  
|id |name|status |bar                                        |
+---+----+-------+-------------------------------------------+
|234|Rick|unknown|[[2, second, fifth], [22, second, unknown]]|
|123|Amy |Active |[[1, first, unknown]]                      |
|567|Tom |null   |[[3, second, sixth]]                       |
+---+----+-------+-------------------------------------------+

0
投票

您可以定义一个 UDF 来处理您的数组并替换您想要的项目:

UDF

 val replaceNA =  udf((x:Row) => {
      val z = x.getString(2)
      if ( z == "n/a")
        Bar(x.getInt(0), x.getString(1), "unknow")
      else
        Bar(x.getInt(0), x.getString(1), x.getString(2))
      })

一旦有了该 UDF,您就可以 explode 您的数据框,将 bar 中的每个项目作为单行:

 val explodedDF = df.withColumn("exploded", explode($"bar"))
+---+----+------+--------------------+------------------+
| id|name|status|                 bar|          exploded|
+---+----+------+--------------------+------------------+
|123| Amy|Active|   [[1, first, n/a]]|   [1, first, n/a]|
|234|Rick|   n/a|[[2, second, fift...|[2, second, fifth]|
|234|Rick|   n/a|[[2, second, fift...| [22, second, n/a]|
|567| Tom|  null|[[3, second, sixth]]|[3, second, sixth]|
+---+----+------+--------------------+------------------+ 

然后应用之前定义的UDF来替换项目:

val replacedDF = explodedDF.withColumn("exploded", replaceNA($"exploded"))
+---+----+------+--------------------+--------------------+
| id|name|status|                 bar|            exploded|
+---+----+------+--------------------+--------------------+
|123| Amy|Active|   [[1, first, n/a]]|  [1, first, unknow]|
|234|Rick|   n/a|[[2, second, fift...|  [2, second, fifth]|
|234|Rick|   n/a|[[2, second, fift...|[22, second, unknow]|
|567| Tom|  null|[[3, second, sixth]]|  [3, second, sixth]|
+---+----+------+--------------------+--------------------+

最后分组collect_list一起将其返回到原始状态

 val resultDF = replacedDF.groupBy("id", "name", "status")
      .agg(collect_list("exploded").as("bar")).show(false)
+---+----+------+------------------------------------------+
|id |name|status|bar                                       |
+---+----+------+------------------------------------------+
|234|Rick|n/a   |[[2, second, fifth], [22, second, unknow]]|
|567|Tom |null  |[[3, second, sixth]]                      |
|123|Amy |Active|[[1, first, unknow]]                      |
+---+----+------+------------------------------------------+

一步将所有内容组合在一起:

import org.apache.spark.sql._

 val replaceNA =  udf((x:Row) => {
          val z = x.getString(2)
          if ( z == "n/a")
            Bar(x.getInt(0), x.getString(1), "unknow")
          else
            Bar(x.getInt(0), x.getString(1), x.getString(2))
          }) 

df.withColumn("exploded", explode($"bar"))
 .withColumn("exploded", replaceNA($"exploded"))
 .groupBy("id", "name", "status")
 .agg(collect_list("exploded").as("bar"))

0
投票

@blackbishop 这很好用,但我遇到的一个问题是这会向 StructType 的空列添加一个“空”对象。 这是一个例子:

case class Demographics(city: String)
case class Detail(age: Int, demographics: Demographics)
case class Person(name: String, details: Details

val data = Seq(Data(Person("James", Details(48, demographics=Demographics("Toronto")))), Data(Person("Mary", Details(41, demographics=Demographics(null)))), Data(null)).toDS
data.show()

+--------------------+
|              person|
+--------------------+
|{James, {48, {Tor...|
|{Mary, {41, {NULL}}}|
|                NULL|
|        {Jane, NULL}|
+--------------------+
data.select(replacedCols: _*).show(false)
+----------------------------+
|person                      |
+----------------------------+
|{James, {48, {Toronto}}}    |
|{Mary, {41, {unknown}}}     |
|{unknown, {NULL, {unknown}}}|
|{Jane, {NULL, {unknown}}}   |
+----------------------------+

知道如何防止这种情况吗?

© www.soinside.com 2019 - 2024. All rights reserved.