I am very new to Apache Spark and am trying to connect to Presto from Spark. Below is my connection string, which throws an error:
val jdbcDF = spark.read
  .format("jdbc")
  .options(Map(
    "url" -> "jdbc:presto://host:port/hive?user=username&SSL=true&SSLTrustStorePath=/path/certificatefile",
    "driver" -> "com.facebook.presto.jdbc.PrestoDriver",
    "dbtable" -> "tablename",
    "fetchSize" -> "10000",
    "partitionColumn" -> "columnname",
    "lowerBound" -> "1988",
    "upperBound" -> "2016",
    "numPartitions" -> "28"))
  .load()
I first start start-master.sh from spark/sbin. I also tried setting the jar and the driver classpath in spark-shell, as shown below:
./spark-shell --driver-class-path com.facebook.presto.jdbc.PrestoDriver --jars /path/jar/file
I still get the following error:
java.sql.SQLException: Unsupported type JAVA_OBJECT
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.org$apache$spark$sql$execution$datasources$jdbc$JdbcUtils$$getCatalystType(JdbcUtils.scala:251)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:316)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$8.apply(JdbcUtils.scala:316)
  at scala.Option.getOrElse(Option.scala:121)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$.getSchema(JdbcUtils.scala:315)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:63)
  at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:210)
  at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:35)
  at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:318)
  at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:223)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:211)
  at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
Can someone help me? Thanks.
There are these lines in Spark's JdbcUtils.scala:
case _ =>
  // For unmatched types:
  // including java.sql.Types.ARRAY,DATALINK,DISTINCT,JAVA_OBJECT,NULL,OTHER,REF_CURSOR,
  // TIME_WITH_TIMEZONE,TIMESTAMP_WITH_TIMEZONE, and among others.
  val jdbcType = classOf[JDBCType].getEnumConstants()
    .find(_.getVendorTypeNumber == sqlType)
    .map(_.getName)
    .getOrElse(sqlType.toString)
  throw QueryExecutionErrors.unrecognizedSqlTypeError(jdbcType, typeName)
This is why you get such an exception. However, there is a JdbcDialect concept that allows you to create overrides for each JDBC driver. In short, this code should help you read the column as a StringType:
import io.trino.jdbc.$internal.client.ClientStandardTypes
import org.apache.spark.sql.jdbc.JdbcDialect
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType}

import java.sql.Types
import java.util.Locale

class TrinoJdbcDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean = {
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:trino")
  }

  override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    // values are 2000 / "json" as per the moment of writing
    if (sqlType == Types.JAVA_OBJECT && typeName == ClientStandardTypes.JSON) {
      Some(StringType)
    } else {
      super.getCatalystType(sqlType, typeName, size, md)
    }
  }
}
Then, on application startup, you need to add the following (note the JdbcDialects import, which is separate from JdbcDialect above):
import org.apache.spark.sql.jdbc.JdbcDialects

JdbcDialects.registerDialect(new TrinoJdbcDialect)
Verified that this works for Spark 3.3 and Trino 420.
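Since the question connects with the Presto driver rather than Trino, the same approach should carry over with a dialect that matches jdbc:presto URLs. Below is a minimal, untested sketch under that assumption; the literal "json" type name (used instead of the shaded ClientStandardTypes.JSON constant), the PrestoJdbcDialect class, and the app name are illustrative only, with the connection details copied from the question:

import java.sql.Types
import java.util.Locale

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects}
import org.apache.spark.sql.types.{DataType, MetadataBuilder, StringType}

// Hypothetical Presto counterpart of the dialect above; it matches the
// question's jdbc:presto URL instead of jdbc:trino.
class PrestoJdbcDialect extends JdbcDialect {

  override def canHandle(url: String): Boolean =
    url.toLowerCase(Locale.ROOT).startsWith("jdbc:presto")

  override def getCatalystType(sqlType: Int, typeName: String, size: Int, md: MetadataBuilder): Option[DataType] = {
    // Assumes Presto also reports json columns as JAVA_OBJECT / "json",
    // mirroring the 2000 / "json" values noted in the Trino dialect above.
    if (sqlType == Types.JAVA_OBJECT && typeName.toLowerCase(Locale.ROOT) == "json") {
      Some(StringType)
    } else {
      super.getCatalystType(sqlType, typeName, size, md)
    }
  }
}

object PrestoReadExample {
  def main(args: Array[String]): Unit = {
    // Register the dialect before the first read; otherwise Spark falls back
    // to its default type mapping and throws "Unsupported type JAVA_OBJECT".
    JdbcDialects.registerDialect(new PrestoJdbcDialect)

    val spark = SparkSession.builder().appName("presto-read").getOrCreate()
    val jdbcDF = spark.read
      .format("jdbc")
      .option("url", "jdbc:presto://host:port/hive?user=username&SSL=true&SSLTrustStorePath=/path/certificatefile")
      .option("driver", "com.facebook.presto.jdbc.PrestoDriver")
      .option("dbtable", "tablename")
      .load()
    jdbcDF.printSchema()
  }
}

Comparing against the raw "json" string avoids depending on the driver's shaded client classes, at the cost of relying on the type name the driver happens to report.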