400 Bad Request error when trying to write to S3 from an EMR 7.0.0 cluster

Question

I have a Spark application that runs perfectly on emr-5.29.0 with Spark 2.4.4. The application writes to S3 through Spark SQL, like this:

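// Note: the $"col" syntax assumes `import spark.implicits._` is in scope;
// df, saveMode and path are defined elsewhere in the app.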
df
    .repartition($"year", $"month", $"day")
    .write
    .partitionBy("year", "month", "day")
    .mode(saveMode)
    .parquet(path)

The path is a valid s3a:// path.

My problem appeared when I upgraded to Spark 3.5.0 and emr-7.0.0. My jar lines up with the dependencies provided by AWS, as follows:

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "3.4.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0",
  "org.apache.spark" %% "spark-core" % "3.5.0",
  "org.apache.spark" %% "spark-sql" % "3.5.0",
  "org.apache.spark" %% "spark-streaming" % "3.5.0",
  "com.amazonaws" % "aws-java-sdk" % "1.12.569",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.6",
)
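One hedged aside on these dependencies: the stack trace below shows the S3A connector calling AWS SDK v2 (software.amazon.awssdk), so the bundled v1 aws-java-sdk is probably not the client actually in play, and on EMR the Spark and Hadoop artifacts are already on the cluster classpath. A minimal sketch of the same list with cluster-supplied jars marked Provided (versions unchanged from above), so the assembly cannot shadow EMR's own patched builds:

libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "3.4.1",
  // shipped with the job: not on EMR's default classpath
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0",
  // supplied by the EMR cluster at runtime, so excluded from the assembly
  "org.apache.spark" %% "spark-core" % "3.5.0" % Provided,
  "org.apache.spark" %% "spark-sql" % "3.5.0" % Provided,
  "org.apache.spark" %% "spark-streaming" % "3.5.0" % Provided,
  "org.apache.hadoop" % "hadoop-aws" % "3.3.6" % Provided
)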

But when the corresponding step runs on EMR 7.0.0, the job fails while trying to write to S3:

Exception in thread "main" org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on s3a://x-bucket/raw/xxx: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: xxxx, Extended Request ID: xxxxxxxx):null: null (Service: S3, Status Code: 400, Request ID: xxxxx, Extended Request ID: gxxxxxxx)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:237)
    at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:153)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3987)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3893)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4889)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:547)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:528)
    at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:449)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2654)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2673)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4887)
    at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:125)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
    at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:520)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:559)
    at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:520)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:113)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:129)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:165)
    at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
    at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:165)
    at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276)
    at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:164)
    at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
    at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
    at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
    at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:503)
    at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:503)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
    at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
    at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:479)
    at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:101)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88)
    at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:86)
    at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:151)
    at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
    at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
    at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
    at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
    at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
    at com.d.data.pipeline.utils.ParquetEventWriter.write(ParquetEventWriter.scala:13)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.getKafkaEventsByTopicBetweenTimestamps(KafkaToS3SparkApp.scala:182)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.$anonfun$processKafkaEventsInBetweenDates$1(KafkaToS3SparkApp.scala:130)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.$anonfun$processKafkaEventsInBetweenDates$1$adapted(KafkaToS3SparkApp.scala:120)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.processKafkaEventsInBetweenDates(KafkaToS3SparkApp.scala:120)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.main(KafkaToS3SparkApp.scala:75)
    at com.d.data.pipeline.batch.io.KafkaToS3SparkApp.main(KafkaToS3SparkApp.scala)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:568)
    at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
    at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
    at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
    at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
    at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
    at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
    at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
    at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: JB2KN3QAMSCD504T, Extended Request ID: gcvyho7fncYObCK92QMAPIeanVQ8ShjPOoJNBE3dewSr18ZRBczNmIY00+sDe6lFjfjv+jShI7Q=)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
    at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:95)
    at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:270)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
    at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
    at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
    at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:198)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
    at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
    at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
    at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
    at software.amazon.awssdk.services.s3.DefaultS3Client.headObject(DefaultS3Client.java:5495)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$8(S3AFileSystem.java:2847)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
    at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:431)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2835)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2807)
    at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3965)

I am using the eu-west-1 region.
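(An aside, not from the original post: with the SDK v2-based S3A client, a 400 on HeadObject is often a signing or region problem, so explicitly pinning the client to the bucket's region is a cheap thing to try. A sketch, assuming the Hadoop 3.3.x fs.s3a.endpoint.region option and the app's session builder:)

import org.apache.spark.sql.SparkSession

// Pin the S3A client to the bucket's region instead of relying on
// endpoint/region auto-resolution (the app name here is illustrative).
val spark = SparkSession.builder()
  .appName("KafkaToS3SparkApp")
  .config("spark.hadoop.fs.s3a.endpoint.region", "eu-west-1")
  .getOrCreate()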

EMR is using the EMR_DefaultRole_v2 role, and to take authorization out of the equation I also attached the AmazonS3FullAccess policy to that role, but no dice: the error persists.
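(Two side notes, not from the original post: S3 calls made by the job itself are signed with the EC2 instance profile role, typically EMR_EC2_DefaultRole, rather than the EMR service role, so that is the role that needs the S3 permissions. And the attachment can be double-checked from the AWS CLI, role name as above:)

aws iam list-attached-role-policies --role-name EMR_DefaultRole_v2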

I also found this: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-access-grants.html but I don't know whether that could be the answer, since I have simply granted full access to S3.

Also, if you know how to debug or look up logs for the request IDs thrown by the S3 library, that would be great :)
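(Not from the thread, but one way to get more detail: turn up the S3A and AWS SDK v2 request loggers. Spark 3.x uses log4j2, so a sketch for log4j2.properties; the request IDs can then be matched against S3 server access logs or CloudTrail data events, if either is enabled on the bucket:)

# Verbose S3A connector logging
logger.s3a.name = org.apache.hadoop.fs.s3a
logger.s3a.level = debug
# One line per AWS SDK v2 request/response, including request IDs
logger.awsreq.name = software.amazon.awssdk.request
logger.awsreq.level = debug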

I expected the job to behave exactly as it did on emr-5.29.0 with Spark 2.4.4, but the error thrown is very opaque and hard to debug.

Thanks in advance.

apache-spark amazon-s3 apache-spark-sql amazon-emr
1 Answer

I hit the same error when I moved my Spark application from EMR 6.8.0 to EMR 7.0.0. The S3 bucket I was trying to access was in the same region as my EMR cluster, us-west-2. The solution that worked for me was replacing "s3a://" with "s3://".

According to the AWS support team: "For newer versions of EMR, using 's3://' instead of 's3a://' or 's3n://' is recommended for better performance, security, and reliability. s3a:// uses an open-source implementation, whereas s3:// uses the AWS EMRFS implementation, and we always recommend using s3:// on EMR."
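Applied to the write from the question, only the URI scheme changes; everything else stays the same (bucket and prefix below mirror the redacted path from the stack trace):

// Hand the path to EMRFS (s3://) instead of the open-source S3A connector.
val path = "s3://x-bucket/raw/xxx" // was: s3a://x-bucket/raw/xxx

df.repartition($"year", $"month", $"day")
  .write
  .partitionBy("year", "month", "day")
  .mode(saveMode)
  .parquet(path)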

https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html
https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-signing.html
