如何使用 SSE 发射器跟踪异步线程并处理损坏的管道错误?

问题描述 投票:0回答:1

我希望在异步线程将结果分派到 Tomcat 之前捕获由损坏的管道引起的

IOException
。本质上,在我发现问题之前,客户端会断开连接,并且错误会传至 Tomcat。我无法控制客户。

我的控制器中有一个

Executors.newCachedThreadPool()
,我的端点如下所示:

    @GetMapping("/mySSEStream")
    public SseEmitter sseEmitter() {
        SseEmitter emitter = new SseEmitter(-1L);
        MyRunner streamingRunner = new MyRunner(emitter);
        cachedThreadPool.execute(streamingRunner);
        return emitter;
    }

我的跑步者:

public class MyRunner implements Runnable {
    private final SseEmitter sseEmitter;

    public StreamingRunner(SseEmitter sseEmitter) {
        this.sseEmitter = sseEmitter;
    }

    @Override
    public void run() {
        try {
            sendData();
        } catch (IOException ioException) {
            sseEmitter.completeWithError(ioException);
        } finally {
            try {
                sseEmitter.complete();
            } catch (IllegalStateException illegalStateException) {
                log.debug("SSE Emitter already closed...");
            }
        }
    }

因此控制器线程创建 SSE 发射器,第二个线程(名为

http-nio-9998-exec-1
)接受请求,返回 200,并开始发射数据。然后我收到一条调试日志,指出 Dispatcher Servlet 是“正在退出,但响应仍保持打开状态以供进一步处理”。发生客户端断开连接。然后第三个线程(名为
http-nio-9998-exec-2
)开始处理破损管道错误。我收到一条消息
WebAsyncManager - Async error, dispatch to /mySSEStream
。然而,无论我如何尝试捕获错误,它都会向上冒泡到 Tomcat 并以 500 错误响应。我在调试器中看到了这一点。显然 500 不会到达客户端,但我们的端点指标充斥着 500 错误。

我在任何地方都找不到这个错误。我尝试过使用

AsyncHandlerInterceptor
,但没有成功。建议使用
WebAsyncManager
来注册延迟结果,但我无法正常工作。我尝试过 @ControllerAdvice 和 @ExceptionHandler,但该线程直接与 Tomcat 容器对话。我也尝试过 Filter,但这不适用于出站消息,它只能过滤来自 Tomcat 的传入消息。

我确信我在这里遗漏了一些东西,非常感谢任何帮助。

谢谢。

java multithreading spring-boot asynchronous tomcat
1个回答
0
投票

可能不是您正在寻找的答案,但经过许多毫无结果的时间后,这是我想出的解决方案。我扩展了 logback 配置以不记录这些错误。我在记录错误本身的

ApplicationDispatcher
中找到了记录器名称。

public class SseBrokenPipeLogFilter extends Filter<ILoggingEvent> {

@Override
public FilterReply decide(ILoggingEvent iLoggingEvent) {
    var throwable = iLoggingEvent.getThrowableProxy();
    if (throwable != null
            && throwable.getClassName().equals("java.io.IOException")
            && throwable.getMessage().equals("Broken pipe")
            && iLoggingEvent.getLoggerName().contains("org.apache.catalina.core.ContainerBase")) {
        return FilterReply.DENY;
    }
    return FilterReply.NEUTRAL;
}

}

这是

logback-spring.xml
文件,它只是带有额外过滤器的 Spring Boot 默认值。

<?xml version="1.0" encoding="UTF-8"?>
<configuration>
    <include resource="org/springframework/boot/logging/logback/defaults.xml"/>
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>${CONSOLE_LOG_THRESHOLD}</level>
        </filter>
        <filter class="cz.okd.okdbe.configuration.SseBrokenPipeLogFilter" />
        <encoder>
            <pattern>${CONSOLE_LOG_PATTERN}</pattern>
            <charset>${CONSOLE_LOG_CHARSET}</charset>
        </encoder>
    </appender>

    <root level="INFO">
        <appender-ref ref="CONSOLE" />
    </root>
</configuration>
© www.soinside.com 2019 - 2024. All rights reserved.