更新spark / scala中的数据框

问题描述 投票:0回答:2

我需要根据不属于数据框中任何其他列的字符串更新dataframe列的值。我该怎么做呢?

对于例如假设我的数据框有列A,B,C。我想根据A列中的值和静态字符串的组合更新C列的值。我试着做以下事情。

val df = originalDF.withColumn("C", Helper.dudf(df("A"), lit("str")))

我的助手课如下

val addDummyColumn :(String, String)=>String=(input:String, recordType: String)=>{input}

val dummyUDF = udf(addDummyColumn)

我的UDF接受变量A和recordType:

if(recordType.equals("TRANSACTION") {
 if(A > 0 ) return "CHARGE";
   else return "REFUND"
} else if (recordType.equals("CHARGEBACK") {
    return "CHARGEBACK"
}

示例输入和输出:

Sample Input:
A=10, recordType=TRANSACTION
Output: C = CHARGE
A=-10, recordType=TRANSACTION
C = REFUND

A=10, recordType=CHARGEBACK
C = CHARGEBACK

我的问题是withColumn只接受Column所以我点亮了(“str”),但我不知道如何在我的UDF中提取该列的值。想法?

scala apache-spark
2个回答
1
投票

如果列A是IntegerType,那么您可以将udf函数定义为

val recordType: String = //"TRANSACTION" or "CHARGEBACK"
import org.apache.spark.sql.functions._
val dummyUDF = udf((A: Int, recordType: String) => {
  if(recordType.equals("TRANSACTION")){
    if(A > 0) "CHARGE" else "REFUND"
  } else if (recordType.equals("CHARGEBACK"))
    "CHARGEBACK"
  else
    "not known"
})

val df = originalDF.withColumn("C", dummyUDF(originalDF("A"), lit(recordType)))

1
投票

这是你如何使用udf并传递列和静态字符串

 val addDummy = udf((A : String, recordType: String) => {
if(recordType.equals("TRANSACTION")) {
  if(A.toInt > 0 ) 
    "CHARGE" 
  else
    "REFUND"
}else if (recordType.equals("CHARGEBACK")) {
  "CHARGEBACK"
}else
  "NONE"
  })

现在打电话给udf如下

val newDF = df.withColumn("newCol", addDummy($"A", lit("TRANSACTION")))

希望这可以帮助!

© www.soinside.com 2019 - 2024. All rights reserved.