如何在Spark中按键分区RDD?

问题描述 投票:9回答:2

鉴于HashPartitioner文档说:

[HashPartitioner]使用Java的Object.hashCode实现基于散列的分区。

说我想用它的DeviceData来分割kind

case class DeviceData(kind: String, time: Long, data: String)

通过覆盖RDD[DeviceData]方法并仅使用deviceData.hashCode()的哈希码来划分kind是否正确?

但鉴于HashPartitioner需要一些分区参数,我很困惑我是否需要提前知道种类数量,如果有多种类型而不是分区会发生什么?

如果我将分区数据写入磁盘,它会在读取时保持分区吗?

我的目标是致电

  deviceDataRdd.foreachPartition(d: Iterator[DeviceData] => ...)

并且在迭代器中只有DeviceData具有相同的kind值。

scala apache-spark rdd
2个回答
8
投票

怎么样只使用groupByKeykind。或另一种PairRDDFunctions方法。

你让我觉得你并不真正关心分区,只是你在一个处理流程中得到了所有特定的类型?

配对功能允许:

rdd.keyBy(_.kind).partitionBy(new HashPartitioner(PARTITIONS))
   .foreachPartition(...)

但是,您可能会更喜欢更安全的东西:

rdd.keyBy(_.kind).reduceByKey(....)

mapValues或其他一些功能,保证你整体的作品


7
投票

通过覆盖deviceData.hashCode()方法并仅使用类型的哈希码对RDD [DeviceData]进行分区是否正确?

它不会。如果您使用Java Object.hashCode文档,您将找到有关hashCode总合同的以下信息:

如果两个对象根据equals(Object)方法相等,则对两个对象中的每一个调用hashCode方法必须生成相同的整数结果。

因此,除非纯粹基于设备的kind的平等概念符合您的用例,并且我严重怀疑它,修补HashCode以获得所需的分区是一个坏主意。一般情况下你应该implement your own partitioner但这里不是必需的。

因为,除了SQL和GraphX中的特殊场景,partitionBy仅在PairRDD上有效,创建RDD[(String, DeviceData)]并使用普通HashPartitioner是有意义的

deviceDataRdd.map(dev => (dev.kind, dev)).partitionBy(new HashPartitioner(n))

请记住,在kind具有低基数或使用它进行分区的高度偏斜分布的情况下,可能不是最佳解决方案。

© www.soinside.com 2019 - 2024. All rights reserved.