假设有以下MapReduce作业
映射器:
setup() 初始化一些状态
map()向状态添加数据,无输出
cleanup() 将状态输出到上下文
减速机:
将所有状态聚合为一个输出
如何在 Spark 中实现这样的工作?
补充问题:这样的工作在烫洗中如何实现? 我正在寻找以某种方式使方法重载的示例......
Spark
map
不提供 Hadoop setup
和 cleanup
的等效功能。它假设每个调用都是独立的并且没有副作用。
您可以获得的最接近的等效方法是将所需的逻辑放入
mapPartitions
或 mapPartitionsWithIndex
中,并使用简化的模板:
rdd.mapPartitions { iter => {
... // initalize state
val result = ??? // compute result for iter
... // perform cleanup
... // return results as an Iterator[U]
}}
在 scala 中设置的标准方法是使用惰性 val:
lazy val someSetupState = { .... }
data.map { x =>
useState(someSetupState, x)
...
只要
someSetupState
可以在任务上实例化(即它不使用提交节点的某些本地磁盘),上述方法就可以工作。这不涉及清理问题。对于清理,烫有一个方法:
TypedPipe[T]#onComplete(fn: () => Unit): TypedPipe[T]
最后在每个任务上运行。与映射示例类似,您可以执行关闭操作:
data.map { x =>
useState(someSetupState, x)
}
.onComplete { () =>
someSetupState.shutdown()
}
我不知道 Spark 的对应词。