我有一个看起来像这样的控制器:
@RestController
public class MyController {
private final Flux<T> flux1;
private final Flux<T> flux2;
@GetMapping(produces = APPLICATION_NDJSON_VALUE)
public Flux<T> subscribe() {
var mergedFlux = Flux.merge(this.flux1, this.flux2).doOnCancel(() -> {
log.debug("canceled");
}).doOnError(e -> {
log.error("error", e);
});
return keepAlive(Duration.of(10, ChronoUnit.SECONDS);, mergedFlux);
}
private Flux<T> keepAlive(Duration interval, Flux<T> flux) {
Flux<HeartBeat> heartBeat = Flux.interval(interval)
.map(t -> new HeartBeat()).doFinally(signalType -> log.debug("HearBeat
closed"));
return Flux.merge(flux, heartBeat);
}
}
问题是,每当我关闭客户端的连接并且发送下一个心跳时,我都会收到以下异常:
Exception in thread "AsyncExecutor-1" java.lang.IllegalStateException: A non-container (application) thread attempted to use the AsyncContext after an error had occurred and the call to AsyncListener.onError() had returned. This is not allowed to avoid race conditions
我的配置如下:
@Configuration
public class Config implements WebMvcConfigurer {
@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
configurer.setTaskExecutor(poolTaskExecutor());
}
@Bean(name = "AsyncExecutor")
public AsyncTaskExecutor poolTaskExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(1);
executor.setMaxPoolSize(2);
executor.setThreadNamePrefix("AsyncExecutor-");
executor.initialize();
return executor;
}
只有当我使用
keepAlive
方法在中间发送心跳以使连接保持打开状态并且不会遇到超时时,才会发生这种情况。
我怎样才能防止这种情况发生?
我使用的是 Spring Web MVC,而不是 Spring Webflux。
我建议使用 onErrorResume 或 onErrorReturn 当在此 Flux 上观察到指定类型的错误时发出捕获的后备值。这样,如果发生错误,您可以返回回退值或空流,从而防止可能导致异常的进一步处理。
var mergedFlux = Flux.merge(this.flux1, this.flux2)
.doOnCancel(() -> log.debug("canceled"))
.doOnError(e -> log.error("error", e))
.onErrorResume(e -> Flux.empty());
还要确保保持活动机制不会在原始
Flux
完成或出错后尝试发出心跳。您可能需要使用 takeUntil 或类似的运算符来控制心跳的生命周期。
private Flux<T> keepAlive(Duration interval, Flux<T> flux) {
Flux<HeartBeat> heartBeat = Flux.interval(interval)
.map(t -> new HeartBeat())
.doFinally(signalType -> log.debug("HeartBeat closed"));
return Flux.merge(flux, heartBeat).takeUntil(flux);
}