I am trying to use Databricks with AWS Redshift to execute a SQL SELECT command.
I went through the https://github.com/databricks/spark-redshift README and configured the user and password options, the temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token options, and a dbfs mount of an AWS S3 bucket.
Note that this is a proof of concept, so I am ignoring all security details such as encryption.
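For context, the S3 bucket behind tempdir was mounted to DBFS roughly like this (the bucket name and mount point are placeholders, not the exact values from my notebook):
%python
# Rough sketch of the DBFS mount behind tempdir (bucket name and mount point are
# placeholders; the real mount also needs S3 credentials, e.g. an instance profile
# or access keys embedded in the source URL).
dbutils.fs.mount(
  source = "s3a://<MY_RESULT_BUCKET>",
  mount_point = "/mnt/result_bucket"
)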
%python
# Read data from a table
df = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:postgresql://<REDSHIFT_URL>:<DB_PORT>/<DB_NAME>") \
.option("temporary_aws_access_key_id", "XXX") \
.option("temporary_aws_secret_access_key","XXX") \
.option("temporary_aws_session_token", "XXX") \
.option("user", "XXX") \
.option("password", "XXX") \
.option("tempdir", "dbfs:/mnt/result_bucket/...") \
.option("query", "SELECT * FROM users") \
.load()
# display(df) #SQL Exception
The read itself succeeds, but when I uncomment the last line and try to view the SQL SELECT result, I get:
java.sql.SQLException: Exception thrown in awaitResult:
at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:223)
at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:197)
at com.databricks.spark.redshift.RedshiftRelation.$anonfun$getRDDFromS3$1(RedshiftRelation.scala:212)
at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:377)
at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:363)
at com.databricks.spark.util.SparkDatabricksProgressReporter$.withStatusCode(ProgressReporter.scala:34)
at com.databricks.spark.redshift.RedshiftRelation.getRDDFromS3(RedshiftRelation.scala:212)
at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:157)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$3(DataSourceStrategy.scala:426)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:460)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:538)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:459)
at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:426)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:69)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:69)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:85)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
at scala.collection.Iterator.foreach(Iterator.scala:941)
at scala.collection.Iterator.foreach$(Iterator.scala:941)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:82)
at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100)
at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75)
at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:493)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:129)
at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:134)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:180)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:180)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:129)
at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:122)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:141)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:141)
at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:136)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:199)
at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:115)
at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:199)
at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:260)
at org.apache.spark.sql.execution.QueryExecution.explainStringLocal(QueryExecution.scala:226)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:123)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:273)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:223)
at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3823)
at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3031)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:268)
at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:102)
at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:526)
at com.databricks.backend.daemon.driver.PythonDriverLocal.computeListResultsItem(PythonDriverLocal.scala:672)
at com.databricks.backend.daemon.driver.PythonDriverLocalBase.genListResults(PythonDriverLocalBase.scala:490)
at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:727)
at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:608)
at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:687)
at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:634)
at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:650)
at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$6(PythonDriverLocal.scala:221)
at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:608)
at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:208)
at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$11(DriverLocal.scala:526)
at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:503)
at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
at scala.util.Try$.apply(Try.scala:213)
at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: ERROR: UNLOAD destination is not supported. (Hint: only S3 based unload is allowed)
at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2477)
at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2190)
at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:300)
at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:428)
at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:354)
at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:169)
at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:158)
at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$1(RedshiftJDBCWrapper.scala:197)
at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$1$adapted(RedshiftJDBCWrapper.scala:197)
at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$2(RedshiftJDBCWrapper.scala:215)
at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
at scala.util.Success.$anonfun$map$1(Try.scala:255)
at scala.util.Success.map(Try.scala:213)
at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
... 1 more
Databricks Runtime version:
9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)
I have already tried the same thing with the JDBC Redshift driver (using the URL prefix jdbc:redshift); for that I had to install com.github.databricks:spark-redshift_2.11:master-SNAPSHOT as a library on my Databricks cluster.
The result was the same.
Does anyone know what is wrong with my configuration?
After many attempts, I found the solution: I switched to the forward_spark_s3_credentials option, pointed tempdir at an s3a path instead of the mounted dbfs directory, and installed RedshiftJDBC42_no_awssdk_1_2_55_1083.jar instead of com.github.databricks:spark-redshift_2.11:master-SNAPSHOT. The final configuration:
df = spark.read \
.format("com.databricks.spark.redshift") \
.option("url", "jdbc:redshift://<REDSHIFT_URL>:<DB_PORT>/<DB_NAME>") \
.option("forward_spark_s3_credentials", "true") \
.option("user", "XXX") \
.option("password", "XXX") \
.option("tempdir", "s3a://<MY_S3_BUCKET>/...") \
.option("query", "SELECT userid, username FROM users") \
.load()
display(df)
Libraries installed on the cluster (probably only the Redshift JDBC driver is needed; I also added the libraries from the AWS bundle, which can be found here). The final code will be on my GitHub.
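As a side note, forward_spark_s3_credentials makes the connector reuse whatever S3 credentials Spark itself is using for the tempdir bucket, so the cluster needs S3 access in the first place. A minimal sketch of setting the keys explicitly, in case no instance profile is attached (the key values are placeholders and this step may not be needed in every setup):
# Hypothetical sketch: give Spark S3 credentials it can forward to Redshift.
# Skip this if the cluster already has an instance profile with access to the bucket.
sc = spark.sparkContext
sc._jsc.hadoopConfiguration().set("fs.s3a.access.key", "XXX")
sc._jsc.hadoopConfiguration().set("fs.s3a.secret.key", "XXX")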
@pgrabarczyk's answer appears to be consistent with the Databricks online documentation found here (and it was helpful for me): https://docs.databricks.com/en/connect/external-systems/amazon-redshift.html
However, S3 write access is not an option for some of my colleagues, so here is another way to pull the data that still uses the drivers provided by Databricks:
def pull_rs(qry):
    ########## credentials
    host = 'foo.bar.fubar.com'
    port = 5439
    db = 'your_db'
    user = 'your_username'
    pw = 'your_pw'
    ########## credentials
    url = f'jdbc:redshift://{host}:{port}/{db}?ssl=true'  # the ssl parameter was required to make my code work
    return (spark
            .read
            .format("redshift")
            .option("url", url)
            .option("user", user)
            .option("password", pw)
            .option("query", qry)
            .load()
            )
#make your calls:
qry = """
SELECT
*
FROM
your_schema.your_tablename
limit 1000
"""
df = pull_rs(qry)
display(df)
Alternatively, there is a robust library called redshift-connector: https://pypi.org/project/redshift-connector/
It lets you do a lot through cursor executes, including running Redshift stored procedures.
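For example, a minimal sketch of redshift-connector usage (the host, database, and credentials below are the same placeholder values as in pull_rs above, and the stored-procedure call is only a hypothetical illustration):
import redshift_connector  # pip install redshift_connector

conn = redshift_connector.connect(
    host='foo.bar.fubar.com',
    database='your_db',
    port=5439,
    user='your_username',
    password='your_pw'
)
cursor = conn.cursor()
cursor.execute("SELECT userid, username FROM users LIMIT 1000")
df = cursor.fetch_dataframe()  # pulls the result set into a pandas DataFrame
# Stored procedures can be run the same way, e.g.:
# cursor.execute("CALL your_schema.your_procedure()")
cursor.close()
conn.close()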
One of the errors may be in option('url'): it should be jdbc:redshift rather than jdbc:postgresql. Switch to redshift and retry.