How do I access a broadcast variable from inside a UDF, when the broadcast variable is defined in a different class from the one where the UDF is called?

Question · Votes: 0 · Answers: 1

How do I access a broadcast variable from inside a UDF, when the broadcast variable is defined in a different class from the one where the UDF is called?

    /* this UDF is in a different file */
    package com.abc;

    import static org.apache.spark.sql.functions.udf;
    import org.apache.spark.sql.api.java.UDF1;
    import org.apache.spark.sql.expressions.UserDefinedFunction;
    import org.apache.spark.sql.types.DataTypes;

    public class JavaClass {
        // The cast is needed so the overloaded udf(...) resolves to the UDF1 variant
        public static UserDefinedFunction getvalue = udf((UDF1<String, String>) param -> {
            return "String value";
        }, DataTypes.StringType);
    }
    /* below code is in a different file */
    package com.xyz;

    import static org.apache.spark.sql.functions.lit;
    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import com.abc.JavaClass;

    public class AnotherClassToCallUDF {
        public static void main(String[] args) {
            Dataset<Row> abc = .......;

            abc.withColumn("new-col", JavaClass.getvalue.apply(lit("passing some value")));
        }
    }

    /* In the above code, how do I pass a broadcast variable while calling the UDF?
       Since the UDF accepts only Column and lit() arguments and nothing else,
       how do I access a broadcast variable that is defined in the main class
       from this other class? */
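For context on the core issue: a column-based UDF can only receive per-row values, so driver-side state has to be captured inside the function object itself rather than passed as an argument. Below is a minimal plain-Java sketch of that constructor-capture pattern (no Spark required; `LookupFn` and the map contents are illustrative names, not part of any API):

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// A function object that captures driver-side state through its constructor.
// Spark's UDF1/UDF2 interfaces are Serializable, so a field set here is
// serialized and shipped to the executors together with the function.
class LookupFn implements Serializable {
    private final Map<Long, String> lookup;

    LookupFn(Map<Long, String> lookup) {
        this.lookup = lookup;
    }

    String call(Long id) {
        return id + " -> " + lookup.getOrDefault(id, "No mapping");
    }
}

public class CapturePatternDemo {
    public static void main(String[] args) {
        Map<Long, String> map = new HashMap<>();
        map.put(1L, "b");
        map.put(2L, "c");
        LookupFn fn = new LookupFn(map);
        System.out.println(fn.call(1L)); // prints "1 -> b"
        System.out.println(fn.call(3L)); // prints "3 -> No mapping"
    }
}
```

The answer below applies exactly this idea, with the constructor parameter typed as `Broadcast<Map<Long, String>>`.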
apache-spark apache-spark-sql spark-java
1 Answer

0 votes

Try this:

1. Create the UDF

import java.util.Map;
import org.apache.spark.broadcast.Broadcast;
import org.apache.spark.sql.api.java.UDF1;

class MyUDF implements UDF1<Long, String> {
    private Map<Long, String> broadCastMap;

    public MyUDF(Broadcast<Map<Long, String>> broadCastMap) {
        this.broadCastMap = broadCastMap.value();
    }

    public String call(Long id) {
        return id + " -> " + broadCastMap.getOrDefault(id, "No mapping");
    }
}

2. Pass the broadcast to the UDF and use it

Dataset<Row> inputDf = spark.range(1, 5).withColumn("col1", lit("a"));
inputDf.show(false);
inputDf.printSchema();
/**
 * +---+----+
 * |id |col1|
 * +---+----+
 * |1  |a   |
 * |2  |a   |
 * |3  |a   |
 * |4  |a   |
 * +---+----+
 *
 * root
 *  |-- id: long (nullable = false)
 *  |-- col1: string (nullable = false)
 */

// Create broadcast
Map<Long, String> map = new HashMap<>();
map.put(1L, "b");
map.put(2L, "c");
Broadcast<Map<Long, String>> broadCastMap = new JavaSparkContext(spark.sparkContext()).broadcast(map);

UserDefinedFunction myUdf = udf(new MyUDF(broadCastMap), DataTypes.StringType);

spark.sqlContext().udf().register("myUdf", myUdf);

inputDf.withColumn("new_col", callUDF("myUdf",
        JavaConverters.asScalaBufferConverter(Collections.singletonList(col("id"))).asScala()))
        .show();
/**
 * +---+----+---------------+
 * | id|col1|        new_col|
 * +---+----+---------------+
 * |  1|   a|         1 -> b|
 * |  2|   a|         2 -> c|
 * |  3|   a|3 -> No mapping|
 * |  4|   a|4 -> No mapping|
 * +---+----+---------------+
 */
        /**
         * +---+----+---------------+
         * | id|col1|        new_col|
         * +---+----+---------------+
         * |  1|   a|         1 -> b|
         * |  2|   a|         2 -> c|
         * |  3|   a|3 -> No mapping|
         * |  4|   a|4 -> No mapping|
         * +---+----+---------------+
         */

3. Without registering the UDF in the FunctionRegistry

inputDf.withColumn("new_col", myUdf.apply(col("id")))
        .show();
/**
 * +---+----+---------------+
 * | id|col1|        new_col|
 * +---+----+---------------+
 * |  1|   a|         1 -> b|
 * |  2|   a|         2 -> c|
 * |  3|   a|3 -> No mapping|
 * |  4|   a|4 -> No mapping|
 * +---+----+---------------+
 */
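One design note on the `MyUDF` above: it calls `broadCastMap.value()` in the constructor, so the whole map is stored in a UDF field and serialized into every task closure, which partly defeats the point of broadcasting. A common alternative is to keep the `Broadcast` handle as the field and call `value()` inside `call()`, so only the lightweight handle travels with the closure and executors fetch the value once. A sketch of that lazy variant, using a serializable supplier as a stand-in for Spark's `Broadcast` (`SerializableSupplier` and `LazyLookupFn` are hypothetical names for illustration, not Spark APIs):

```java
import java.io.Serializable;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Spark's Broadcast<T>: a serializable handle
// whose value is resolved lazily, at call time.
interface SerializableSupplier<T> extends Serializable {
    T get();
}

// Variant of MyUDF that stores only the handle. In Spark this field would
// be a Broadcast<Map<Long, String>>, with broadCastMap.value() invoked
// inside call() instead of in the constructor.
class LazyLookupFn implements Serializable {
    private final SerializableSupplier<Map<Long, String>> handle;

    LazyLookupFn(SerializableSupplier<Map<Long, String>> handle) {
        this.handle = handle;
    }

    String call(Long id) {
        // The lookup table is fetched when the row is processed,
        // mirroring Broadcast.value() on the executor side.
        return id + " -> " + handle.get().getOrDefault(id, "No mapping");
    }
}

public class LazyCaptureDemo {
    public static void main(String[] args) {
        Map<Long, String> map = new HashMap<>();
        map.put(1L, "b");
        LazyLookupFn fn = new LazyLookupFn(() -> map);
        System.out.println(fn.call(1L)); // prints "1 -> b"
        System.out.println(fn.call(4L)); // prints "4 -> No mapping"
    }
}
```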