Writing a PySpark DataFrame to DigitalOcean Spaces results in a Forbidden (403) error

Question

Attempting to write a PySpark DataFrame to DigitalOcean Spaces fails with a "Forbidden (403)" error. When I build the session with the get_spark() function below and then call test_spaces() to write a DataFrame to DigitalOcean Spaces, the Forbidden (403) error is raised; the stack trace indicates a problem with access permissions. However, using the Python boto client I can access the Space with the same key/secret. I am currently on PySpark 3.5, using the jar configuration from here.
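For reference, a minimal sketch of the kind of check described above, written with boto3 rather than the legacy boto package; the bucket, region and credential names are placeholders rather than values from the original post:

# Hypothetical boto3 check: list a few objects in the Space with the same key/secret.
# "bucket_name", "REGION", "KEY" and "SECRET" are placeholders.
import boto3

session = boto3.session.Session()
client = session.client(
    "s3",
    region_name="REGION",
    endpoint_url="https://REGION.digitaloceanspaces.com",
    aws_access_key_id="KEY",
    aws_secret_access_key="SECRET",
)

# This succeeds with the same credentials, which is why the 403 from Spark is surprising.
print(client.list_objects_v2(Bucket="bucket_name", MaxKeys=5))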

def get_spark() -> SparkSession:
    """Provides a well configured spark session"""
    return (
        SparkSession.builder.master("local[*]")
        .appName("test")
        .config("spark.jars.packages", "io.delta:delta-spark_2.12:3.0.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config(
            "spark.jars.packages",
            "io.delta:delta-spark_2.12:3.0.0,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262",
        )
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.hadoop.fs.s3a.access.key", "KEY")
        .config("spark.hadoop.fs.s3a.secret.key", "SECRET")
        .config("spark.haddop.fs.s3a.endpoint", "https://REGION.digitaloceanspaces.com")
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        .config("spark.sql.warehouse.dir", WAREHOUSE_LOCATION)
        .enableHiveSupport()
        .getOrCreate()
    )


def test_spaces():
    """Creates the NHL roster table in the bronze layer"""
    spark = get_spark()
    # Create a simple DataFrame
    data = [("John", 25), ("Alice", 30), ("Bob", 28)]
    columns = ["Name", "Age"]
    df = spark.createDataFrame(data, columns)

    # Show the DataFrame
    df.show()

    # Write DataFrame to DigitalOcean Spaces
    df.write.json(f"s3a://bucket_name/test")

Stack trace:

py4j.protocol.Py4JJavaError: An error occurred while calling o65.json.
: java.nio.file.AccessDeniedException: s3a://PATH: getFileStatus on s3a://PATH: com.amazonaws.services.s3.model.AmazonS3Exception: Forbidden (Service: Amazon S3; Status Code: 403; Error Code: 403 Forbidden; Request ID: 069ZFJ7PEE4SDT1B; S3 Extended Request ID: iExUnbCSQn8Tued6vyOcmZvu7BMm/6NVRWtopsmAHk572kPJxY5lV8C4BSkalexpg/18EgWnpAkpf2bTUElcxQ==; Proxy: null), S3 Extended Request ID: iExUnbCSQn8Tued6vyOcmZvu7BMm/6NVRWtopsmAHk572kPJxY5lV8C4BSkalexpg/18EgWnpAkpf2bTUElcxQ==:403 Forbidden
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:255)
        at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:175)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3796)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3688)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4703)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:499)
        at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:444)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2337)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2356)
        at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4701)
        at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:120)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
        at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:107)
        at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:98)
        at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:461)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
        at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:32)
        at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:437)
        at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:98)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:85)
        at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:83)
        at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:142)
        at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
        at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
        at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
        at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
        at org.apache.spark.sql.DataFrameWriter.json(DataFrameWriter.scala:774)
amazon-s3 pyspark digital-ocean digital-ocean-spaces
1 Answer
  1. Cut the fs.s3a.impl line. It is superstition handed down through Stack Overflow posts, copied and pasted from one answer to the next (a trimmed builder is sketched after this list).
  2. The error contains an extended request ID,
     S3 Extended Request ID: iExUnbCSQn8Tued6vyOcmZvu7BMm/6NVRWtopsmAHk572kPJxY5lV8C4BSkalexpg/18EgWnpAkpf2bTUElcxQ==; Proxy: null)
     which hints that the request was sent to AWS S3, not to
     digitaloceanspaces
     , unless they also include one (no idea whether third-party stores do...)
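Here is a minimal sketch of what a trimmed builder might look like under that advice. It drops the fs.s3a.impl line and assumes the endpoint key is meant to be spark.hadoop.fs.s3a.endpoint (the question spells the prefix spark.haddop, which would leave the default AWS endpoint in force and fit the request going to AWS S3). KEY, SECRET, REGION and WAREHOUSE_LOCATION are the question's own placeholders.

def get_spark() -> SparkSession:
    """Sketch of a trimmed builder: no fs.s3a.impl, endpoint prefix spelled spark.hadoop."""
    return (
        SparkSession.builder.master("local[*]")
        .appName("test")
        .config(
            "spark.jars.packages",
            "io.delta:delta-spark_2.12:3.0.0,org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262",
        )
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .config("spark.hadoop.fs.s3a.access.key", "KEY")
        .config("spark.hadoop.fs.s3a.secret.key", "SECRET")
        # Note the spark.hadoop. prefix: only keys with this prefix are copied
        # into the Hadoop configuration that the s3a connector reads.
        .config("spark.hadoop.fs.s3a.endpoint", "https://REGION.digitaloceanspaces.com")
        .config("spark.sql.warehouse.dir", WAREHOUSE_LOCATION)
        .enableHiveSupport()
        .getOrCreate()
    )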

Make sure those s3a binding settings actually reach your code and the s3a connector... I'll leave that as an exercise in the filesystem APIs and in debugging, since those are essential skills, and since this is a configuration problem on your side that nobody else can fix for you.
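One rough way to do that check is to read the keys back from the Hadoop configuration that the s3a connector will actually see; the _jsc accessor is an internal Py4J handle, used here only for inspection:

from pyspark.sql import SparkSession

def dump_s3a_conf(spark: SparkSession) -> None:
    """Print selected s3a settings as the Hadoop layer sees them (None means 'not set')."""
    hconf = spark.sparkContext._jsc.hadoopConfiguration()
    # Careful: this prints credentials; remove fs.s3a.access.key before sharing output.
    for key in ("fs.s3a.endpoint", "fs.s3a.access.key", "fs.s3a.impl"):
        print(key, "=", hconf.get(key))

Calling dump_s3a_conf(get_spark()) with the builder from the question and seeing fs.s3a.endpoint print as None would line up with the request being sent to the default AWS S3 endpoint.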
