我找到了很多关于 RxJava 的答案,但我想了解它在 Reactor 中的工作原理。
我目前的理解非常模糊,我倾向于认为map是同步的,而flatMap是异步的,但我无法真正理解它。
这是一个例子:
files.flatMap { it ->
Mono.just(Paths.get(UPLOAD_ROOT, it.filename()).toFile())
.map {destFile ->
destFile.createNewFile()
destFile
}
.flatMap(it::transferTo)
}.then()
我有文件(a
Flux<FilePart>
),我想将其复制到服务器上的某个UPLOAD_ROOT
。
这个例子取自一本书。
我可以将所有
.map
更改为 .flatMap
,反之亦然,一切仍然有效。我想知道有什么区别。
map
用于同步、非阻塞、一对一转换flatMap
用于异步(非阻塞)1 到 N 转换差异在方法签名中可见:
map
接受 Function<T, U>
并返回 Flux<U>
flatMap
接受 Function<T, Publisher<V>>
并返回 Flux<V>
这是主要提示:你可以将
Function<T, Publisher<V>>
传递给map
,但它不知道如何处理Publishers
,这将导致Flux<Publisher<V>>
,一系列惰性出版商。
另一方面,
flatMap
期望每个 Publisher<V>
都有一个 T
。它知道如何处理它:订阅它并在输出序列中传播它的元素。因此,返回类型为 Flux<V>
: flatMap
会将每个内部 Publisher<V>
展平为 all V
的输出序列。
关于1-N方面:
对于每个
<T>
输入元素,flatMap
将其映射到 Publisher<V>
。在某些情况下(例如 HTTP 请求),该发布者将仅发出一项,在这种情况下,我们非常接近异步 map
。
但这就是堕落的情况。一般情况是
Publisher
可以发出多个元素,并且 flatMap
也能工作。
举个例子,假设您有一个反应式数据库,并且您从一系列用户 ID 中进行平面映射,并发出一个返回用户组
Badge
的请求。您最终会得到所有这些用户的所有徽章中的一个 Flux<Badge>
。
map
真的是同步且非阻塞吗?
是的:它在运算符应用它的方式上是同步的(简单的方法调用,然后运算符发出结果),并且在函数本身不应该阻止调用它的运算符的意义上是非阻塞的。换句话说,它不应该引入延迟。这是因为 Flux
整体上仍然是异步的。如果它阻塞了中间序列,则会影响其余的
Flux
处理,甚至其他
Flux
。如果您的映射函数阻塞/引入延迟,但无法转换为返回
Publisher
,请考虑使用
publishOn
/
subscribeOn
来抵消单独线程上的阻塞工作。
Mono<T>
或
Flux<T>
。使用map方法会导致
Mono<Mono<T>>
而使用 flatMap 会产生
Mono<T>
。例如,当您必须使用返回 Mono 的 java API 进行网络调用来检索数据,然后进行另一个需要第一个网络调用结果的网络调用时,它非常有用。
// Signature of the HttpClient.get method
Mono<JsonObject> get(String url);
// The two urls to call
String firstUserUrl = "my-api/first-user";
String userDetailsUrl = "my-api/users/details/"; // needs the id at the end
// Example with map
Mono<Mono<JsonObject>> result = HttpClient.get(firstUserUrl).
map(user -> HttpClient.get(userDetailsUrl + user.getId()));
// This results with a Mono<Mono<...>> because HttpClient.get(...)
// returns a Mono
// Same example with flatMap
Mono<JsonObject> bestResult = HttpClient.get(firstUserUrl).
flatMap(user -> HttpClient.get(userDetailsUrl + user.getId()));
// Now the result has the type we expected
此外,它还可以精确处理错误:
public UserApi {
private HttpClient httpClient;
Mono<User> findUser(String username) {
String queryUrl = "http://my-api-address/users/" + username;
return Mono.fromCallable(() -> httpClient.get(queryUrl)).
flatMap(response -> {
if (response.statusCode == 404) return Mono.error(new NotFoundException("User " + username + " not found"));
else if (response.statusCode == 500) return Mono.error(new InternalServerErrorException());
else if (response.statusCode != 200) return Mono.error(new Exception("Unknown error calling my-api"));
return Mono.just(response.data);
});
}
}
Player
类。
@Data
@AllArgsConstructor
public class Player {
String name;
String name;
}
现在创建 Player
类的一些实例
Flux<Player> players = Flux.just(
"Zahid Khan",
"Arif Khan",
"Obaid Sheikh")
.map(fullname -> {
String[] split = fullname.split("\\s");
return new Player(split[0], split[1]);
});
StepVerifier.create(players)
.expectNext(new Player("Zahid", "Khan"))
.expectNext(new Player("Arif", "Khan"))
.expectNext(new Player("Obaid", "Sheikh"))
.verifyComplete();
了解 map() 的重要一点是映射是 同步执行,因为每个项目都是由源 Flux 发布的。 如果你想异步执行映射,你应该考虑 flatMap() 操作。FlatMap 内部如何工作。
Flux<Player> players = Flux.just(
"Zahid Khan",
"Arif Khan",
"Obaid Sheikh")
.flatMap(
fullname ->
Mono.just(fullname).map(p -> {
String[] split = p.split("\\s");
return new Player(split[0], split[1]);
}).subscribeOn(Scheduler.parallel()));
List<Player> playerList = Arrays.asList(
new Player("Zahid", "Khan"),
new Player("Arif", "Khan"),
new Player("Obaid", "Sheikh"));
StepVerifier.create(players).expectNextMatches(player ->
playerList.contains(player))
.expectNextMatches(player ->
playerList.contains(player))
.expectNextMatches(player ->
playerList.contains(player))
.expectNextMatches(player ->
playerList.contains(player))
.verifyComplete();
在Flatmap()内部,对Mono执行map()操作,将String转换为Player。此外,subscribeOn() 指示每个订阅应该在并行线程中进行。 在没有 subscribeOn() 的情况下,flatmap() 充当同步线程。
该映射用于同步、非阻塞、一对一转换 而 flatMap 用于异步(非阻塞)一对多转换。参考: