UnsatisfiedLinkError when writing to S3 on Windows with the Staging S3A Committer

Question · votes: 0 · answers: 3

I'm trying to write Parquet data to an AWS S3 directory with Apache Spark. I'm working on my local Windows 10 machine without Spark or Hadoop installed; instead I add them as SBT dependencies (Hadoop 3.2.1, Spark 2.4.5). My SBT file is as follows:

scalaVersion := "2.11.11"

libraryDependencies ++= Seq(
  "org.apache.spark" %% "spark-sql" % "2.4.5",
  "org.apache.spark" %% "spark-hadoop-cloud" % "2.3.2.3.1.0.6-1",

  "org.apache.hadoop" % "hadoop-client" % "3.2.1",
  "org.apache.hadoop" % "hadoop-common" % "3.2.1",
  "org.apache.hadoop" % "hadoop-aws" % "3.2.1",

  "com.amazonaws" % "aws-java-sdk-bundle" % "1.11.704"
)

dependencyOverrides ++= Seq(
  "com.fasterxml.jackson.core" % "jackson-core" % "2.11.0",
  "com.fasterxml.jackson.core" % "jackson-databind" % "2.11.0",
  "com.fasterxml.jackson.module" %% "jackson-module-scala" % "2.11.0"
)

resolvers ++= Seq(
  "apache" at "https://repo.maven.apache.org/maven2",
  "hortonworks" at "https://repo.hortonworks.com/content/repositories/releases/",
)

I use the S3A Staging Directory Committer, as described in the Hadoop and Cloudera documentation. I'm also aware of the two existing StackOverflow questions on this topic and used them to get the configuration right.

I've added all of the required (to the best of my knowledge) configuration, including the last two Parquet-specific settings:

val spark = SparkSession.builder()
      .appName("test-run-s3a-commiters")
      .master("local[*]")

      .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
      .config("spark.hadoop.fs.s3a.endpoint", "s3.eu-central-1.amazonaws.com")
      .config("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
      .config("spark.hadoop.fs.s3a.connection.maximum", "100")

      .config("spark.hadoop.fs.s3a.committer.name", "directory")
      .config("spark.hadoop.fs.s3a.committer.magic.enabled", "false")
      .config("spark.hadoop.fs.s3a.committer.staging.conflict-mode", "append")
      .config("spark.hadoop.fs.s3a.committer.staging.unique-filenames", "true")
      .config("spark.hadoop.fs.s3a.committer.staging.abort.pending.uploads", "true")
      .config("spark.hadoop.fs.s3a.buffer.dir", "tmp/")
      .config("spark.hadoop.fs.s3a.committer.staging.tmp.path", "hdfs_tmp/")
      .config("spark.hadoop.mapreduce.fileoutputcommitter.algorithm.version", "2")
      .config("spark.hadoop.mapreduce.outputcommitter.factory.scheme.s3a", "org.apache.hadoop.fs.s3a.commit.S3ACommitterFactory")

      .config("spark.sql.sources.commitProtocolClass", "org.apache.spark.internal.io.cloud.PathOutputCommitProtocol")
      .config("spark.sql.parquet.output.committer.class", "org.apache.spark.internal.io.cloud.BindingParquetOutputCommitter")
      .getOrCreate()

spark.sparkContext.setLogLevel("info")

From the logs I can see that the StagingCommitter is actually applied (I can also see the intermediate data on the local filesystem under the specified paths, and during execution there is no _temporary directory in S3, as there would be with the default FileOutputCommitter).

Then I run a simple piece of code to write test data to an S3 bucket:

import spark.implicits._

val sourceDF = spark
  .range(0, 10000)
  .map(id => {
    Thread.sleep(10)
    id
  })

sourceDF
  .write
  .format("parquet")
  .save("s3a://my/test/bucket/")

(I use Thread.sleep to simulate some processing and to leave myself a little time to check the intermediate contents of the local temporary directory and the S3 bucket.)

However, during the task commit attempt I get a java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat error. Below is a log excerpt (reduced to 1 executor) and the error stack trace.

20/05/09 15:13:18 INFO InternalParquetRecordWriter: Flushing mem columnStore to file. allocated memory: 15000
20/05/09 15:13:18 INFO StagingCommitter: Starting: Task committer attempt_20200509151301_0000_m_000000_0: needsTaskCommit() Task attempt_20200509151301_0000_m_000000_0
20/05/09 15:13:18 INFO StagingCommitter: Task committer attempt_20200509151301_0000_m_000000_0: needsTaskCommit() Task attempt_20200509151301_0000_m_000000_0: duration 0:00.005s
20/05/09 15:13:18 INFO StagingCommitter: Starting: Task committer attempt_20200509151301_0000_m_000000_0: commit task attempt_20200509151301_0000_m_000000_0
20/05/09 15:13:18 INFO StagingCommitter: Task committer attempt_20200509151301_0000_m_000000_0: commit task attempt_20200509151301_0000_m_000000_0: duration 0:00.019s
20/05/09 15:13:18 ERROR Utils: Aborting task
java.lang.UnsatisfiedLinkError: org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Ljava/lang/String;)Lorg/apache/hadoop/io/nativeio/NativeIO$POSIX$Stat;
    at org.apache.hadoop.io.nativeio.NativeIO$POSIX.stat(Native Method)
    at org.apache.hadoop.io.nativeio.NativeIO$POSIX.getStat(NativeIO.java:460)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfoByNativeIO(RawLocalFileSystem.java:821)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.loadPermissionInfo(RawLocalFileSystem.java:735)
    at org.apache.hadoop.fs.RawLocalFileSystem$DeprecatedRawLocalFileStatus.getPermission(RawLocalFileSystem.java:703)
    at org.apache.hadoop.fs.LocatedFileStatus.<init>(LocatedFileStatus.java:52)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2091)
    at org.apache.hadoop.fs.FileSystem$4.next(FileSystem.java:2071)
    at org.apache.hadoop.fs.FileSystem$5.hasNext(FileSystem.java:2190)
    at org.apache.hadoop.fs.s3a.S3AUtils.applyLocatedFiles(S3AUtils.java:1295)
    at org.apache.hadoop.fs.s3a.S3AUtils.flatmapLocatedFiles(S3AUtils.java:1333)
    at org.apache.hadoop.fs.s3a.S3AUtils.listAndFilter(S3AUtils.java:1350)
    at org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.getTaskOutput(StagingCommitter.java:385)
    at org.apache.hadoop.fs.s3a.commit.staging.StagingCommitter.commitTask(StagingCommitter.java:641)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
    at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:77)
    at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.commitTask(HadoopMapReduceCommitProtocol.scala:225)
    at org.apache.spark.internal.io.cloud.PathOutputCommitProtocol.commitTask(PathOutputCommitProtocol.scala:220)
    at org.apache.spark.sql.execution.datasources.FileFormatDataWriter.commit(FileFormatDataWriter.scala:78)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:247)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask$3.apply(FileFormatWriter.scala:242)
    at org.apache.spark.util.Utils$.tryWithSafeFinallyAndFailureCallbacks(Utils.scala:1394)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$.org$apache$spark$sql$execution$datasources$FileFormatWriter$$executeTask(FileFormatWriter.scala:248)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:170)
    at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:169)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
20/05/09 15:13:18 ERROR Utils: Aborting task

As far as I can tell, the configuration is correct. The error is probably caused by some version incompatibility or by my local environment setup.

The code above works as expected for ORC and CSV without any errors, but not for Parquet.

Can you suggest what might be causing the error and how to fix it?

windows apache-spark amazon-s3 hadoop apache-spark-sql
3 Answers

8 votes

For everyone who ends up here: I found the solution. As expected, the problem was not related to the S3A output committers or the library dependencies.

The UnsatisfiedLinkError on the Java native method was thrown because of a version incompatibility between the Hadoop version in the SBT dependencies and the winutils.exe (HDFS wrapper) on my Windows machine.

I downloaded the matching version from cdarlint/winutils and everything worked.
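
If you want to verify the mismatch before re-downloading anything, a minimal diagnostic sketch like the one below can help. It only assumes that HADOOP_HOME points at the directory holding the winutils distribution; VersionInfo and NativeCodeLoader are standard Hadoop utility classes, everything else is illustrative:

import java.io.File

import org.apache.hadoop.util.{NativeCodeLoader, VersionInfo}

// Hadoop version pulled in through the SBT dependencies (3.2.1 in this setup).
println(s"Hadoop on classpath: ${VersionInfo.getVersion}")

// Whether the Windows native bindings (hadoop.dll) were actually loaded.
println(s"Native code loaded:  ${NativeCodeLoader.isNativeCodeLoaded}")

// HADOOP_HOME\bin should contain winutils.exe and hadoop.dll built for the same version.
sys.env.get("HADOOP_HOME") match {
  case Some(home) =>
    Seq("winutils.exe", "hadoop.dll").foreach { name =>
      val f = new File(new File(home, "bin"), name)
      println(s"$name present: ${f.exists}")
    }
  case None =>
    println("HADOOP_HOME is not set")
}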


1 vote

This is related to the installation not having the native libraries needed to support file:// URLs, which s3a uses to buffer writes.

You can switch to buffering in memory instead; just make sure you can upload to S3 as fast as you generate data. The s3a documentation covers options that help manage this by limiting the number of active blocks a single output stream can queue for parallel upload.

  <property>
    <name>fs.s3a.fast.upload.buffer</name>
    <value>bytebuffer</value>
  </property>

0 votes

In addition to the OP's answer, make sure your PATH environment variable contains %HADOOP_HOME% (on Windows), otherwise just downloading the correct winutils version will not work.
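
In practice it is %HADOOP_HOME%\bin (where winutils.exe and hadoop.dll live) that must be resolvable, because hadoop.dll is loaded via java.library.path, which on Windows defaults to PATH. A minimal check, assuming that layout:

// Sketch: verify that %HADOOP_HOME%\bin is on PATH so hadoop.dll can be found.
val hadoopBin = sys.env.get("HADOOP_HOME").map(home => s"$home\\bin")
val pathEntries = sys.env.getOrElse("PATH", "").split(";").toSeq
val binOnPath = hadoopBin.exists(bin => pathEntries.exists(_.equalsIgnoreCase(bin)))
println(s"HADOOP_HOME\\bin on PATH: $binOnPath")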
