Spark UDF with SpEL expressions


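I am trying to use SpEL expressions with a Spark UDF (Scala). The SpEL expressions can be simple math operations, or they can use custom functions and the java.lang.Math class. Everything works fine when I run it locally, but when I build a fat jar and deploy it on Databricks, I get the following exception: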

SparkException: Job aborted due to stage failure: Task 0 in stage 159.0 failed 4 times, most recent failure: Lost task 0.3 in stage 159.0 (TID 1290, 172.16.9.7, executor 2): java.lang.NoSuchMethodError: org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()Lorg/springframework/core/convert/ConversionService;

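I get this exception when the SpEL expression contains a custom function or the java.lang.Math class, but it works fine when the expression has only simple math operations such as addition, subtraction, multiplication, etc.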

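Searching online suggests that this is caused by a version incompatibility among the Spring jars, but (1) I am only using spring-expression, and (2) I cannot see any version incompatibility.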

Here is my UDF:

    import scala.collection.mutable
    import org.springframework.expression.spel.standard.SpelExpressionParser
    import org.springframework.expression.spel.support.StandardEvaluationContext

    def calculateSpelExpression: (Seq[Double], String, mutable.WrappedArray[String]) => Double =
      (values: Seq[Double], expression: String, variableNames: mutable.WrappedArray[String]) => {
        val context = new StandardEvaluationContext()
        val parser = new SpelExpressionParser()

        // Bind each value to its variable name; .tail drops the leading '#' symbol
        variableNames.zipWithIndex.foreach { case (rawName, index) =>
          context.setVariable(rawName.tail, values(index))
        }

        // The result may come back as an Int or a Double depending on the expression
        val value = parser.parseExpression(expression).getValue(context)
        if (value.isInstanceOf[Int]) value.asInstanceOf[Int].toDouble
        else value.asInstanceOf[Double]
      }

And my pom.xml:


    <dependencies>
        <dependency>
            <groupId>com.typesafe</groupId>
            <artifactId>config</artifactId>
            <version>1.4.0</version>
        </dependency>

        <dependency>
            <groupId>org.springframework</groupId>
            <artifactId>spring-expression</artifactId>
            <version>5.2.0.RELEASE</version>
        </dependency>

        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
            <version>${scala.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.scalatest</groupId>
            <artifactId>scalatest_${scala.compat.version}</artifactId>
            <version>3.0.8</version>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-math3</artifactId>
            <version>3.2</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>


        <dependency>
            <groupId>io.delta</groupId>
            <artifactId>delta-core_2.11</artifactId>
            <version>0.4.0</version>
            <!-- <scope>provided</scope> -->
        </dependency>

        <dependency>
            <groupId>org.postgresql</groupId>
            <artifactId>postgresql</artifactId>
            <version>42.2.10</version>
        </dependency>

        <dependency>
            <groupId>com.datastax.spark</groupId>
            <artifactId>spark-cassandra-connector_2.11</artifactId>
            <version>${spark.version}</version>
            <exclusions>
                <exclusion>
                    <groupId>io.netty</groupId>
                    <artifactId>netty-all</artifactId>
                </exclusion>
            </exclusions>
        </dependency>
    </dependencies>
apache-spark apache-spark-sql databricks azure-databricks spring-el
1 Answer

It looks like you somehow have an older version of the spring-core jar on the classpath.

NoSuchMethodError:org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()

That method was added in Spring Framework 4.3.5, as its Javadoc shows:

/**
 * Return a shared default {@code ConversionService} instance,
 * lazily building it once needed.
 * <p><b>NOTE:</b> We highly recommend constructing individual
 * {@code ConversionService} instances for customization purposes.
 * This accessor is only meant as a fallback for code paths which
 * need simple type coercion but cannot access a longer-lived
 * {@code ConversionService} instance any other way.
 * @return the shared {@code ConversionService} instance (never {@code null})
 * @since 4.3.5
 */
public static ConversionService getSharedInstance() {
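
One way to check which jar actually supplies the class is to ask the JVM where it was loaded from. The following is only a minimal sketch: `ClassOrigin` and `locate` are hypothetical names introduced here for illustration, and the result may be empty for classes whose code source the JVM does not expose.

```scala
import scala.util.Try

// Hypothetical helper (not part of Spring or Spark): reports where a class was loaded from.
object ClassOrigin {
  /** Returns the location (usually a jar URL) of the named class, if it can be determined. */
  def locate(className: String): Option[String] =
    for {
      cls    <- Try(Class.forName(className)).toOption // None if the class is not on the classpath
      domain <- Option(cls.getProtectionDomain)
      source <- Option(domain.getCodeSource)           // may be null for JDK bootstrap classes
      url    <- Option(source.getLocation)
    } yield url.toString
}

object ClassOriginDemo extends App {
  val name = "org.springframework.core.convert.support.DefaultConversionService"
  println(ClassOrigin.locate(name).getOrElse(s"$name: not found or location unknown"))
}
```

Running the same lookup on the cluster (for example from inside the UDF, so that it executes on an executor) should show whether an older spring-core jar than the one in the fat jar is being picked up there.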