异步流中出站网关的错误处理

问题描述 投票:2回答:2

我有这样的集成流程:

@Bean
public IntegrationFlow inboundRequestFlow()
{
    return IntegrationFlows.from( inboundRequestGateway() )
        .log( ... )
        .filter(  requestValidator,
            spec -> spec.discardChannel( invalidRequestChannel() ) )
        .bridge( spec -> spec.requiresReply( false ) )
        .channel( asyncFlowChannel )
        .wireTap(
            //sends a NO_CONTENT reply if the request is ok
            flow -> flow.enrichHeaders( spec -> spec.header( HttpHeaders.STATUS_CODE, HttpStatus.NO_CONTENT ).defaultOverwrite( true ) )
                .transform( payload -> "" )
                .channel( inboundGatewayReplyChannel() )
        ).get();

}

它在http网关上接收请求,验证它,如果一切正常,则将请求发送到'asyncFlowChannel'并用204回复入站网关。

'asyncFlowChannel'是在Executor通道上运行的另一个IntegrationFlow的起点:

@Bean
public IntegrationFlow outboundFlow()
{
    return IntegrationFlows.from( asyncFlowChannel)
        .log( ... )
        .transform( ... )
        .transform( ... )
        .split(... )            
        .resequence( ... )
        .enrichHeaders( ... )
        .log( ... )
        .transform( ... )
        .handle( this.outboundSOAPGateway() )
        .log( .. )
        .handle( ... )
        .bridge( spec -> spec.requiresReply( false ) )
        .channel( anotherAsyncFlowChannel )
        .get();
}

如果我的outboundGateway发生异常(由于与网络相关的IO错误或错误响应),我想记录错误并采取适当的措施。但我无法在outboundSOAPGateway上设置错误通道,并且启动流程上的inboundRequestGateway已经收到了它的回复。

我得到的唯一错误就是这个日志:

10:19:53.002 WARN [outbound-flow-0] org.springframework.messaging.core.GenericMessagingTemplate $ TemporaryReplyChannel - 收到回复消息但接收线程已收到回复:ErrorMessage [payload = ...,headers = .. ]

我的问题是:在异步流中处理出站网关错误的正确方法是什么,其中启动流的inboundGateway已收到它的回复?

java spring-integration
2个回答
3
投票

任何MessageHandler终点都可以与AbstractRequestHandlerAdvice一起提供。其中一个是ExpressionEvaluatingRequestHandlerAdvice,在那里你可以捕获异常并将其发送到failureChannelhttps://docs.spring.io/spring-integration/docs/5.0.0.RELEASE/reference/html/messaging-endpoints-chapter.html#expression-advice

为此目的,可以为.handle( this.outboundSOAPGateway() )提供第二个参数,例如:

.handle((GenericHandler<?>) (p, h) -> {
                    throw new RuntimeException("intentional");
                }, e -> e.advice(retryAdvice()))

在这种情况下,我使用

@Bean
public RequestHandlerRetryAdvice retryAdvice() {
    RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
    requestHandlerRetryAdvice.setRecoveryCallback(new ErrorMessageSendingRecoverer(recoveryChannel()));
    return requestHandlerRetryAdvice;
}

但同样适用于ExpressionEvaluatingRequestHandlerAdvice

顺便说一句,retryAdvice()也可能为你做这个伎俩。看看它的ErrorMessageSendingRecoverer


1
投票

而不是.channel( asyncFlowChannel ),使用

.gateway(asyncFlowChannel, e -> e.errorChannel(...))
© www.soinside.com 2019 - 2024. All rights reserved.