如何使用Java UDF将新列添加到Spark数据帧

问题描述 投票:1回答:1

我有一个Dataset<Row> inputDS有4列,即Id, List<long> time, List<String> value, aggregateType我想使用map函数向Dataset value_new添加一个列,该map函数采用列timevalueaggregateType将其传递给函数getAggregate(String aggregateType, List<long> time, List<String> value)并返回处理参数的double值。方法Double返回的getAggregate值将是新的列值,即value_new的值

数据集输入DS

 +------+---+-----------+---------------------------------------------+---------------+
 |    Id| value         |     time                                   |aggregateType  |
 +------+---------------+---------------------------------------------+---------------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |
 +------+---------------+---------------------------------------------+---------------+

预期的数据集输出DS

 +------+---------------+---------------------------------------------+---------------+-----------+
 |    Id| value         |     time                                    |aggregateType  | value_new |
 +------+---------------+---------------------------------------------+---------------+-----------+
 |0001  |  [1.5,3.4,4.5]| [1551502200000,1551502200000,1551502200000] | Sum           |   9.4     |
 +------+---------------+---------------------------------------------+---------------+-----------+

我试过的代码。

 inputDS.withColumn("value_new",functions.lit(inputDS.map(new MapFunction<Row,Double>(){

 public double call(Row row){
 String aggregateType = row.getAS("aggregateType");
 List<long> timeList = row.getList("time");
 List<long> valueList= row.getList("value");  

 return  getAggregate(aggregateType ,timeList,valueList);    

 }}),Encoders.DOUBLE())));

错误

 Unsupported literal type class org.apache.spark.sql.Dataset [value:double]

注意抱歉,如果我错误地使用了map功能,如果有任何解决方法,请建议我。

谢谢。!

apache-spark apache-spark-sql
1个回答
0
投票

你得到错误,因为你试图使用lit()的结果创建一个函数文字(Dataset.map()),你可以在docs中看到它是一个数据集。你可以在API中看到Dataset.withColumn()你需要一个列的参数。

您似乎需要创建用户定义的函数。看看How do I call a UDF on a Spark DataFrame using JAVA?

© www.soinside.com 2019 - 2024. All rights reserved.