Reading from AWS Redshift using Databricks (and Apache Spark)

Problem description (0 votes, 3 answers)

I'm trying to run a SQL SELECT command against AWS Redshift from Databricks.

I went through the README at https://github.com/databricks/spark-redshift and configured:

  • Spark driver to Redshift: I'm passing the user and password options.
  • Spark to S3: I have mounted AWS S3 using a dbfs mount (see the sketch after the note below).
  • Redshift to S3: I'm passing temporary_aws_access_key_id, temporary_aws_secret_access_key, and temporary_aws_session_token.

Note: this is a proof of concept, so I'm ignoring all security details such as encryption.
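
For reference, a minimal sketch of that dbfs mount, with a placeholder bucket, mount point, and keys (the secret key has to be URL-encoded for this form of mount):

%python

import urllib.parse

# Hypothetical S3 mount later used as the Redshift tempdir; the bucket name,
# mount point, and keys are placeholders, not values from my actual setup.
access_key = "XXX"
secret_key = urllib.parse.quote("XXX", "")  # secret key must be URL-encoded
bucket     = "<MY_S3_BUCKET>"

dbutils.fs.mount(
  source = f"s3a://{access_key}:{secret_key}@{bucket}",
  mount_point = "/mnt/result_bucket",
)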

The configuration I'm using in my Databricks Notebook is as follows:

%python

# Read data from a table
df = spark.read \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:postgresql://<REDSHIFT_URL>:<DB_PORT>/<DB_NAME>") \
  .option("temporary_aws_access_key_id", "XXX") \
  .option("temporary_aws_secret_access_key","XXX") \
  .option("temporary_aws_session_token", "XXX") \
  .option("user", "XXX") \
  .option("password", "XXX") \
  .option("tempdir", "dbfs:/mnt/result_bucket/...") \
  .option("query", "SELECT * FROM users") \
  .load()

# display(df) #SQL Exception

With display(df) commented out, the cell runs without errors.

But when I uncomment that last line and try to view the SQL SELECT result, I get:

java.sql.SQLException: Exception thrown in awaitResult: 
    at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:223)
    at com.databricks.spark.redshift.JDBCWrapper.executeInterruptibly(RedshiftJDBCWrapper.scala:197)
    at com.databricks.spark.redshift.RedshiftRelation.$anonfun$getRDDFromS3$1(RedshiftRelation.scala:212)
    at scala.runtime.java8.JFunction0$mcZ$sp.apply(JFunction0$mcZ$sp.java:23)
    at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:377)
    at com.databricks.backend.daemon.driver.ProgressReporter$.withStatusCode(ProgressReporter.scala:363)
    at com.databricks.spark.util.SparkDatabricksProgressReporter$.withStatusCode(ProgressReporter.scala:34)
    at com.databricks.spark.redshift.RedshiftRelation.getRDDFromS3(RedshiftRelation.scala:212)
    at com.databricks.spark.redshift.RedshiftRelation.buildScan(RedshiftRelation.scala:157)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$apply$3(DataSourceStrategy.scala:426)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.$anonfun$pruneFilterProject$1(DataSourceStrategy.scala:460)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProjectRaw(DataSourceStrategy.scala:538)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.pruneFilterProject(DataSourceStrategy.scala:459)
    at org.apache.spark.sql.execution.datasources.DataSourceStrategy$.apply(DataSourceStrategy.scala:426)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$2(QueryPlanner.scala:69)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$1(QueryPlanner.scala:69)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:489)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$4(QueryPlanner.scala:85)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1(TraversableOnce.scala:162)
    at scala.collection.TraversableOnce.$anonfun$foldLeft$1$adapted(TraversableOnce.scala:162)
    at scala.collection.Iterator.foreach(Iterator.scala:941)
    at scala.collection.Iterator.foreach$(Iterator.scala:941)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
    at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:162)
    at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:160)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1429)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.$anonfun$plan$3(QueryPlanner.scala:82)
    at scala.collection.Iterator$$anon$11.nextCur(Iterator.scala:484)
    at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:490)
    at org.apache.spark.sql.catalyst.planning.QueryPlanner.plan(QueryPlanner.scala:100)
    at org.apache.spark.sql.execution.SparkStrategies.plan(SparkStrategies.scala:75)
    at org.apache.spark.sql.execution.QueryExecution$.createSparkPlan(QueryExecution.scala:493)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$sparkPlan$1(QueryExecution.scala:129)
    at com.databricks.spark.util.FrameProfiler$.record(FrameProfiler.scala:80)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:134)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:180)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:180)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan$lzycompute(QueryExecution.scala:129)
    at org.apache.spark.sql.execution.QueryExecution.sparkPlan(QueryExecution.scala:122)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$executedPlan$1(QueryExecution.scala:141)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan$lzycompute(QueryExecution.scala:141)
    at org.apache.spark.sql.execution.QueryExecution.executedPlan(QueryExecution.scala:136)
    at org.apache.spark.sql.execution.QueryExecution.$anonfun$simpleString$2(QueryExecution.scala:199)
    at org.apache.spark.sql.execution.ExplainUtils$.processPlan(ExplainUtils.scala:115)
    at org.apache.spark.sql.execution.QueryExecution.simpleString(QueryExecution.scala:199)
    at org.apache.spark.sql.execution.QueryExecution.org$apache$spark$sql$execution$QueryExecution$$explainString(QueryExecution.scala:260)
    at org.apache.spark.sql.execution.QueryExecution.explainStringLocal(QueryExecution.scala:226)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$5(SQLExecution.scala:123)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:273)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withCustomExecutionEnv$1(SQLExecution.scala:104)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:854)
    at org.apache.spark.sql.execution.SQLExecution$.withCustomExecutionEnv(SQLExecution.scala:77)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:223)
    at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3823)
    at org.apache.spark.sql.Dataset.collectResult(Dataset.scala:3031)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation0(OutputAggregator.scala:268)
    at com.databricks.backend.daemon.driver.OutputAggregator$.withOutputAggregation(OutputAggregator.scala:102)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.generateTableResult(PythonDriverLocalBase.scala:526)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.computeListResultsItem(PythonDriverLocal.scala:672)
    at com.databricks.backend.daemon.driver.PythonDriverLocalBase.genListResults(PythonDriverLocalBase.scala:490)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$getResultBufferInternal$1(PythonDriverLocal.scala:727)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:608)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.getResultBufferInternal(PythonDriverLocal.scala:687)
    at com.databricks.backend.daemon.driver.DriverLocal.getResultBuffer(DriverLocal.scala:634)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.outputSuccess(PythonDriverLocal.scala:650)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.$anonfun$repl$6(PythonDriverLocal.scala:221)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.withInterpLock(PythonDriverLocal.scala:608)
    at com.databricks.backend.daemon.driver.PythonDriverLocal.repl(PythonDriverLocal.scala:208)
    at com.databricks.backend.daemon.driver.DriverLocal.$anonfun$execute$11(DriverLocal.scala:526)
    at com.databricks.logging.UsageLogging.$anonfun$withAttributionContext$1(UsageLogging.scala:266)
    at scala.util.DynamicVariable.withValue(DynamicVariable.scala:62)
    at com.databricks.logging.UsageLogging.withAttributionContext(UsageLogging.scala:261)
    at com.databricks.logging.UsageLogging.withAttributionContext$(UsageLogging.scala:258)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionContext(DriverLocal.scala:50)
    at com.databricks.logging.UsageLogging.withAttributionTags(UsageLogging.scala:305)
    at com.databricks.logging.UsageLogging.withAttributionTags$(UsageLogging.scala:297)
    at com.databricks.backend.daemon.driver.DriverLocal.withAttributionTags(DriverLocal.scala:50)
    at com.databricks.backend.daemon.driver.DriverLocal.execute(DriverLocal.scala:503)
    at com.databricks.backend.daemon.driver.DriverWrapper.$anonfun$tryExecutingCommand$1(DriverWrapper.scala:689)
    at scala.util.Try$.apply(Try.scala:213)
    at com.databricks.backend.daemon.driver.DriverWrapper.tryExecutingCommand(DriverWrapper.scala:681)
    at com.databricks.backend.daemon.driver.DriverWrapper.getCommandOutputAndError(DriverWrapper.scala:522)
    at com.databricks.backend.daemon.driver.DriverWrapper.executeCommand(DriverWrapper.scala:634)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInnerLoop(DriverWrapper.scala:427)
    at com.databricks.backend.daemon.driver.DriverWrapper.runInner(DriverWrapper.scala:370)
    at com.databricks.backend.daemon.driver.DriverWrapper.run(DriverWrapper.scala:221)
    at java.lang.Thread.run(Thread.java:748)
Caused by: org.postgresql.util.PSQLException: ERROR: UNLOAD destination is not supported. (Hint: only S3 based unload is allowed)
    at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2477)
    at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2190)
    at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:300)
    at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:428)
    at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:354)
    at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:169)
    at org.postgresql.jdbc.PgPreparedStatement.execute(PgPreparedStatement.java:158)
    at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$1(RedshiftJDBCWrapper.scala:197)
    at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$1$adapted(RedshiftJDBCWrapper.scala:197)
    at com.databricks.spark.redshift.JDBCWrapper.$anonfun$executeInterruptibly$2(RedshiftJDBCWrapper.scala:215)
    at scala.concurrent.Future$.$anonfun$apply$1(Future.scala:659)
    at scala.util.Success.$anonfun$map$1(Try.scala:255)
    at scala.util.Success.map(Try.scala:213)
    at scala.concurrent.Future.$anonfun$map$1(Future.scala:292)
    at scala.concurrent.impl.Promise.liftedTree1$1(Promise.scala:33)
    at scala.concurrent.impl.Promise.$anonfun$transform$1(Promise.scala:33)
    at scala.concurrent.impl.CallbackRunnable.run(Promise.scala:64)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    ... 1 more

More details:

Databricks Runtime version:

9.1 LTS (includes Apache Spark 3.1.2, Scala 2.12)

I have also tried the same thing with the Redshift JDBC driver (using the jdbc:redshift URL prefix); for that I had to install com.github.databricks:spark-redshift_2.11:master-SNAPSHOT as a library on my Databricks cluster. The result was the same; a sketch of that attempt follows.
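
That attempt differed from the snippet above only in the URL prefix (all values are placeholders, as before):

%python

# Same read as above, but through the Redshift JDBC driver; every value is
# a placeholder, exactly as in the earlier snippet.
df = spark.read \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://<REDSHIFT_URL>:<DB_PORT>/<DB_NAME>") \
  .option("temporary_aws_access_key_id", "XXX") \
  .option("temporary_aws_secret_access_key", "XXX") \
  .option("temporary_aws_session_token", "XXX") \
  .option("user", "XXX") \
  .option("password", "XXX") \
  .option("tempdir", "dbfs:/mnt/result_bucket/...") \
  .option("query", "SELECT * FROM users") \
  .load()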

The data inside Redshift (sample data created by AWS):

Does anyone know what is wrong with my configuration?

apache-spark amazon-redshift databricks
3 Answers

2 votes

After many attempts, I found the solution.

  • I removed the temporary keys.
  • I used forward_spark_s3_credentials instead.
  • I attached an IAM role to the EC2 instances (the cluster).
  • I used an s3a path instead of the mounted dbfs directory; the connector hands tempdir to Redshift's UNLOAD command, and Redshift can only unload to an S3 destination, which is exactly what the exception hints at.
  • I updated the cluster's libraries:
    • I used RedshiftJDBC42_no_awssdk_1_2_55_1083.jar
    • and removed com.github.databricks:spark-redshift_2.11:master-SNAPSHOT.

Final configuration:

df = spark.read \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://<REDSHIFT_URL>:<DB_PORT>/<DB_NAME>") \
  .option("forward_spark_s3_credentials", "true") \
  .option("user", "XXX") \
  .option("password", "XXX") \
  .option("tempdir", "s3a://<MY_S3_BUCKET>/...") \
  .option("query", "SELECT userid, username FROM users") \
  .load()

display(df)
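
For completeness, the connector's README also documents an aws_iam_role option as an alternative to forward_spark_s3_credentials; note that the role has to be attached to the Redshift cluster itself, not to the EC2 nodes. A sketch with a placeholder ARN:

# Alternative sketch (not the configuration I used above): let Redshift
# assume an IAM role for the UNLOAD instead of forwarding the Spark S3
# credentials. The role ARN and the other values are placeholders.
df = spark.read \
  .format("com.databricks.spark.redshift") \
  .option("url", "jdbc:redshift://<REDSHIFT_URL>:<DB_PORT>/<DB_NAME>") \
  .option("aws_iam_role", "arn:aws:iam::<ACCOUNT_ID>:role/<REDSHIFT_ROLE>") \
  .option("user", "XXX") \
  .option("password", "XXX") \
  .option("tempdir", "s3a://<MY_S3_BUCKET>/...") \
  .option("query", "SELECT userid, username FROM users") \
  .load()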

Libraries set on the cluster (probably only the Redshift JDBC driver is needed; I also added the libraries from the AWS bundle, which can be found here).

The final code will be on my GitHub.


0 votes

@pgrabarczyk's answer appears to be consistent with the Databricks online documentation found here (and it was helpful to me): https://docs.databricks.com/en/connect/external-systems/amazon-redshift.html

However, S3 write access was not an option for some of my colleagues, so here is another way to pull the data that still uses the driver Databricks provides:

def pull_rs(qry):
  ##########credentials
  host = 'foo.bar.fubar.com'
  port = 5439
  db = 'your_db'
  user = 'your_username'
  pw = 'your_pw'
  ##########credentials

  url = f'jdbc:redshift://{host}:{port}/{db}?ssl=true' ##the SSL statement was required to make my code work
  
  return (spark
        .read 
        .format("redshift")
        .option("url", url)
        .option("user", user)
        .option("password", pw)
        .option("query", qry)
        .load()
      )

#make your calls:
qry = """
      SELECT 
        * 
      FROM 
        your_schema.your_tablename 
      limit 1000
      """
df = pull_rs(qry)

display(df)

Alternatively, there is a powerful library called redshift-connector: https://pypi.org/project/redshift-connector/

It gives you a cursor that can do quite a lot, including executing Redshift stored procedures.
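
A minimal sketch of what that can look like (the connection values are placeholders; fetch_dataframe on the redshift_connector cursor returns a pandas DataFrame):

import redshift_connector

# Direct connection to Redshift; no S3 staging involved.
# All connection values below are placeholders.
conn = redshift_connector.connect(
    host='foo.bar.fubar.com',
    database='your_db',
    port=5439,
    user='your_username',
    password='your_pw',
)

cursor = conn.cursor()
# Plain queries and stored procedure calls both go through execute().
cursor.execute("SELECT * FROM your_schema.your_tablename LIMIT 1000")
df = cursor.fetch_dataframe()  # returns a pandas DataFrame

cursor.close()
conn.close()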


-1 votes

One of the errors is likely in the url option: it should be jdbc:redshift rather than jdbc:postgresql. Use redshift and retry.
