我有一个Flux,我想把它转换为List。我怎么才能做到这一点?
Flux<Object> getInstances(String serviceId); // Current one
List<Object> getInstances(String serviceId); // Demanded one
Java 8或反应式组件有一个准备好的方法来映射或转换为List?
我应该使用 .map()
final List<ServiceInstance> sis = convertedStringList.parallelStream()
.map( this.reactiveDiscoveryClient::getInstances )
// It should be converted to List<Object>
在进入其他领域之前,先给大家一个公平的警告:转换一个新的产品。Flux
到 List
Stream
成全 不温不火 严格意义上的概念,因为你离开了推域,用一个拉域来交换。你可能想要,也可能不想要(通常你不想要),这取决于使用情况。只是想留下说明。
根据 Flux文件జజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజ collectList
方法将返回一个 Mono<List<T>>
. 它将立即返回,但它不是结果列表本身,而是一个懒惰的结构,即 Mono
承诺当序列完成后,最终会有结果。
根据 单声道文件జజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజ block
方法完成后将返回Mono的内容。请记住 block
可能返回null。
结合两者,你可以使用 someFlux.collectList().block()
. 条件是: someFlux
是一个 Flux<Object>
,其结果将是 List<Object>
.
该 block
方法不会返回任何东西,如果 Flux 是无限的。举个例子,下面会返回一个有两个单词的列表。
Flux.fromArray(new String[]{"foo", "bar"}).collectList().block()
但是下面的方法永远不会返回。
Flux.interval(Duration.ofMillis(1000)).collectList().block()
为了防止无限期阻塞或阻塞时间过长,你可以传递一个... Duration
争论 block
,但当订阅没有按时完成时,会异常超时。
根据 Flux文件జజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజజ toStream
方法将一个 Flux<T>
变成 Stream<T>
. 这对运营商比较友好,如 flatMap
. 注意这个简单的例子,为了便于演示。
Stream.of("f")
.flatMap(letter ->
Flux.fromArray(new String[]{"foo", "bar"})
.filter(word -> word.startsWith(letter)).toStream())
.collect(Collectors.toList())
我们可以简单地使用 .collectList().block().stream()
但这不仅降低了可读性,还可能导致 NPE,如果 block
返回null.这种方法对于一个无限的Flux来说,也是无法完成的,但是因为这是一个未知大小的流,所以在它完成之前,你仍然可以对它进行一些操作,而不会阻塞。这种方法对于一个无限的Flux也不能完成,但因为这是一个未知大小的流,所以在它完成之前,你仍然可以对它使用一些操作,而不会阻塞。