Spring Flux Stream HeartBeat 异步异常

问题描述 投票:0回答:1

我有一个看起来像这样的控制器:

@RestController
public class MyController {

    private final Flux<T> flux1;
    private final Flux<T> flux2;

    @GetMapping(produces = APPLICATION_NDJSON_VALUE)
    public Flux<T> subscribe() {

        var mergedFlux = Flux.merge(this.flux1, this.flux2).doOnCancel(() -> {
            log.debug("canceled");
        }).doOnError(e -> {
            log.error("error", e);
        });

        return keepAlive(Duration.of(10, ChronoUnit.SECONDS);, mergedFlux);
    }

    private Flux<T> keepAlive(Duration interval, Flux<T> flux) {
        Flux<HeartBeat> heartBeat = Flux.interval(interval)
                .map(t -> new HeartBeat()).doFinally(signalType -> log.debug("HearBeat 
closed"));
        return Flux.merge(flux, heartBeat);
    }
}

问题是,每当我关闭客户端的连接并且发送下一个心跳时,我都会收到以下异常:

Exception in thread "AsyncExecutor-1" java.lang.IllegalStateException: A non-container (application) thread attempted to use the AsyncContext after an error had occurred and the call to AsyncListener.onError() had returned. This is not allowed to avoid race conditions

我的配置如下:

@Configuration
public class Config implements WebMvcConfigurer {

@Override
public void configureAsyncSupport(AsyncSupportConfigurer configurer) {
    configurer.setTaskExecutor(poolTaskExecutor());
}

@Bean(name = "AsyncExecutor")
public AsyncTaskExecutor poolTaskExecutor() {
    ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
    executor.setCorePoolSize(1);
    executor.setMaxPoolSize(2);
    executor.setThreadNamePrefix("AsyncExecutor-");
    executor.initialize();
    return executor;
}

只有当我使用

keepAlive
方法在中间发送心跳以使连接保持打开状态并且不会遇到超时时,才会发生这种情况。

我怎样才能防止这种情况发生?

我使用的是 Spring Web MVC,而不是 Spring Webflux。

java spring spring-boot spring-mvc spring-webflux
1个回答
0
投票

我建议使用 onErrorResumeonErrorReturn 当在此 Flux 上观察到指定类型的错误时发出捕获的后备值。这样,如果发生错误,您可以返回回退值或空流,从而防止可能导致异常的进一步处理。

var mergedFlux = Flux.merge(this.flux1, this.flux2)
     .doOnCancel(() -> log.debug("canceled"))
     .doOnError(e -> log.error("error", e))
     .onErrorResume(e -> Flux.empty());

还要确保保持活动机制不会在原始

Flux
完成或出错后尝试发出心跳。您可能需要使用 takeUntil 或类似的运算符来控制心跳的生命周期。

private Flux<T> keepAlive(Duration interval, Flux<T> flux) {
    Flux<HeartBeat> heartBeat = Flux.interval(interval)
            .map(t -> new HeartBeat())
            .doFinally(signalType -> log.debug("HeartBeat closed"));
    return Flux.merge(flux, heartBeat).takeUntil(flux);
}
© www.soinside.com 2019 - 2024. All rights reserved.