如何在RocksDB上创建数据框(SST文件)

问题描述 投票:0回答:1

我们将文档保存在 RocksDB 中。我们将把这些 RocksDB sst 文件同步到 S3。我想在 SST 文件上创建一个数据框,然后运行 SQL 查询。当我用谷歌搜索时,我找不到任何相关的连接器。

做到这一点的最佳方法是什么?我们可以为此编写一个自定义数据框实现吗?我们使用 Spark 3.1.0 和 Scala 2.12

(将 RocksDB 转换为 JSON 和 Parquet,然后从 Parquet 中读取数据帧非常耗时且资源密集——120 1 核节点分钟。所以我不能使用这种方法。)

dataframe scala apache-spark rocksdb
1个回答
0
投票

如果你知道如何使用纯 scala 代码解析文件,你只需要 Spark 将其分发到执行器上:

  1. 列出 s3 存储桶中的文件名,这将导致

    Seq[String]

  2. 将它们转换为数据集/数据框(

    spark.createDataset
    )

  3. 进行

    mapPartitions
    操作

  4. mapPartitions
    内部,初始化s3Client,使用它读取文件的文件内容。使用纯 scala 代码进行解析,输出为案例类

  5. mapPratitions
    的输出将是解析文件的数据集

更新:

这是代码示例

我不知道什么是 rock db 文件以及如何通过纯 scala 代码读取它们

假设你可以用普通的 scala 来阅读它们,而不需要 Spark

下一个代码示例可用于使用 Spark 并行读取/解析

  val session: SparkSession = SparkSession.builder().config(new SparkConf().setMaster("local")).getOrCreate()
  
  import session.implicits._

  def listS3Files(bucket: String, prefix: String): Seq[String] = {
    val s3Client = AmazonS3ClientBuilder.standard.build // add credentials, region if needed
    val listObjectsRequest = (new ListObjectsV2Request)
      .withBucketName(bucket)
      .withPrefix(prefix)
    s3Client.listObjectsV2(listObjectsRequest).getObjectSummaries.asScala.map(_.getKey).filter(p => !p.endsWith("/"))
  }
  
  val bucket = "your-bucket"
  val files = listS3Files(bucket, "test")

  val result = files
    .toDS()
    .mapPartitions(fs => {
      val s3Client = AmazonS3ClientBuilder.standard.build // add credentials, region if needed
      // here got content as string, in your case you need to read rock db files and parse them to case class
      val content = fs.map(f => IOUtils.toString(s3Client.getObject(bucket, f).getObjectContent, StandardCharsets.UTF_8))
      content
    }).cache()
© www.soinside.com 2019 - 2024. All rights reserved.