我们有一个用例,其中我们需要缓存某些已处理的数据,以便 Spark 在任务失败时不会重新处理相同的数据。
假设我们有一千个用于特定任务的 Foo 对象,这些对象是我们从某个 REST API 调用接收到的,并且 Spark 已经处理了其中的大约 700 个对象,当任务发生失败时。然后,当任务重新启动时,应再次使用分区参数查询 REST API 并接收一千个对象。这次,它不应该重新处理已经处理过的七百个对象,而是继续处理剩余的三百个。
那么 Spark 中有什么可用的东西可以达到这个目的吗?谢谢。
保存 API 调用结果的方式有很多,最明显的一种是缓存结果,如下所示,或者为每个查询对象在方法中添加重试策略
def func(spark: SparkSession) = {
import spark.implicits._
case class Foo(url: String)
case class Result(url: String, response: String)
val ds = Seq(
Foo("https://www.google.com"),
Foo("https://www.yahoo.com")
).toDS
def makeAPICall(foo: Foo): Result = {
// make API call
Result(foo.url, "response")
}
val callResults = ds.map(makeAPICall).cache()
callResults.show(false)
}