我正在使用 Scala 2.12 运行 Spark 3.5.0,并且有一个关于 mapPartition 的问题。我正在 mapPartitions 内创建一个可变列表,并在迭代器内插入项目。当我尝试在迭代分区中的所有项目后获取项目时,列表为空。有谁知道这是否符合预期?这是代码的人为示例:
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
val encoder = ExpressionEncoder(df.schema)
df.mapPartitions(it => {
val list = mutable.ListBuffer[Row]()
val result = it.map(row => {
list += row
row
})
println(s"list.size: ${list.size}")
result
})(encoder).show()
找到这篇文章,看起来问题是 Scala 对迭代器的惰性求值。
Apache Spark mapPartition 奇怪的行为(懒惰评估?)
如果我更改代码以从迭代器获取列表,我会得到预期的结果。
import org.apache.spark.sql.catalyst.encoders.ExpressionEncoder
val encoder = ExpressionEncoder(df.schema)
df.mapPartitions(it => {
val list = mutable.ListBuffer[Row]()
val rowList = it.toList
val result = rowList.map(row => {
list += row
row
})
println(s"list.size: ${list.size}")
result.iterator
})(encoder).show()
这可行,但我想知道这样做是否有任何缺点?例如,会使用额外的内存来构建列表。