I'm trying to use SpEL expressions with a Spark UDF (Scala). The SpEL expression can be a simple math operation, a custom function, or a call into the java.lang.Math class. Everything works fine when I run it locally, but when I build a fat jar and deploy it on Databricks, I get the following exception:
SparkException: Job aborted due to stage failure: Task 0 in stage 159.0 failed 4 times, most recent failure: Lost task 0.3 in stage 159.0 (TID 1290, 172.16.9.7, executor 2): java.lang.NoSuchMethodError: org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()Lorg/springframework/core/convert/ConversionService;
I get this exception whenever the SpEL expression contains a custom function or the java.lang.Math class, but it works fine when the expression is a simple math operation such as add, sub, mul, etc.
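To illustrate (these are hypothetical expressions of my own, not the exact ones from the failing job): an expression like "#a + #b * 2" evaluates fine on the cluster, while "T(java.lang.Math).sqrt(#a) + #b" throws the NoSuchMethodError above.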
Searching online suggests this is caused by a version incompatibility between Spring jars, but 1. I'm only using spring-expression, and 2. I can't see any version incompatibility.
Here is my UDF:
import scala.collection.mutable
import org.springframework.expression.spel.standard.SpelExpressionParser
import org.springframework.expression.spel.support.StandardEvaluationContext

def calculateSpelExpression: (Seq[Double], String, mutable.WrappedArray[String]) => Double =
  (values: Seq[Double], expression: String, variableNames: mutable.WrappedArray[String]) => {
    val context = new StandardEvaluationContext()
    val parser = new SpelExpressionParser()
    // bind each variable to its value; .tail removes the leading '#' symbol
    variableNames.zipWithIndex.foreach { case (rawName, index) =>
      context.setVariable(rawName.tail, values(index))
    }
    // SpEL may return an Integer for integer-valued results, so widen to Double
    val value = parser.parseExpression(expression).getValue(context)
    if (value.isInstanceOf[Int]) value.asInstanceOf[Int].toDouble
    else value.asInstanceOf[Double]
  }
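For reference, this is roughly how the UDF is registered and invoked; a minimal sketch, assuming a live SparkSession, with hypothetical column names and expression:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{array, col, lit, udf}

val spark = SparkSession.builder.appName("spel-udf").getOrCreate()

// wrap the Scala function as a Spark UDF
val spelUdf = udf(calculateSpelExpression)

val df = spark.createDataFrame(Seq((1.0, 2.0))).toDF("a", "b")
df.select(
  spelUdf(
    array(col("a"), col("b")),                 // values
    lit("T(java.lang.Math).pow(#a, 2) + #b"),  // SpEL expression
    array(lit("#a"), lit("#b"))                // variable names; '#' is stripped inside the UDF
  ).as("result")
).show()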
And my pom.xml:
<dependencies>
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.4.0</version>
    </dependency>
    <dependency>
        <groupId>org.springframework</groupId>
        <artifactId>spring-expression</artifactId>
        <version>5.2.0.RELEASE</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>${scala.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scalatest</groupId>
        <artifactId>scalatest_${scala.compat.version}</artifactId>
        <version>3.0.8</version>
        <scope>test</scope>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-math3</artifactId>
        <version>3.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-mllib_2.11</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>io.delta</groupId>
        <artifactId>delta-core_2.11</artifactId>
        <version>0.4.0</version>
        <!-- <scope>provided</scope>-->
    </dependency>
    <dependency>
        <groupId>org.postgresql</groupId>
        <artifactId>postgresql</artifactId>
        <version>42.2.10</version>
    </dependency>
    <dependency>
        <groupId>com.datastax.spark</groupId>
        <artifactId>spark-cassandra-connector_2.11</artifactId>
        <version>${spark.version}</version>
        <exclusions>
            <exclusion>
                <groupId>io.netty</groupId>
                <artifactId>netty-all</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
</dependencies>
It looks like you somehow have an older version of the spring-core jar on the classpath.

NoSuchMethodError: org.springframework.core.convert.support.DefaultConversionService.getSharedInstance()

That method was added in 4.3.5:
/**
* Return a shared default {@code ConversionService} instance,
* lazily building it once needed.
* <p><b>NOTE:</b> We highly recommend constructing individual
* {@code ConversionService} instances for customization purposes.
* This accessor is only meant as a fallback for code paths which
* need simple type coercion but cannot access a longer-lived
* {@code ConversionService} instance any other way.
* @return the shared {@code ConversionService} instance (never {@code null})
* @since 4.3.5
*/
public static ConversionService getSharedInstance() {
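You can check where spring-core is coming from in your own build with mvn dependency:tree -Dincludes=org.springframework. If the older version is supplied by the Databricks runtime itself rather than by your dependency tree, excluding it from the pom won't help. One common workaround (a sketch, not a config verified on your cluster) is to relocate the Spring packages inside your fat jar with the maven-shade-plugin, so your spring-expression links against its own shaded spring-core instead of the older copy on the executors:

<build>
    <plugins>
        <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-shade-plugin</artifactId>
            <version>3.2.1</version>
            <executions>
                <execution>
                    <phase>package</phase>
                    <goals>
                        <goal>shade</goal>
                    </goals>
                    <configuration>
                        <relocations>
                            <relocation>
                                <!-- rename org.springframework.* inside the fat jar and
                                     rewrite all references to it, so the cluster's older
                                     spring-core can never shadow yours -->
                                <pattern>org.springframework</pattern>
                                <shadedPattern>shaded.org.springframework</shadedPattern>
                            </relocation>
                        </relocations>
                    </configuration>
                </execution>
            </executions>
        </plugin>
    </plugins>
</build>

After rebuilding, the jar carries the Spring classes under the shaded.org.springframework prefix, and the runtime's older spring-core is never consulted.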