如何重写 Spark 地图功能中的设置和清理方法

问题描述 投票:0回答:2

假设有以下MapReduce作业

映射器

setup() 初始化一些状态

map()向状态添加数据,无输出

cleanup() 将状态输出到上下文

减速机

将所有状态聚合为一个输出

如何在 Spark 中实现这样的工作?

补充问题:这样的工作在烫洗中如何实现? 我正在寻找以某种方式使方法重载的示例......

scala apache-spark scalding
2个回答
3
投票

Spark

map
不提供 Hadoop
setup
cleanup
的等效功能。它假设每个调用都是独立的并且没有副作用。

您可以获得的最接近的等效方法是将所需的逻辑放入

mapPartitions
mapPartitionsWithIndex
中,并使用简化的模板:

rdd.mapPartitions { iter => {
   ... // initalize state
   val result = ??? // compute result for iter
   ... // perform cleanup
   ... // return results as an Iterator[U]
}}

1
投票

在 scala 中设置的标准方法是使用惰性 val:

lazy val someSetupState = { .... }
data.map { x =>
  useState(someSetupState, x)
  ...

只要

someSetupState
可以在任务上实例化(即它不使用提交节点的某些本地磁盘),上述方法就可以工作。这不涉及清理问题。对于清理,烫有一个方法:

    TypedPipe[T]#onComplete(fn: () => Unit): TypedPipe[T]

最后在每个任务上运行。与映射示例类似,您可以执行关闭操作:

    data.map { x =>
      useState(someSetupState, x)
    }
    .onComplete { () =>
      someSetupState.shutdown()
    }

我不知道 Spark 的对应词。

© www.soinside.com 2019 - 2024. All rights reserved.