反应器Netty,使用Flux :: Groupby永远冻结

问题描述 投票:0回答:2
我有两个列表具有对象的差异类型,但具有相同的属性ID。例如

list 1 => [{ id: "123, x: "xxx" }] | list 2 => [{ id: "123", y: "yyy" }, {id: "456", y: "yyy"}]

因此,我想将这两个列表组合到新列表中,使用组方法组。这就是我尝试的。
// both list are not sorted and every id in the list_1 exists in list_2 // list_1 data class Object1(val id: String, val x: String) val flux1 = Flux.fromIterable(list_1) .map { obj -> obj.id to obj } // list_2 data class Object2(val id: String, val y: String) val flux2 = Flux.fromIteablle(list_2) .map { obj -> obj.id to obj } Flux.merge(flux1, flux2) .groupBy { (id, obj) -> id } .flatMap { gFlux -> gFlux .map { (id, obj) -> obj } .collectList() .filter { it.size == 2 } .map { (obj1, obj2) -> Object3(obj1, obj2) } } .collectList()

但是有大尺寸的清单2,它开始永远冻结,我不知道为什么。我暂时通过过滤器通量修复了它,然后再对其进行分组,因为我必须基于list_1分组这些磁通。
Flux.merge(flux1, flux2)
    .filter { (id, obj) -> id in list_1.map { it.id } }

因此,我想知道为什么要冻结,解决此问题的适当解决方案是什么,或者有任何更好的解决方案以基于第一个列表对两个列表进行分组?

	

groupBy
kotlin spring-webflux reactive-programming project-reactor reactor-netty
2个回答
1
投票
javadoc

):


当标准产生大量组时,如果这些组不适当地下游消耗,则可能会导致悬挂(例如,由于具有MaxCormurency参数的flatmap,设置了太低)。

因此,您不应使用不同的方式来汇总结果。在您的情况下,可能是:

groupBy

不过,这不是很有效。因为您将所有数据保存到最后一步,这与
Flux.merge(flux1, flux2) .reduce(mutableMapOf(), { (map, (id, obj)) -> if (!map.containsKey(id) { map[id] = mutableListOf() } map[id].add(obj) map } .flatMap { Flux.fromIterable(it.entries()) } .filter { it.value.size == 2 } .map { it.value } .map { (obj1, obj2) -> Object3(obj1, obj2) } .collectList()

的问题相同。但是在这种情况下,您不会消耗所有调度程序,因此它可以更好地工作。如果您可以在还原时放置一些组,例如具有更多2个元素的组,那将是最佳的。


对我来说工作的是批量组合()的输出。您可以尝试
groupBy

这将减轻flatmap()
的压力


0
投票
最新问题
© www.soinside.com 2019 - 2025. All rights reserved.