How do I resolve the SQL exception "Unsupported type JAVA_OBJECT" when connecting to Presto from Apache Spark?

Question (votes: 0, answers: 1)

I am very new to Apache Spark and am trying to connect to Presto from it. Below is the read call with my connection string; it fails with an error.

val jdbcDF = spark.read
  .format("jdbc")
  .options(Map(
    "url" -> "jdbc:presto://host:port/hive?user=username&SSL=true&SSLTrustStorePath=/path/certificatefile",
    "driver" -> "com.facebook.presto.jdbc.PrestoDriver",
    "dbtable" -> "tablename",
    "fetchSize" -> "10000",
    "partitionColumn" -> "columnname",
    "lowerBound" -> "1988",
    "upperBound" -> "2016",
    "numPartitions" -> "28"
  ))
  .load()

I first ran start-master.sh from spark/sbin. I also tried setting the jar and the driver class path when launching spark-shell, as follows:

./spark-shell  --driver-class-path com.facebook.presto.jdbc.PrestoDriver --jars /path/jar/file

I still get the following error:

java.sql.SQLException: Unsupported type JAVA_OBJECT
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:251)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:316)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:316)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:315)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:210)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)

Can anyone help me? Thanks.

apache-spark jdbc presto spark-shell
1 Answer

Spark's JdbcUtils.scala contains the following lines:

case _ =>
  // For unmatched types:
  // including java.sql.Types.ARRAY,DATALINK,DISTINCT,JAVA_OBJECT,NULL,OTHER,REF_CURSOR,
  // TIME_WITH_TIMEZONE,TIMESTAMP_WITH_TIMEZONE, and among others.
  val jdbcType = classOf[JDBCType].getEnumConstants()
    .find(_.getVendorTypeNumber == sqlType)
    .map(_.getName)
    .getOrElse(sqlType.toString)
  throw QueryExecutionErrors.unrecognizedSqlTypeError(jdbcType, typeName)

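That is why you get this exception. However, Spark has a JdbcDialect concept that lets you override type handling for each JDBC driver. In short, the following code should let you read the column as StringType: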

import io.trino.jdbc.$internal.client.ClientStandardTypes
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType}

import java.sql.Types
import java.util.Locale

class TrinoJdbcDialect extends JdbcDialect {
  override def canHandle(url: String): Boolean = {
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:trino")
  }

  override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    // At the time of writing, Types.JAVA_OBJECT is 2000 and ClientStandardTypes.JSON is "json"
    if (sqlType == Types.JAVA_OBJECT && typeName == ClientStandardTypes.JSON) {
      Some(StringType)
    } else {
      super.getCatalystType(sqlType, typeName, size, md)
    }
  }
}
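Note that the question's URL starts with jdbc:presto, which the canHandle check above would not match. A minimal sketch of a Presto-flavoured variant follows. The name PrestoJdbcDialect is hypothetical, and mapping every JAVA_OBJECT column to StringType is an assumption: it has not been tested against a Presto server, and whether the driver returns a useful string for a given column type may vary.

```scala
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType}

import java.sql.Types
import java.util.Locale

// Hypothetical dialect name; not part of Spark or the Presto driver.
object PrestoJdbcDialect extends JdbcDialect {
  // Match the URL prefix used in the question (jdbc:presto://...).
  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:presto")

  override def getCatalystType(
      sqlType: Int,
      typeName: String,
      size: Int,
      md: MetadataBuilder): Option[DataType] =
    if (sqlType == Types.JAVA_OBJECT) {
      // Assumption: reading the value as a string is acceptable for these columns.
      Some(StringType)
    } else {
      // None tells Spark to fall back to its built-in type mapping.
      None
    }
}
```

It would be registered the same way as any other dialect, with JdbcDialects.registerDialect(PrestoJdbcDialect).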

Then add the following line at application startup:

import org.apache.spark.sql.jdbc.JdbcDialects

JdbcDialects.registerDialect(new TrinoJdbcDialect)

Verified to work with Spark 3.3 and Trino 420.
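Since the stack trace in the question shows the schema being resolved inside load(), the dialect has to be registered before the read is issued. The helper below is a rough sketch of that ordering. DialectRead and readWithDialect are hypothetical names, and the helper itself is not part of the configuration verified above.

```scala
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}

// Hypothetical helper; the names are illustrative only.
object DialectRead {
  def readWithDialect(
      spark: SparkSession,
      dialect: JdbcDialect,
      driverClass: String,
      url: String,
      table: String): DataFrame = {
    // Register first: load() resolves the table schema immediately,
    // and that is where the type mapping is looked up.
    JdbcDialects.registerDialect(dialect)
    spark.read
      .format("jdbc")
      .option("driver", driverClass)
      .option("url", url)
      .option("dbtable", table)
      .load()
  }
}
```

With the dialect from the answer, a call might look like DialectRead.readWithDialect(spark, new TrinoJdbcDialect, "io.trino.jdbc.TrinoDriver", "jdbc:trino://host:port/hive?user=username", "tablename"), where host, port, username, and tablename are the placeholders from the question.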
