反应堆中的map与flatMap

问题描述 投票:0回答:3

我找到了很多关于 RxJava 的答案,但我想了解它在 Reactor 中的工作原理。

我目前的理解非常模糊,我倾向于认为map是同步的,而flatMap是异步的,但我无法真正理解它。

这是一个例子:

files.flatMap { it ->
    Mono.just(Paths.get(UPLOAD_ROOT, it.filename()).toFile())
        .map {destFile ->
            destFile.createNewFile()
            destFile    
        }               
        .flatMap(it::transferTo)
}.then()  

我有文件(a

Flux<FilePart>
),我想将其复制到服务器上的某个
UPLOAD_ROOT

这个例子取自一本书。

我可以将所有

.map
更改为
.flatMap
,反之亦然,一切仍然有效。我想知道有什么区别。

java spring-webflux project-reactor
3个回答
84
投票
  • map
    用于同步、非阻塞、一对一转换
  • flatMap
    用于异步(非阻塞)1 到 N 转换

差异在方法签名中可见:

  • map
    接受
    Function<T, U>
    并返回
    Flux<U>
  • flatMap
    接受
    Function<T, Publisher<V>>
    并返回
    Flux<V>

这是主要提示:你可以

Function<T, Publisher<V>>
传递给
map
,但它不知道如何处理
Publishers
,这将导致
Flux<Publisher<V>>
,一系列惰性出版商。

另一方面,

flatMap
期望每个
Publisher<V>
都有一个
T
。它知道如何处理它:订阅它并在输出序列中传播它的元素。因此,返回类型为
Flux<V>
flatMap
会将每个内部
Publisher<V>
展平为 all
V
的输出序列。

关于1-N方面:

对于每个

<T>
输入元素,
flatMap
将其映射到
Publisher<V>
。在某些情况下(例如 HTTP 请求),该发布者将仅发出一项,在这种情况下,我们非常接近异步
map

但这就是堕落的情况。一般情况是

Publisher
可以发出多个元素,并且
flatMap
也能工作。

举个例子,假设您有一个反应式数据库,并且您从一系列用户 ID 中进行平面映射,并发出一个返回用户组

Badge
的请求。您最终会得到所有这些用户的所有徽章中的一个
Flux<Badge>

map
真的是同步且非阻塞吗?

是的:它在运算符应用它的方式上是同步的(简单的方法调用,然后运算符发出结果),并且在函数本身不应该阻止调用它的运算符的意义上是非阻塞的。换句话说,它不应该引入延迟。这是因为

Flux

 整体上仍然是异步的。如果它阻塞了中间序列,则会影响其余的 
Flux
 处理,甚至其他 
Flux

如果您的映射函数阻塞/引入延迟,但无法转换为返回

Publisher

,请考虑使用 
publishOn
/
subscribeOn
 来抵消单独线程上的阻塞工作。


11
投票
flatMap 方法与 map 方法类似,主要区别在于您提供给它的供应商应该返回

Mono<T>

Flux<T>

使用map方法会导致

Mono<Mono<T>>

而使用 flatMap 会产生 
Mono<T>

例如,当您必须使用返回 Mono 的 java API 进行网络调用来检索数据,然后进行另一个需要第一个网络调用结果的网络调用时,它非常有用。

// Signature of the HttpClient.get method Mono<JsonObject> get(String url); // The two urls to call String firstUserUrl = "my-api/first-user"; String userDetailsUrl = "my-api/users/details/"; // needs the id at the end // Example with map Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl). map(user -> HttpClient.get(userDetailsUrl + user.getId())); // This results with a Mono<Mono<...>> because HttpClient.get(...) // returns a Mono // Same example with flatMap Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl). flatMap(user -> HttpClient.get(userDetailsUrl + user.getId())); // Now the result has the type we expected
此外,它还可以精确处理错误:

public UserApi { private HttpClient httpClient; Mono<User> findUser(String username) { String queryUrl = "http://my-api-address/users/" + username; return Mono.fromCallable(() -> httpClient.get(queryUrl)). flatMap(response -> { if (response.statusCode == 404) return Mono.error(new NotFoundException("User " + username + " not found")); else if (response.statusCode == 500) return Mono.error(new InternalServerErrorException()); else if (response.statusCode != 200) return Mono.error(new Exception("Unknown error calling my-api")); return Mono.just(response.data); }); } }
    

8
投票
map 在 Reactor 内部如何工作。

How MAP internally works

创建一个

Player

 类。

@Data @AllArgsConstructor public class Player { String name; String name; }
现在创建 

Player

 类的一些实例

Flux<Player> players = Flux.just( "Zahid Khan", "Arif Khan", "Obaid Sheikh") .map(fullname -> { String[] split = fullname.split("\\s"); return new Player(split[0], split[1]); }); StepVerifier.create(players) .expectNext(new Player("Zahid", "Khan")) .expectNext(new Player("Arif", "Khan")) .expectNext(new Player("Obaid", "Sheikh")) .verifyComplete();

了解 map() 的重要一点是映射是 同步执行,因为每个项目都是由源 Flux 发布的。 如果你想异步执行映射,你应该考虑 flatMap() 操作。

FlatMap 内部如何工作。

How FlatMap internally works.

Flux<Player> players = Flux.just( "Zahid Khan", "Arif Khan", "Obaid Sheikh") .flatMap( fullname -> Mono.just(fullname).map(p -> { String[] split = p.split("\\s"); return new Player(split[0], split[1]); }).subscribeOn(Scheduler.parallel())); List<Player> playerList = Arrays.asList( new Player("Zahid", "Khan"), new Player("Arif", "Khan"), new Player("Obaid", "Sheikh")); StepVerifier.create(players).expectNextMatches(player -> playerList.contains(player)) .expectNextMatches(player -> playerList.contains(player)) .expectNextMatches(player -> playerList.contains(player)) .expectNextMatches(player -> playerList.contains(player)) .verifyComplete();
在Flatmap()内部,对Mono执行map()操作,将String转换为Player。此外,

subscribeOn() 指示每个订阅应该在并行线程中进行。 在没有 subscribeOn() 的情况下,flatmap() 充当同步线程。

该映射用于同步、非阻塞、一对一转换 而 flatMap 用于异步(非阻塞)一对多转换。

参考:

Spring in Action,第五版

© www.soinside.com 2019 - 2024. All rights reserved.