使用 Google Data Fusion 中的 Spark 工具重命名输出文件

问题描述 投票:0回答:2

我在 Google Data Fusion 中有一个管道,它在 Google Cloud 存储桶的目标目录中生成一个名为“part-00000-XXXXXX”的 CSV 文件(以及一个名为“_SUCCESS”的文件)。 “part-00000”之后的其余文件名总是不同且随机的。

enter image description here

管道通过解析、处理和连接输入文件(全部来自某些 Google Cloud Storage 位置)来生成新输出,然后将新输出与旧的现有输出文件连接起来,并吐出“part-00000”文件与名称为“internal_dashboard.csv”的旧输出文件位于同一位置。

通过任何可用的方式,我需要做的是以某种方式手动将“part-00000”文件重命名为“internal_dashboard.csv”并替换旧文件。

以下是我在 Spark Sink 中写的尝试(我从这里这里这里这里这里得到它们)。这个想法是首先找到一个文件名中包含“part-00000”的文件,然后重命名它并覆盖旧文件。到目前为止我所有的尝试都失败了:

  • 尝试1
import java.nio.file.{Files, Paths, StandardCopyOption}
import scala.util.matching.Regex

def recursiveListFiles(f: File, r: Regex): Array[File] = {
  val these = f.listFiles
  val good = these.filter(f => r.findFirstIn(f.getName).isDefined)
  good ++ these.filter(_.isDirectory).flatMap(recursiveListFiles(_,r))
}


def moveRenameFile(source: String, destination: String): Unit = {
    val path = Files.move(
        Paths.get(source),
        Paths.get(destination),
        StandardCopyOption.REPLACE_EXISTING
    )
    // could return `path`
}


def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboad.csv"

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("append") // "overwrite" "append"
    .save(fullpath)
 
  val existingfilename = recursiveListFiles(new File(fullpath), "part-00000-.*")
  moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}

  • 尝试2:

import java.io.File

def getListOfFiles(dir: File, extensions: List[String]): List[File] = {
    dir.listFiles.filter(_.isFile).toList.filter { file =>
        extensions.exists(file.getName.startsWith(_))
    }
}

def moveRenameFile(source: String, destination: String): Unit = {
    val path = Files.move(
        Paths.get(source),
        Paths.get(destination),
        StandardCopyOption.REPLACE_EXISTING
    )
    // could return `path`
}


def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboad.csv"

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("append") // "overwrite" "append"
    .save(fullpath)
 
  val suffixList = List("part-00000")
  val existingfilename = getListOfFiles(new File(fullpath), suffixList )
  moveRenameFile(fullpath+existingfilename.head,fullpath+targetfilename)
}

  • 尝试3:
def sink(df: DataFrame, context: SparkExecutionPluginContext) : Unit = {

  val fullpath = "gs://some_bucket/output/internal_dashboard"
  val targetfilename = "internal_dashboad.csv"
  val pathandfile = fullpath + "/" + targefilename

  df.coalesce(1)
    .write.format("csv")
    .option("header", "true")
    .mode("append") // "overwrite" "append"
    .save(pathandfile )

dbutils.fs.ls(fullpath).filter(file=>file.name.endsWith("csv")).foreach(f => dbutils.fs.rm(f.path,true))
dbutils.fs.mv(dbutils.fs.ls(pathandfile).filter(file=>file.name.startsWith("part-00000"))(0).path,pathandfile ")
dbutils.fs.rm(pathandfile,true)
}

我需要 Scala 或其他方式的帮助,将“part-00000”文件重命名为“internal_dashboard.csv”并覆盖旧版本。

供没有使用过Data Fusion的人参考,我可以使用的工具是:

  • 火花汇:enter image description here

  • Scala Spark 程序(可以在 Sink 之前或之后):enter image description here

Description
Executes user-provided Spark code in Scala.

Use Case
This plugin can be used when you want arbitrary Spark code.

Properties
mainClass: The fully qualified class name for the Spark application. It must either be an object that has a main method define inside, with the method signature as def main(args: Array[String]): Unit; or it is a class that extends from the CDAP co.cask.cdap.api.spark.SparkMain trait that implements the run method, with the method signature as def run(implicit sec: SparkExecutionContext): Unit 

  • PySpark 程序(可以在 Sink 之前或之后): enter image description here
Description
Executes user-provided Spark code in Python.

Use Case
This plugin can be used when you want to run arbitrary Spark code.

编辑:

(2020 年 11 月 2 日)我刚刚了解到,还有一些 Google Cloud Functions 可以用 Python(或 Java)编写,并在其所在的存储桶中发生更改时触发。如果有人知道如何制作这样的触发时可以重命名并覆盖“part-00000”文件的函数,请告诉我。如果其他方法都失败,我会尝试一下。

scala csv apache-spark file-rename google-cloud-data-fusion
2个回答
1
投票

避免重命名 AWS S3 上的对象。不存在这样的事情,它所做的只是“剪切和粘贴”=>非常昂贵的操作。

你可以尝试一下:

import org.apache.spark.sql.SaveMode
df.write.mode(SaveMode.Overwrite).parquet(outputBasePath)

如果您坚持使用“重命名”,请使用 Hadoop 库,而不是 java:

import org.apache.hadoop.fs.Path
import org.apache.hadoop.conf.Configuration

val srcPath = new Path("source/...")
val destPath = new Path("dest/...")

srcPath.getFileSystem(new Configuration()).rename(srcPath, destPath)

注意:使用 AWS S3 时,两个路径必须位于同一个 Bucket 中(它们具有不同的 FileSystem 对象,适用于使用 rename(...) 时)。


0
投票

无需使用 Spark 即可回答 使用 GCS 移动

源码路径 fusion_output/folder3/2024-09-26-17-20/part-0000

目标路径启用宏 fusion_output/folder3/2024-09-26-17-20/Store_${逻辑开始时间(yyyy-MM-dd'T'HH-mm-ss,1d-4h+30m)}.csv

© www.soinside.com 2019 - 2024. All rights reserved.