动态选择col来获取值

问题描述 投票:0回答:1

我的数据框具有以下架构

 ID, Name, AcctIdName, AcctIdLoc, AcctIdPop, AcctIdTop, AcctIdPin
1, IdName,   1,          0,         0,         0,          0
2, IdLoc,    0,         -1,         0,         0,          0
3, IdPop,    0,          0,         0,         3,          0
4, IdTop,    0,          0,         0,         2,          0
5, IdPin,    0,          0,         0,         0,          7
6, IdTrin,   0,          0,         0,         0,          8

我想创建一个映射器,其中 AcctId 将具有映射器中提到的相应列的值

对于前任

when Name is IdName, AcctId should have value from AcctIdName
when Name is IdLoc, AcctId should have value from AcctIdLoc
when Name is IdPop, AcctId should have value from AcctIdPop
when Name is IdTop, AcctId should have value from AcctIdTop
when Name is IdPin, AcctId should have value from AcctIdPin
else not match found for any Name's value in Mapper, then assign default value from AcctIdName

根据上面的映射器,输出将是

ID, Name, AcctIdName, AcctIdLoc, AcctIdPop, AcctIdTop, AcctIdPin, AcctId
    1, IdName,   1,          0,         0,         0,          0,        1
    2, IdLoc,    0,         -1,         0,         0,          0,       -1
    3, IdPop,    0,          0,         0,         3,          0,        0
    4, IdTop,    0,          0,         0,         2,          0,        2
    5, IdPin,    0,          0,         0,         0,          7,        7
    6, IdTrin,   0,          0,         0,         0,          7,        0

我不想创建具有许多条件的代码,但想创建映射器/实用程序,它将通过从映射器/实用程序获取信息来为

AcctId
分配值。只想维护映射器,如果此模式中有更多列,我可以更新映射器/实用程序以适应我需要的更改。

有人可以建议什么吗?

谢谢

scala apache-spark
1个回答
0
投票

您可以使用 CASE 子句 和一系列 WHEN-THEN-ELSE 表达式来解决这个问题。请参阅下面的示例实现。这是一个灵活的解决方案,但缺点是 WHEN-THEN-ELSE 表达式链本质上执行线性搜索,因此它不能很好地随值/列的数量进行缩放。

另一种方法可以是广播地图并通过在恒定时间内进行地图查找来在 mapPartitions 函数中添加新列。不过,为后一种解决方案序列化行也会对性能产生影响。

import org.apache.spark.sql.{Column, DataFrame}
import org.apache.spark.sql.functions.col

def withMappedColumn(
    df: DataFrame,
    targetName: String,
    value: Column,
    sourceByValue: Map[String, Column],
    defaultSource: Column,
): DataFrame = {
  var target: Column = defaultSource
  sourceByValue.foreach(p => {
    val next = when(value === p._1, p._2)
    target = next.otherwise(target)
  })
  df.withColumn(targetName, target)
}

val df = Seq(
  (1, "IdName", 1, 0, 0, 0, 0),
  (2, "IdLoc", 0, -1, 0, 0, 0),
  (3, "IdPop", 0, 0, 0, 3, 0),
  (4, "IdTop", 0, 0, 0, 2, 0),
  (5, "IdPin", 0, 0, 0, 0, 7),
  (6, "IdTrin", 0, 0, 0, 0, 8),
).toDF("ID", "Name", "AcctIdName", "AcctIdLoc", "AcctIdPop", "AcctIdTop", "AcctIdPin")

val sourceByValue = Seq("Name", "Loc", "Pop", "Top", "Pin")
  .map((value: String) => ("Id" + value, col("AcctId" + value)))
  .toMap

val dfWithAcctId = withMappedColumn(df, "AcctId", col("Name"), sourceByValue, col("AcctIdName"))

dfWithAcctId.show
+---+------+----------+---------+---------+---------+---------+------+
| ID|  Name|AcctIdName|AcctIdLoc|AcctIdPop|AcctIdTop|AcctIdPin|AcctId|
+---+------+----------+---------+---------+---------+---------+------+
|  1|IdName|         1|        0|        0|        0|        0|     1|
|  2| IdLoc|         0|       -1|        0|        0|        0|    -1|
|  3| IdPop|         0|        0|        0|        3|        0|     0|
|  4| IdTop|         0|        0|        0|        2|        0|     2|
|  5| IdPin|         0|        0|        0|        0|        7|     7|
|  6|IdTrin|         0|        0|        0|        0|        8|     0|
+---+------+----------+---------+---------+---------+---------+------+

dfWithAcctId.explain(true)
== Parsed Logical Plan ==
'Project [ID#185, Name#186, ... , CASE WHEN ('Name = IdPin) THEN 'AcctIdPin ELSE CASE WHEN ('Name = IdName) THEN 'AcctIdName ELSE CASE WHEN ('Name = IdTop) THEN 'AcctIdTop ELSE CASE WHEN ('Name = IdPop) THEN 'AcctIdPop ELSE CASE WHEN ('Name = IdLoc) THEN 'AcctIdLoc ELSE 'AcctIdName END END END END END AS AcctId#199]
...
© www.soinside.com 2019 - 2024. All rights reserved.