我在 Google Data Fusion 中有一个管道,它在 Google Cloud 存储桶的目标目录中生成一个名为“part-00000-XXXXXX”的 CSV 文件(以及一个名为“_SUCCESS”的文件)。 “part-00000”之后的其余文件名总是不同且随机的。
管道通过解析、处理和连接输入文件(全部来自某些 Google Cloud Storage 位置)来生成新输出,然后将新输出与旧的现有输出文件连接起来,并吐出“part-00000”文件与名称为“internal_dashboard.csv”的旧输出文件位于同一位置。
通过任何可用的方式,我需要做的是以某种方式手动将“part-00000”文件重命名为“internal_dashboard.csv”并替换旧文件。
以下是我在 Spark Sink 中写的尝试(我从这里、这里、这里、这里和这里得到它们)。这个想法是首先找到一个文件名中包含“part-00000”的文件,然后重命名它并覆盖旧文件。到目前为止我所有的尝试都失败了:
import java.nio.file.{Files, Paths, StandardCopyOption}
import scala.util.matching.Regex
def recursiveListFiles(f: File, r: Regex): Array[File] = {
val these = f.listFiles
val good = these.filter(f => r.findFirstIn(f.getName).isDefined)
good ++ these.filter(_.isDirectory).flatMap(recursiveListFiles(_,r))
}
def moveRenameFile(source: String, destination: String): Unit = {
val path = Files.move(
Paths.get(source),
Paths.get(destination),
StandardCopyOption.REPLACE_EXISTING
)
// could return `path`
}
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
val fullpath = "gs://some_bucket/output/internal_dashboard"
val targetfilename = "internal_dashboad.csv"
df.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("append") // "overwrite" "append"
.save(fullpath)
val existingfilename = recursiveListFiles(new File(fullpath), "part-00000-.*")
moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}
import java.io.File
def getListOfFiles(dir: File, extensions: List[String]): List[File] = {
dir.listFiles.filter(_.isFile).toList.filter { file =>
extensions.exists(file.getName.startsWith(_))
}
}
def moveRenameFile(source: String, destination: String): Unit = {
val path = Files.move(
Paths.get(source),
Paths.get(destination),
StandardCopyOption.REPLACE_EXISTING
)
// could return `path`
}
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
val fullpath = "gs://some_bucket/output/internal_dashboard"
val targetfilename = "internal_dashboad.csv"
df.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("append") // "overwrite" "append"
.save(fullpath)
val suffixList = List("part-00000")
val existingfilename = getListOfFiles(new File(fullpath), suffixList )
moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {
val fullpath = "gs://some_bucket/output/internal_dashboard"
val targetfilename = "internal_dashboad.csv"
val pathandfile = fullpath + "/" + targefilename
df.coalesce(1)
.write.format("csv")
.option("header", "true")
.mode("append") // "overwrite" "append"
.save(pathandfile )
dbutils.fs.ls(fullpath).filter(file=>file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path,true))
dbutils.fs.mv(dbutils.fs.ls(pathandfile).filter(file=>file.name.startsWith("part-00000"))(0).path,pathandfile ")
dbutils.fs.rm(pathandfile,true)
}
我需要 Scala 或其他方式的帮助,将“part-00000”文件重命名为“internal_dashboard.csv”并覆盖旧版本。
供没有使用过Data Fusion的人参考,我可以使用的工具是:
Description
Executes user-provided Spark code in Scala.
Use Case
This plugin can be used when you want arbitrary Spark code.
Properties
mainClass: The fully qualified class name for the Spark application. It must either be an object that has a main method define inside, with the method signature as def main(args: Array[String]): Unit; or it is a class that extends from the CDAP co.cask.cdap.api.spark.SparkMain trait that implements the run method, with the method signature as def run(implicit sec: SparkExecutionContext): Unit
Description
Executes user-provided Spark code in Python.
Use Case
This plugin can be used when you want to run arbitrary Spark code.
(2020 年 11 月 2 日)我刚刚了解到,还有一些 Google Cloud Functions 可以用 Python(或 Java)编写,并在其所在的存储桶中发生更改时触发。如果有人知道如何制作这样的触发时可以重命名并覆盖“part-00000”文件的函数,请告诉我。如果其他方法都失败,我会尝试一下。
避免重命名 AWS S3 上的对象。不存在这样的事情,它所做的只是“剪切和粘贴”=>非常昂贵的操作。
你可以尝试一下:
import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet(outputBasePath)
如果您坚持使用“重命名”,请使用 Hadoop 库,而不是 java:
import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration
val srcPath = new Path("source/...")
val destPath = new Path("dest/...")
srcPath.getFileSystem(new Configuration()).rename(srcPath, destPath)
注意:使用 AWS S3 时,两个路径必须位于同一个 Bucket 中(它们具有不同的 FileSystem 对象,适用于使用 rename(...) 时)。
无需使用 Spark 即可回答 使用 GCS 移动
源码路径 fusion_output/folder3/2024-09-26-17-20/part-0000
目标路径启用宏 fusion_output/folder3/2024-09-26-17-20/Store_${逻辑开始时间(yyyy-MM-dd'T'HH-mm-ss,1d-4h+30m)}.csv