如何在UDF中访问广播变量?该广播变量定义在调用UDF的主类中,而UDF本身定义在另一个类中。
/* this udf is in different file */
package com.abc
public class JavaClass{
public static UserDefinedFunction getvalue = udf((String param) -> {
return "String value";
}, DataTypes.StringType);
}
/* the code below is in a different file */
package com.xyz;
import com.abc.JavaClass;
public class AnotherClassToCallUDF{
pubic static void main(String args[]) {
Dataset<Row> abc = .......;
abc.withColoumn("new-col",JavaClass.getvalue.apply("passing some value"));
}
}
/* In the code above, how can a broadcast variable be passed when calling the UDF? The UDF accepts only col- and lit-type (Column) arguments and nothing else, so how can a broadcast variable that is defined in the main class be accessed from a UDF that lives in another class?
*/
试试这个
class MyUDF implements UDF1<Long, String> {
private Map<Long, String> broadCastMap;
public MyUDF(Broadcast<Map<Long, String>> broadCastMap) {
this.broadCastMap = broadCastMap.value();
}
public String call(Long id) {
return id +" -> " + broadCastMap.getOrDefault(id, "No mapping");
}
}
// Sample input: ids 1..4 (the end bound 5 is exclusive) plus a constant string column "col1".
Dataset<Long> idRange = spark.range(1, 5);
Dataset<Row> inputDf = idRange.withColumn("col1", lit("a"));
// Print the rows without truncating cell values, then the schema.
inputDf.show(false);
inputDf.printSchema();
/**
* +---+----+
* |id |col1|
* +---+----+
* |1 |a |
* |2 |a |
* |3 |a |
* |4 |a |
* +---+----+
*
* root
* |-- id: long (nullable = false)
* |-- col1: string (nullable = false)
*/
// Build the lookup table and wrap it in a broadcast variable.
Map<Long, String> map = new HashMap<>();
map.put(1L, "b");
map.put(2L, "c");
Broadcast<Map<Long, String>> broadCastMap = new JavaSparkContext(spark.sparkContext()).broadcast(map);
// Hand the broadcast to the UDF through its constructor, then register the UDF by name.
UserDefinedFunction myUdf = udf(new MyUDF(broadCastMap), DataTypes.StringType);
spark.sqlContext().udf().register("myUdf", myUdf);
// callUDF has a Java varargs overload taking Column..., so the columns can be
// passed directly; no conversion to a Scala Seq is needed.
inputDf.withColumn("new_col", callUDF("myUdf", col("id")))
        .show();
/**
* +---+----+---------------+
* | id|col1| new_col|
* +---+----+---------------+
* | 1| a| 1 -> b|
* | 2| a| 2 -> c|
* | 3| a|3 -> No mapping|
* | 4| a|4 -> No mapping|
* +---+----+---------------+
*/
// Alternative: apply the UDF object directly instead of calling it by its registered name (FunctionRegistry)
inputDf.withColumn("new_col", myUdf.apply(col("id")))
.show();
/**
* +---+----+---------------+
* | id|col1| new_col|
* +---+----+---------------+
* | 1| a| 1 -> b|
* | 2| a| 2 -> c|
* | 3| a|3 -> No mapping|
* | 4| a|4 -> No mapping|
* +---+----+---------------+
*/