如何在 Spring Boot Webflux 中进行顺序 API 调用和合并结果

问题描述 投票:0回答:0

我已经有一个反应流可以执行以下操作:

1.通过调用另一个 REST API 检索包裹 ID 列表

2.通过调用另一个 REST API 检索每个包裹 ID 的跟踪事件列表

3.返回结合上述两个响应的单声道列表

我的新要求是集成另一个 Web 请求,该请求必须使用来自两个 API 调用的上述组合响应。 因此,新调用将不得不使用现有数据来构建请求主体。 我的问题是是否可以在同一个反应流中做到这一点?或者我是否必须阻止当前的实现并获得响应然后执行新的网络调用?

当前实施:

public Mono<List<ParcelInfo>> parcelEventsForReference(String reference, Postcode postcode) {
    return connector.resolveForExternalReference(reference)
        .flatMapIterable(ResolvedParcelIds::getParcelIds)
        .flatMap(id -> parcelEventsFor(id)
            .map(parcel -> redactIfNecessary(parcel, id, postcode)))
        .collectList()
        .doOnError(throwable -> log.error(EXTERNAL_SERVICE_ERROR, throwable.getMessage()));
}

private Mono<ParcelInfo> parcelEventsFor(ResolvedParcelId id) {
    return gateway.getParcelBy(uid).zipWith(gateway.getTrackEventsBy(uid)) //
            .map(tuple -> combineParcelEvents(tuple.getT1(), tuple.getT2()));
}

private ParcelInfo redactIfNecessary(ParcelInfo parcel, ResolvedParcelId id, Postcode postcode) {
    boolean fullResponse = ResolvedParcelId.isReturn(parcel.getBarcode()) || uid.isReturn() || parcel.hasPostcode(postcode);
    return fullResponse ? parcel : redact(parcel);
}

我正在尝试与上面集成的新网络请求:

  public Mono<AvailableDiversionsResponse> getAvailableDiversionsForParcel(final int parcelId, 
                            final TrackingEventsRequest trackingEventsRequest) {
    return webclient
        .method(HttpMethod.GET)
        .uri("available/{parcelId}", parcelId)
        .body(Mono.just(trackingEventsRequest), TrackingEventsRequest.class)
        .accept(MediaType.APPLICATION_JSON)
        .retrieve()
        .onStatus(HttpStatus.NOT_FOUND::equals, r -> Mono.empty())
        .onStatus(HttpStatus::isError, response -> Mono.error(WebClientResponseException.create(
                      response.statusCode().value(),
                      response.statusCode().getReasonPhrase(),
                      response.headers().asHttpHeaders(),
                      null,
                      null
                  ))
        )
        .bodyToMono(AvailableDiversionsResponse.class)
        .doOnError(throwable -> log.error(EXTERNAL_SERVICE_ERROR, throwable.getMessage()));
}

TrackingEventsRequest
应由
parcelEventsForReference
的当前响应填充,这是第一个 API 调用。然后应该合并两个呼叫的响应,使
availableDiversions
设置为
Mono<ParcelInfo>
.

包裹信息类:

@Builder
@Data
public class ParcelInfo {

 private String barcode;
 private List<String> availableDiversions;

}

AvailableDiversionsResponse 类:

@Builder
@Data
public class AvailableDiversionsResponse {

 private List<String> availableDiversions;

}
java spring-webflux reactive-programming project-reactor
© www.soinside.com 2019 - 2024. All rights reserved.