Anyway, if you want to change the output file names that Spark writes to AWS S3, check the code below; it works for both HDFS and S3. The rename function created here changes the path and name, and if the directory contains more than one file it simply appends a sequence number to the end of each file name, e.g. json_data_1.json.
import org.apache.hadoop.fs.{FileSystem, Path, RemoteIterator}

// Implicit conversion so Hadoop's RemoteIterator can be used like a Scala Iterator
implicit def convertToScalaIterator[T](remoteIterator: RemoteIterator[T]): Iterator[T] = {
  case class wrapper(remoteIterator: RemoteIterator[T]) extends Iterator[T] {
    override def hasNext: Boolean = remoteIterator.hasNext
    override def next(): T = remoteIterator.next()
  }
  wrapper(remoteIterator)
}
import java.net.URI

// Resolve the FileSystem (local, HDFS or S3) from the path's URI scheme
def fs(path: String) = FileSystem.get(URI.create(path), spark.sparkContext.hadoopConfiguration)
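With the implicit conversion in scope, the RemoteIterator returned by listFiles behaves like a normal Scala Iterator, for example (the path here is only an example):

// Quick check (example path is an assumption): list file names recursively under a directory
fs("/tmp/samplea").listFiles(new Path("/tmp/samplea"), true).map(_.getPath.getName).toList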
// Rename files: keep the parent directory, rename each part file to <name>_<index>.<extension>,
// and skip the _SUCCESS marker file
def rename(path: String, name: String) = {
  fs(path)
    .listFiles(new Path(path), true)
    .toList                                    // works thanks to the implicit RemoteIterator conversion
    .filter(_.isFile)
    .map(_.getPath)
    .filterNot(_.toString.contains("_SUCCESS"))
    .zipWithIndex
    .map { case (file, index) =>
      fs(file.toString).rename(file, new Path(s"${file.getParent}/${name}_${index}.${file.getName.split("\\.").last}"))
    }
}
scala> df.repartition(5).write.format("json").mode("overwrite").save("/tmp/samplea/")
scala> "ls -ltr /tmp/samplea/".!
total 8
-rw-r--r-- 1 vn50ftc wheel 0 Jun 6 13:57 part-00000-607ffd5e-7d28-4331-9a69-de36254c80b1-c000.json
-rw-r--r-- 1 vn50ftc wheel 282 Jun 6 13:57 part-00001-607ffd5e-7d28-4331-9a69-de36254c80b1-c000.json
-rw-r--r-- 1 vn50ftc wheel 0 Jun 6 13:57 _SUCCESS
res191: Int = 0
scala> rename("/tmp/samplea","json_data")
res193: List[Boolean] = List(true, true)
scala> "ls -ltr /tmp/samplea/".!
total 8
-rw-r--r-- 1 vn50ftc wheel 0 Jun 6 13:57 json_data_0.json
-rw-r--r-- 1 vn50ftc wheel 282 Jun 6 13:57 json_data_1.json
-rw-r--r-- 1 vn50ftc wheel 0 Jun 6 13:57 _SUCCESS
res194: Int = 0
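The same call should work against S3, since fs() resolves the FileSystem from the URI scheme. A minimal sketch, assuming an s3a:// path with a hypothetical bucket name and a configured S3A connector:

// Hypothetical S3 usage: the bucket name and prefix are assumptions, not from the run above
df.repartition(5).write.format("json").mode("overwrite").save("s3a://my-bucket/tmp/samplea/")
rename("s3a://my-bucket/tmp/samplea/", "json_data")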