I have a Spark application that runs perfectly on emr-5.29.0 with Spark 2.4.4. The application writes to S3 using Spark SQL, as follows:
df
  .repartition($"year", $"month", $"day")
  .write
  .partitionBy("year", "month", "day")
  .mode(saveMode)
  .parquet(path)
The path is a valid s3a path.
My problem appeared when I upgraded to Spark 3.5.0 and emr-7.0.0. My jar is aligned with the dependencies AWS provides, as follows:
libraryDependencies ++= Seq(
  "org.apache.kafka" %% "kafka" % "3.4.1",
  "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.5.0",
  "org.apache.spark" %% "spark-core" % "3.5.0",
  "org.apache.spark" %% "spark-sql" % "3.5.0",
  "org.apache.spark" %% "spark-streaming" % "3.5.0",
  "com.amazonaws" % "aws-java-sdk" % "1.12.569",
  "org.apache.hadoop" % "hadoop-aws" % "3.3.6",
)
But when the corresponding step runs on EMR 7.0.0, the job fails as soon as it tries to write to S3:
Exception in thread "main" org.apache.hadoop.fs.s3a.AWSBadRequestException: getFileStatus on s3a://x-bucket/raw/xxx: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: xxxx, Extended Request ID: xxxxxxxx):null: null (Service: S3, Status Code: 400, Request ID: xxxxx, Extended Request ID: gxxxxxxx)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:237)
at org.apache.hadoop.fs.s3a.S3AUtils.translateException(S3AUtils.java:153)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3987)
at org.apache.hadoop.fs.s3a.S3AFileSystem.innerGetFileStatus(S3AFileSystem.java:3893)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$exists$34(S3AFileSystem.java:4889)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.invokeTrackingDuration(IOStatisticsBinding.java:547)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.lambda$trackDurationOfOperation$5(IOStatisticsBinding.java:528)
at org.apache.hadoop.fs.statistics.impl.IOStatisticsBinding.trackDuration(IOStatisticsBinding.java:449)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2654)
at org.apache.hadoop.fs.s3a.S3AFileSystem.trackDurationAndSpan(S3AFileSystem.java:2673)
at org.apache.hadoop.fs.s3a.S3AFileSystem.exists(S3AFileSystem.java:4887)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:125)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:113)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:111)
at org.apache.spark.sql.execution.command.DataWritingCommandExec.executeCollect(commands.scala:125)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.$anonfun$executeCollect$1(AdaptiveSparkPlanExec.scala:520)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.withFinalPlanUpdate(AdaptiveSparkPlanExec.scala:559)
at org.apache.spark.sql.execution.adaptive.AdaptiveSparkPlanExec.executeCollect(AdaptiveSparkPlanExec.scala:520)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.$anonfun$applyOrElse$1(QueryExecution.scala:113)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
at org.apache.spark.sql.execution.SQLExecution$.executeQuery$1(SQLExecution.scala:129)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$9(SQLExecution.scala:165)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:108)
at org.apache.spark.sql.execution.SQLExecution$.withTracker(SQLExecution.scala:255)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$8(SQLExecution.scala:165)
at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:276)
at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:164)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:70)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:110)
at org.apache.spark.sql.execution.QueryExecution$$anonfun$eagerlyExecuteCommands$1.applyOrElse(QueryExecution.scala:101)
at org.apache.spark.sql.catalyst.trees.TreeNode.$anonfun$transformDownWithPruning$1(TreeNode.scala:503)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDownWithPruning(TreeNode.scala:503)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.org$apache$spark$sql$catalyst$plans$logical$AnalysisHelper$$super$transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning(AnalysisHelper.scala:267)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.transformDownWithPruning$(AnalysisHelper.scala:263)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.transformDownWithPruning(LogicalPlan.scala:33)
at org.apache.spark.sql.catalyst.trees.TreeNode.transformDown(TreeNode.scala:479)
at org.apache.spark.sql.execution.QueryExecution.eagerlyExecuteCommands(QueryExecution.scala:101)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted$lzycompute(QueryExecution.scala:88)
at org.apache.spark.sql.execution.QueryExecution.commandExecuted(QueryExecution.scala:86)
at org.apache.spark.sql.execution.QueryExecution.assertCommandExecuted(QueryExecution.scala:151)
at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:859)
at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:388)
at org.apache.spark.sql.DataFrameWriter.saveInternal(DataFrameWriter.scala:361)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:240)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:792)
at com.d.data.pipeline.utils.ParquetEventWriter.write(ParquetEventWriter.scala:13)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.getKafkaEventsByTopicBetweenTimestamps(KafkaToS3SparkApp.scala:182)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.$anonfun$processKafkaEventsInBetweenDates$1(KafkaToS3SparkApp.scala:130)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.$anonfun$processKafkaEventsInBetweenDates$1$adapted(KafkaToS3SparkApp.scala:120)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.processKafkaEventsInBetweenDates(KafkaToS3SparkApp.scala:120)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp$.main(KafkaToS3SparkApp.scala:75)
at com.d.data.pipeline.batch.io.KafkaToS3SparkApp.main(KafkaToS3SparkApp.scala)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:568)
at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:1075)
at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:194)
at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:217)
at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:91)
at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1167)
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1176)
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: software.amazon.awssdk.services.s3.model.S3Exception: null (Service: S3, Status Code: 400, Request ID: JB2KN3QAMSCD504T, Extended Request ID: gcvyho7fncYObCK92QMAPIeanVQ8ShjPOoJNBE3dewSr18ZRBczNmIY00+sDe6lFjfjv+jShI7Q=)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleErrorResponse(AwsXmlPredicatedResponseHandler.java:156)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handleResponse(AwsXmlPredicatedResponseHandler.java:108)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:85)
at software.amazon.awssdk.protocols.xml.internal.unmarshall.AwsXmlPredicatedResponseHandler.handle(AwsXmlPredicatedResponseHandler.java:43)
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler$Crc32ValidationResponseHandler.handle(AwsSyncClientHandler.java:95)
at software.amazon.awssdk.core.internal.handler.BaseClientHandler.lambda$successTransformationResponseHandler$7(BaseClientHandler.java:270)
at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:50)
at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:38)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:72)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:55)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:39)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:50)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:32)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:198)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
at software.amazon.awssdk.services.s3.DefaultS3Client.headObject(DefaultS3Client.java:5495)
at org.apache.hadoop.fs.s3a.S3AFileSystem.lambda$getObjectMetadata$8(S3AFileSystem.java:2847)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:468)
at org.apache.hadoop.fs.s3a.Invoker.retryUntranslated(Invoker.java:431)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2835)
at org.apache.hadoop.fs.s3a.S3AFileSystem.getObjectMetadata(S3AFileSystem.java:2807)
at org.apache.hadoop.fs.s3a.S3AFileSystem.s3GetFileStatus(S3AFileSystem.java:3965)
I am using the eu-west-1 region.
EMR is using the EMR_DefaultRole_v2 role. To try to take authz out of the equation, I also added the AmazonS3FullAccess policy to the role, but no dice: the error persists.
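I also found this: https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-access-grants.html but I don't know whether it can be the answer, since I can simply grant full access to S3.
Also, if you know how to debug or look up logs for the request IDs thrown by the S3 library, that would be awesome :)
I expected the job to behave exactly as it did on emr-5.29.0 with Spark 2.4.4, but the error thrown is very opaque and hard to debug.
Thanks in advance.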
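I hit the same error when I moved my Spark application from EMR 6.8.0 to EMR 7.0.0. The S3 bucket I was trying to access is in the same region as my EMR cluster, us-west-2. The solution that worked for me was to replace "s3a://" with "s3://".
According to the AWS support team: "For newer versions of EMR, it is recommended to use 's3://' instead of 's3a://' or 's3n://' for better performance, security, and reliability. s3a:// uses the open-source implementation, whereas s3:// uses the AWS EMRFS implementation, and we always recommend using s3:// on EMR."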
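In case it helps, here is a minimal sketch of how the scheme swap could be applied in one place instead of editing every path by hand. The object name S3Paths and the helper toEmrfsPath are hypothetical names I made up for illustration; they are not part of Spark, Hadoop, or EMR. It only rewrites the URI prefix and assumes the rest of the path stays the same.

```scala
object S3Paths {
  // Hypothetical helper (not a Spark/EMR API): rewrites a leading
  // "s3a://" or "s3n://" to "s3://" and returns every other path unchanged.
  def toEmrfsPath(path: String): String =
    path.replaceFirst("^s3[an]://", "s3://")
}

// Possible usage with the writer from the question (assumes df, saveMode
// and path are defined as in the original snippet):
//
//   df
//     .repartition($"year", $"month", $"day")
//     .write
//     .partitionBy("year", "month", "day")
//     .mode(saveMode)
//     .parquet(S3Paths.toEmrfsPath(path))
```

Only the prefix at the start of the string is touched, so non-S3 paths (for example HDFS paths used in local tests) pass through unchanged.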
https://docs.aws.amazon.com/AmazonS3/latest/API/sigv4-auth-using-authorization-header.html https://docs.aws.amazon.com/IAM/latest/UserGuide/reference_aws-signing.html