list 1 => [{ id: "123, x: "xxx" }] | list 2 => [{ id: "123", y: "yyy" }, {id: "456", y: "yyy"}]
因此,我想将这两个列表组合到新列表中,使用组方法组。这就是我尝试的。
// both list are not sorted and every id in the list_1 exists in list_2
// list_1 data class Object1(val id: String, val x: String)
val flux1 = Flux.fromIterable(list_1)
.map { obj -> obj.id to obj }
// list_2 data class Object2(val id: String, val y: String)
val flux2 = Flux.fromIteablle(list_2)
.map { obj -> obj.id to obj }
Flux.merge(flux1, flux2)
.groupBy { (id, obj) -> id }
.flatMap { gFlux ->
gFlux
.map { (id, obj) -> obj }
.collectList()
.filter { it.size == 2 }
.map { (obj1, obj2) -> Object3(obj1, obj2) }
}
.collectList()
但是有大尺寸的清单2,它开始永远冻结,我不知道为什么。我暂时通过过滤器通量修复了它,然后再对其进行分组,因为我必须基于list_1分组这些磁通。
Flux.merge(flux1, flux2)
.filter { (id, obj) -> id in list_1.map { it.id } }
因此,我想知道为什么要冻结,解决此问题的适当解决方案是什么,或者有任何更好的解决方案以基于第一个列表对两个列表进行分组?
?groupBy
):
当标准产生大量组时,如果这些组不适当地下游消耗,则可能会导致悬挂(例如,由于具有MaxCormurency参数的flatmap,设置了太低)。 因此,您不应使用不同的方式来汇总结果。在您的情况下,可能是:不过,这不是很有效。因为您将所有数据保存到最后一步,这与
groupBy
Flux.merge(flux1, flux2)
.reduce(mutableMapOf(), { (map, (id, obj)) ->
if (!map.containsKey(id) {
map[id] = mutableListOf()
}
map[id].add(obj)
map
}
.flatMap { Flux.fromIterable(it.entries()) }
.filter { it.value.size == 2 }
.map { it.value }
.map { (obj1, obj2) -> Object3(obj1, obj2) }
.collectList()
的问题相同。但是在这种情况下,您不会消耗所有调度程序,因此它可以更好地工作。如果您可以在还原时放置一些组,例如具有更多2个元素的组,那将是最佳的。
对我来说工作的是批量组合()的输出。您可以尝试groupBy
这将减轻flatmap()的压力