处理异常时关闭 grps 连接

问题描述 投票:0回答:1

我正在编写一个拦截器来验证 grpc 服务器中的消息,但我遇到了问题。

一般方案是:拦截器在应用程序级别激活并验证传入的原始消息。如果未通过验证,我需要将异常进一步转发到应用程序中,以便异常处理程序正确地将响应构建为 StatusRuntime 异常并将其返回给客户端。这是一些示例代码,它可以工作:

@Slf4j
@GrpcGlobalServerInterceptor
public class ValidationInterceptor implements ServerInterceptor {
  private static final String UNKNOWN_CALL_NAME = "Unknown call";
  private final GrpcExceptionResponseHandler exceptionHandler;
  private final Validator validator;

  public ValidationInterceptor(GrpcExceptionResponseHandler exceptionHandler,
                               Validator validator) {
    this.exceptionHandler = exceptionHandler;
    this.validator = validator;
  }

  @Override
  public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
                                                    Metadata headers,
                                                    ServerCallHandler<ReqT, RespT> next) {
    final Listener<ReqT> delegateListener = next.startCall(call, headers);
    return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(delegateListener) {
      @Override
      public void onMessage(ReqT message) {
        try {
          final ValidationResult validationResult = validator.validate((Message) message);

          if (!validationResult.isSuccess()) {
            exceptionHandler.handleError(call,
                new ValidationRuntimeException(
                    getMethodName(call), validationResult.getViolations())
            );
            return;
          }

          super.onMessage(message);
        } catch (Exception e) {
          log.warn("Unexpected exception caught when validating method {}", getMethodName(call), e);
          exceptionHandler.handleError(call, e);
        }
      }
    };
  }

  private <ReqT, RespT> String getMethodName(ServerCall<ReqT, RespT> call) {
    return Optional.ofNullable(call.getMethodDescriptor())
        .map(MethodDescriptor::getFullMethodName)
        .orElse(UNKNOWN_CALL_NAME);
  }
}

处理程序

@GrpcAdvice
@Slf4j
public class GrpcErrorHandler {
  @GrpcExceptionHandler
  public StatusRuntimeException handleValidationRuntimeException(
      ValidationRuntimeException e) {
    String message = Optional.ofNullable(e.getMessage()).orElse(DEFAULT_MESSAGE);
    log.warn("Error {}", message, e);
    errorCounterMetrics.incrementError(e);
    return toStatusRuntimeException(e.getViolations());
  }
}

除了一个细节之外,一切都很好。在

handleValidationRuntimeException()
方法处理并返回异常后,由于某种原因,在已经关闭的通道上再次调用
call()
,这会导致
IllegalStateException

堆栈跟踪

java.lang.IllegalStateException: call already closed
    at com.google.common.base.Preconditions.checkState(Preconditions.java:502)
    at io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:216)
    at io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:209)
    at io.grpc.PartialForwardingServerCall.close(PartialForwardingServerCall.java:48)
    at io.grpc.ForwardingServerCall.close(ForwardingServerCall.java:22)
    at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.close(ForwardingServerCall.java:39)
    at net.devh.boot.grpc.server.metric.MetricCollectingServerCall.close(MetricCollectingServerCall.java:60)
    at net.devh.boot.grpc.server.advice.GrpcAdviceExceptionHandler.handleThrownExceptionByImplementation(GrpcAdviceExceptionHandler.java:112)
    at net.devh.boot.grpc.server.advice.GrpcAdviceExceptionHandler.handleError(GrpcAdviceExceptionHandler.java:82)
    at net.devh.boot.grpc.server.error.GrpcExceptionListener.onHalfClose(GrpcExceptionListener.java:72)
    at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at net.devh.boot.grpc.server.security.interceptors.AbstractAuthenticatingServerCallListener.onHalfClose(AbstractAuthenticatingServerCallListener.java:103)
    at net.devh.boot.grpc.server.security.interceptors.DefaultAuthenticatingServerInterceptor$AuthenticatingServerCallListener.onHalfClose(DefaultAuthenticatingServerInterceptor.java:243)
    at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at net.devh.boot.grpc.server.security.interceptors.ExceptionTranslatingServerInterceptor$ExceptionTranslatorServerCallListener.onHalfClose(ExceptionTranslatingServerInterceptor.java:124)
    at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
    at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
    at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
    at io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
    at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
    at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
    at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
    at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
    at java.lang.Thread.run(Thread.java:833)

其实,对此能做什么呢?谢谢!

java spring-boot grpc illegalstateexception grpc-java
1个回答
0
投票

如果您查看堆栈跟踪的前几行,您会发现 ServerCall 也由 MetricCollectingServerCall.close 显式关闭 这是从 GrpcAdviceExceptionHandler.handleThrownExceptionByImplementation 调用的

at com.google.common.base.Preconditions.checkState(Preconditions.java:502)
at io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:216)
at io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:209)
at io.grpc.PartialForwardingServerCall.close(PartialForwardingServerCall.java:48)
at io.grpc.ForwardingServerCall.close(ForwardingServerCall.java:22)
at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.close(ForwardingServerCall.java:39)
at net.devh.boot.grpc.server.metric.MetricCollectingServerCall.close(MetricCollectingServerCall.java:60)
at net.devh.boot.grpc.server.advice.GrpcAdviceExceptionHandler.handleThrownExceptionByImplementation(GrpcAdviceExceptionHandler.java:112)
at net.devh.boot.grpc.server.advice.GrpcAdviceExceptionHandler.handleError(GrpcAdviceExceptionHandler.java:82)
at net.devh.boot.grpc.server.error.GrpcExceptionListener.onHalfClose(GrpcExceptionListener.java:72)
at net.devh.boot.grpc.server.advice.GrpcAdviceExceptionHandler.handleError(GrpcAdviceExceptionHandler.java:82)
最新问题
© www.soinside.com 2019 - 2024. All rights reserved.