我有一个场景来处理客户号码列表并更新数据库中的详细信息
public void updateCustomerDetailsInDB(List<String> numberList)
for(String number : numberList){
if(!isExistBy(number)) //Check the details of the customer in db by number
{
// if not exist call the customer retrieve API to get the customer details
String response = customerClient.getCustomerRetrieveApi(number); // api call
dbService.persistCustomerDetails(response) // save customer details in db
}
}
// My CustomerRetrieveApi call using webclient
webClient.post()
.uri(urlString)
.contentType(mediaType)
.bodyValue(requestObject)
.retrieve()
.bodyToMono(responseClass)
.onErrorResume(WebClientResponseException.class,
ex -> ex.getRawStatusCode() == 404 ? Mono.empty() : Mono.error(ex))
.block();
列表中将有 1000 个号码。如何使用执行器和通量并行执行它?哪种方法与代码实现更好?
我尝试使用@async方法和执行器,但它仅在单线程中运行。我需要至少 10 个线程(10 个数字)并行运行。
示例:在 for 循环中,1 个数字需要 1 秒才能完成该过程,因此 1000 个数字意味着 1000 秒,但我希望当批次为 10 时程序在 100 秒内执行。
为了让 Reactor 优化任务调度,你必须避免诉诸
block
操作。一般来说,你应该尽量避免阻塞,因为Reactor的主要兴趣之一是提供“非阻塞”管道(提供阻塞是为了与外部代码/api兼容)。
就您而言,您可以:
getCustomerRetrieveApi()
返回 Mono,而不是阻塞
public Mono<ResponseClass> getCustomerRetrieveApi() {
webClient.post()
.uri(urlString)
.contentType(mediaType)
.bodyValue(requestObject)
.retrieve()
.bodyToMono(responseClass)
.onErrorResume(WebClientResponseException.class,
ex -> ex.getRawStatusCode() == 404 ? Mono.empty() : Mono.error(ex));
}
Flux.fromIterable(numberList)
和 flatMap
使主循环具有反应性:
/**
* @return count of updated details.
*/
public Mono<Long> updateCustomerDetailsInDB(List<String> numberList) {
return Flux.fromIterable(numberList)
.flatMap(number -> {
if (isExistsBy(number)) return Mono.just(0L);
return customerClient.getCustomerRetrieveApi(number)
.map(response -> {
dbService.persistCustomerDetails(response);
return 1L;
})
.defaultIfEmpty(0L);
})
.sum();
}
现在这可能还不够,具体取决于
isExistBy
和 persistCustomDetails
的实现。但这是让 Reactor 将工作流程正确转换为非阻塞并发管道的第一个必要步骤。