在任务失败时将已处理的数据缓存在 Spark 中,以便任务重新启动时不会重新处理相同的数据

问题描述 投票:0回答:1

我们有一个用例,其中我们需要缓存某些已处理的数据,以便 Spark 在任务失败时不会重新处理相同的数据。

假设我们有一千个用于特定任务的 Foo 对象,这些对象是我们从某个 REST API 调用接收到的,并且 Spark 已经处理了其中的大约 700 个对象,当任务发生失败时。然后,当任务重新启动时,应再次使用分区参数查询 REST API 并接收一千个对象。这次,它不应该重新处理已经处理过的七百个对象,而是继续处理剩余的三百个。

那么 Spark 中有什么可用的东西可以达到这个目的吗?谢谢。

apache-spark scala-spark
1个回答
0
投票

保存 API 调用结果的方式有很多,最明显的一种是缓存结果,如下所示,或者为每个查询对象在方法中添加重试策略

def func(spark: SparkSession) = {
    import spark.implicits._

    case class Foo(url: String)
    case class Result(url: String, response: String)

    val ds = Seq(
      Foo("https://www.google.com"),
      Foo("https://www.yahoo.com")
    ).toDS

    def makeAPICall(foo: Foo): Result = {
      // make API call
      Result(foo.url, "response")
    }

    val callResults = ds.map(makeAPICall).cache()
    
    callResults.show(false)
  }
© www.soinside.com 2019 - 2024. All rights reserved.