我有一个反应式服务并且运行良好
public Mono<GetCasesListResponse> getCasesList(@RequestBody GetCasesListRequest request) {
return casesService.getCasesList(mapToGetCasesListServiceRequest(request))
.map(GetCasesListMapper::mapGetCasesListResponse)
.onErrorResume(exceptionHandler::fallbackMethod);
}
案例服务
public Mono<GetCasesListServiceResponse> getCasesList(GetCasesListServiceRequest request) {
return webClient.post()
.uri("/getCasesList")
.bodyValue(request)
.retrieve()
.bodyToMono(GetCasesListResponse.class);
}
但现在我有下一个任务。对于地图中的每个元素,并行从另一个服务以及列表中获取数据。并将此数据添加到 GetCasesListResponse 中数组的每个元素。 我决定使用 Flux.fromIterable
首先添加flatMap
public Mono<GetCasesListResponse> getCasesList(@RequestBody GetCasesListRequest request) {
return casesService.getCasesList(mapToGetCasesListServiceRequest(request))
.map(GetCasesListMapper::mapGetCasesListResponse)
.flatMap(this::getMoreComments)
.onErrorResume(exceptionHandler::fallbackMethod);
}
private Mono<GetCasesListResponse> getMoreComments(GetCasesListResponse casesListResponse) {
collectExtIds(casesListResponse).forEach((id, extIdList) -> fromIterable(extIdList).parallel()
.flatMap(extId -> stpService.getMoreComments(mapToGetMoreCommentsRequest(extId)))
.subscribe(moreCommentsResponse -> addMoreCommentsToGetCasesListResponse(casesListResponse, id, moreCommentsResponse)));
return Mono.just(casesListResponse);
}
获取更多评论
public Mono<GetCommentResponse> getMoreComments(GetCommentRequest request) {
return webClient.post()
.uri("/getCaseComments")
.bodyValue(request)
.retrieve()
.bodyToMono(GetCommentResponse.class)
}
但是我对 getMoreComments 有问题,因为此方法在 fromIterable 完成之前返回 Mono.just(casesListResponse) 。我得到了没有任何附加数据的caseListResponse。我如何等待从 fromIterable 接收每个元素的数据?
有几个问题:
subscribe
,这是你应该避免使用的重击规则,除非你知道为什么你到底需要它并且不能依赖其他方式then(result)
reduce
),并仅在最后一步将其设置为结果。但是,如果您确定您的对象是线程安全的,您可以在代码中执行类似的操作(正如我在示例中所示的那样)
parallel
在你的情况下没有任何作用。它仅对高 CPU
map
有用,并且需要
runOn
配置。标准
flatMap
已经平行。
Mono<GetCasesListResponse> getMoreComments(
GetCasesListResponse casesListResponse
) {
Flux.fromIterable(collectExtIds(casesListResponse))
.flatMap( (id, extIdList) ->
Flux.fromIterable(extIdList)
.flatMap(extId ->
stpService.getMoreComments(
mapToGetMoreCommentsRequest(extId)
)
)
.reduce(casesListResponse, (accum, moreComments) ->
addMoreCommentsToGetCasesListResponse(
accum, id, moreComments)
)
)
)
.last()
}
我在这里使用 last
只是因为所有
flatMap
返回
casesListResponse
对象的相同实例。但如果你可以做一个
then(casesListResponse)
来代替。另外我建议以一种可以接受所有评论列表的方式重写
GetCasesListResponse
,这样reduce就不会操作共享实例