我正在使用 RxJava 2 通过 micronaut 框架进行反应式编程。我试图理解以下代码的问题:
fun getItemDetails(
itemRequest: RequestTo
): Mono<List<ResponseTo>> {
return itemRequest.itemIdList.toFlux()
.flatMap { findItemStatus(it) }
.flatMap { response ->
Mono.justOrEmpty(ResponseTo(response))
.onErrorResume {
logger.error("Error occurred while mapping entity to response: $response", it)
Mono.empty()
}
}
.collectList()
}
RequestTo:由一个String列表组成。 这个想法是迭代这个列表,从数据库中获取每个项目的详细信息(通过返回数据库实体的
findItemStatus()
函数调用处理)并将其映射到响应对象。
我想这样工作,如果
findItemStatus() flatMap
操作返回的任何实体在将其映射到 ResponseTo
时导致异常,我希望在最终列表中跳过该项目但继续处理通量中的剩余项目.我不想将 onErrorResume
块保留在第二个 flatMap 之外,因为如果通量中的任何中间实体在映射时导致错误并且通量中的其余项目将不会被映射,这将关闭流。
我不确定我的代码有什么问题,但是当
Mono.justOrEmpty(ResponseTo(response))
中抛出异常时,它不会在 onErrorResume 中被捕获。
我把测试用例放在这里:
def "test getItemDetails() when there is an error mapping DB response to Response TO for any item"() {
def itemEntity1 = new DiscontinuedItemsEntity("52782904", null, null, UUID.randomUUID())
def itemEntity2 = new DiscontinuedItemsEntity("52778904", null, null, UUID.randomUUID())
def requestBody = new DiscontinuedItemsTcinRequest(["52782904", "52782904"])
when:
def response = itemsStatusService.getItemDetails(requestBody).block()
def responseList = response.itemsList
then:
1 * discontinuedItemsRepository.findDiscontinuedItems(*_) >> Mono.just(itemEntity1)
1 * discontinuedItemsRepository.findDiscontinuedItems(*_) >> Mono.just(itemEntity2)
responseList.size() == 1
responseList.get(0).itemId == itemEntity1.itemId
}
我需要这个测试用例通过,但是异常被抛出并且没有被处理,因为它没有被 onErrorResume 捕获