I have a UDF:
val TrimText = (s: AnyRef) => {
//does logic returns string
}
and a DataFrame:
var df = spark.read.option("sep", ",").option("header", "true").csv(root_path + "/" + file)
I would like to perform TrimText on every value in every column of the DataFrame.
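The problem is that I have a dynamic number of columns. I know I can get the list of columns with df.columns, but I am not sure how that helps me here. How can I solve this?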
TL;DR: how do I apply a UDF to every column of a DataFrame when the number of columns is unknown?
I tried using:
df.columns.foldLeft( df )( (accDF, c) =>
accDF.withColumn(c, TrimText(col(c)))
)
which throws this error:
error: type mismatch;
found : String
required: org.apache.spark.sql.Column
accDF.withColumn(c, TrimText(col(c)))
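TrimText is supposed to return a String and expects its input to be a value from a column, so it should normalize every value in every row of the whole DataFrame.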
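You can use foldLeft to traverse the list of columns and iteratively apply withColumn to the DataFrame with your UDF. This assumes TrimText is a Spark UDF, i.e. the function wrapped with udf from org.apache.spark.sql.functions; calling the plain Scala function on col(c) returns a String rather than a Column, which is what produces the type mismatch in the question: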
df.columns.foldLeft( df )( (accDF, c) =>
accDF.withColumn(c, TrimText(col(c)))
)
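A minimal sketch of the wrapping step, in case it helps. The names trimTextFn, trimTextUdf and trimAll are hypothetical, and the null-safe trim only stands in for the question's elided logic. It also assumes String input, which matches a CSV read without schema inference, where every column is a string column.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions.{col, udf}

object TrimAllColumns {
  // Hypothetical stand-in for the real TrimText logic: a null-safe trim.
  val trimTextFn: String => String = s => if (s == null) null else s.trim

  // udf(...) turns the plain function into a UserDefinedFunction,
  // which accepts Columns and returns a Column.
  val trimTextUdf = udf(trimTextFn)

  // Fold over the column names, replacing each column with its transformed version.
  def trimAll(df: DataFrame): DataFrame =
    df.columns.foldLeft(df)((acc, c) => acc.withColumn(c, trimTextUdf(col(c))))

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("trim-all").getOrCreate()
    import spark.implicits._
    val df = Seq((" a ", "b  "), ("c", "  d")).toDF("x", "y")
    trimAll(df).show()
    spark.stop()
  }
}
```

Because withColumn is called with an existing column name, each column is replaced in place, so the schema keeps the same names and order however many columns there are.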
>> I would like to perform TrimText on every value in every column in the dataframe.
>> I have a dynamic number of columns.
Why use a UDF when a built-in SQL function is available for trimming? See whether the code below works for you:
import org.apache.spark.sql.functions._
// Optional: register a custom UDF for use in SQL (body elided in the original)
spark.udf.register("TrimText", (x:String) => ..... )
// Build "trim(col) as col" for each column; swap "trim" for "TrimText" to call the registered UDF
def buildcolumnlst(myCols: Seq[String]) = {
myCols.map(x => "trim(" + x + ")" + " as " + x).mkString(",")
}
val df2 = sc.parallelize(List(
(26, true, 60000.00),
(32, false, 35000.00)
)).toDF("age", "education", "income")
// Keep the columns as an ordered sequence so the output column order is stable
val cols2 = df2.columns.toSeq
df2.createOrReplaceTempView("table1")
val query = "select " + buildcolumnlst(cols2) + " from table1 "
println(query)
val dfresult = spark.sql(query)
dfresult.show()
Result:
select trim(age) as age,trim(education) as education,trim(income) as income from table1
+---+---------+-------+
|age|education| income|
+---+---------+-------+
| 26| true|60000.0|
| 32| false|35000.0|
+---+---------+-------+
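The same idea can probably be written without a temp view by passing the generated expressions to selectExpr. This is only a sketch: the object and method names are hypothetical, and the backticks are an added precaution for column names containing spaces or dots.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}

object TrimWithSelectExpr {
  // One "trim(`c`) as `c`" SQL expression per column, evaluated in a single select.
  def trimAll(df: DataFrame): DataFrame =
    df.selectExpr(df.columns.map(c => s"trim(`$c`) as `$c`"): _*)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("trim-selectExpr").getOrCreate()
    import spark.implicits._
    val df = Seq(("  26", "true "), ("32 ", " false")).toDF("age", "education")
    trimAll(df).show()
    spark.stop()
  }
}
```

This uses the built-in trim, so no UDF has to be registered; a registered UDF name could be substituted in the expression string if custom logic is needed.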
val a = sc.parallelize(Seq(("1 "," 2"),(" 3","4"))).toDF()
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions._
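def TrimText(s: Column): Column = {
// placeholder for the real logic; takes and returns a Column
trim(s)
}
// alias each result so the output keeps the original column names
a.select(a.columns.map(c => TrimText(col(c)).as(c)):_*).show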