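I'm writing an interceptor to validate messages in a gRPC server, and I've run into a problem.
The general scheme is: the interceptor is activated at the application level and validates each raw incoming message. If validation fails, I need to pass the exception on into the application so that the exception handler builds the response as a StatusRuntimeException and returns it to the client. Here is some sample code, and it works: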
@Slf4j
@GrpcGlobalServerInterceptor
public class ValidationInterceptor implements ServerInterceptor {

    private static final String UNKNOWN_CALL_NAME = "Unknown call";

    private final GrpcExceptionResponseHandler exceptionHandler;
    private final Validator validator;

    public ValidationInterceptor(GrpcExceptionResponseHandler exceptionHandler,
                                 Validator validator) {
        this.exceptionHandler = exceptionHandler;
        this.validator = validator;
    }

    @Override
    public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
                                                      Metadata headers,
                                                      ServerCallHandler<ReqT, RespT> next) {
        final Listener<ReqT> delegateListener = next.startCall(call, headers);
        return new ForwardingServerCallListener.SimpleForwardingServerCallListener<>(delegateListener) {
            @Override
            public void onMessage(ReqT message) {
                try {
                    final ValidationResult validationResult = validator.validate((Message) message);
                    if (!validationResult.isSuccess()) {
                        exceptionHandler.handleError(call,
                                new ValidationRuntimeException(
                                        getMethodName(call), validationResult.getViolations())
                        );
                        return;
                    }
                    super.onMessage(message);
                } catch (Exception e) {
                    log.warn("Unexpected exception caught when validating method {}", getMethodName(call), e);
                    exceptionHandler.handleError(call, e);
                }
            }
        };
    }

    private <ReqT, RespT> String getMethodName(ServerCall<ReqT, RespT> call) {
        return Optional.ofNullable(call.getMethodDescriptor())
                .map(MethodDescriptor::getFullMethodName)
                .orElse(UNKNOWN_CALL_NAME);
    }
}
The handler:
@GrpcAdvice
@Slf4j
public class GrpcErrorHandler {

    @GrpcExceptionHandler
    public StatusRuntimeException handleValidationRuntimeException(
            ValidationRuntimeException e) {
        String message = Optional.ofNullable(e.getMessage()).orElse(DEFAULT_MESSAGE);
        log.warn("Error {}", message, e);
        errorCounterMetrics.incrementError(e);
        return toStatusRuntimeException(e.getViolations());
    }
}
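Everything works fine except for one detail. After the handleValidationRuntimeException() method has handled and returned the exception, the call is for some reason closed again even though it is already closed, which results in an IllegalStateException.
Stack trace: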
java.lang.IllegalStateException: call already closed
at com.google.common.base.Preconditions.checkState(Preconditions.java:502)
at io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:216)
at io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:209)
at io.grpc.PartialForwardingServerCall.close(PartialForwardingServerCall.java:48)
at io.grpc.ForwardingServerCall.close(ForwardingServerCall.java:22)
at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.close(ForwardingServerCall.java:39)
at net.devh.boot.grpc.server.metric.MetricCollectingServerCall.close(MetricCollectingServerCall.java:60)
at net.devh.boot.grpc.server.advice.GrpcAdviceExceptionHandler.handleThrownExceptionByImplementation(GrpcAdviceExceptionHandler.java:112)
at net.devh.boot.grpc.server.advice.GrpcAdviceExceptionHandler.handleError(GrpcAdviceExceptionHandler.java:82)
at net.devh.boot.grpc.server.error.GrpcExceptionListener.onHalfClose(GrpcExceptionListener.java:72)
at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at net.devh.boot.grpc.server.security.interceptors.AbstractAuthenticatingServerCallListener.onHalfClose(AbstractAuthenticatingServerCallListener.java:103)
at net.devh.boot.grpc.server.security.interceptors.DefaultAuthenticatingServerInterceptor$AuthenticatingServerCallListener.onHalfClose(DefaultAuthenticatingServerInterceptor.java:243)
at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at net.devh.boot.grpc.server.security.interceptors.ExceptionTranslatingServerInterceptor$ExceptionTranslatorServerCallListener.onHalfClose(ExceptionTranslatingServerInterceptor.java:124)
at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at io.grpc.PartialForwardingServerCallListener.onHalfClose(PartialForwardingServerCallListener.java:35)
at io.grpc.ForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:23)
at io.grpc.ForwardingServerCallListener$SimpleForwardingServerCallListener.onHalfClose(ForwardingServerCallListener.java:40)
at io.grpc.Contexts$ContextualizedServerCallListener.onHalfClose(Contexts.java:86)
at io.grpc.internal.ServerCallImpl$ServerStreamListenerImpl.halfClosed(ServerCallImpl.java:352)
at io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed.runInContext(ServerImpl.java:866)
at io.grpc.internal.ContextRunnable.run(ContextRunnable.java:37)
at io.grpc.internal.SerializingExecutor.run(SerializingExecutor.java:133)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.lang.Thread.run(Thread.java:833)
So, what can be done about this? Thanks!
If you look at the first few lines of the stack trace, you can see that the ServerCall is also closed explicitly by MetricCollectingServerCall.close, which is called from GrpcAdviceExceptionHandler.handleThrownExceptionByImplementation:
at com.google.common.base.Preconditions.checkState(Preconditions.java:502)
at io.grpc.internal.ServerCallImpl.closeInternal(ServerCallImpl.java:216)
at io.grpc.internal.ServerCallImpl.close(ServerCallImpl.java:209)
at io.grpc.PartialForwardingServerCall.close(PartialForwardingServerCall.java:48)
at io.grpc.ForwardingServerCall.close(ForwardingServerCall.java:22)
at io.grpc.ForwardingServerCall$SimpleForwardingServerCall.close(ForwardingServerCall.java:39)
at net.devh.boot.grpc.server.metric.MetricCollectingServerCall.close(MetricCollectingServerCall.java:60)
at net.devh.boot.grpc.server.advice.GrpcAdviceExceptionHandler.handleThrownExceptionByImplementation(GrpcAdviceExceptionHandler.java:112)
at net.devh.boot.grpc.server.advice.GrpcAdviceExceptionHandler.handleError(GrpcAdviceExceptionHandler.java:82)
at net.devh.boot.grpc.server.error.GrpcExceptionListener.onHalfClose(GrpcExceptionListener.java:72)
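One possible way to avoid the second close, sketched under assumptions. Judging from the trace, the later onHalfClose callback still reaches the downstream listeners after the interceptor has already closed the call in onMessage, and the downstream error handling then tries to close it again. A guard flag in the validating listener can stop forwarding callbacks once validation has closed the call. The types below (FakeCall, Listener, DownstreamListener, ValidatingListener) are hypothetical stand-ins, not the gRPC API, so that the sketch runs on a plain JDK. In the real interceptor the same flag would presumably live in the SimpleForwardingServerCallListener, with onHalfClose overridden alongside onMessage.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Predicate;

// Stand-in types only: NOT the gRPC API, just enough structure to show the guard.
final class DoubleCloseGuardDemo {

    /** Hypothetical stand-in for ServerCall: a second close() throws, as in the trace. */
    static final class FakeCall {
        final List<String> closedWith = new ArrayList<>();

        void close(String status) {
            if (!closedWith.isEmpty()) {
                throw new IllegalStateException("call already closed");
            }
            closedWith.add(status);
        }
    }

    /** Hypothetical stand-in for ServerCall.Listener. */
    interface Listener {
        void onMessage(String message);
        void onHalfClose();
    }

    /** Stand-in for the downstream handler: it closes the call on half-close. */
    static final class DownstreamListener implements Listener {
        private final FakeCall call;
        private String request;

        DownstreamListener(FakeCall call) {
            this.call = call;
        }

        @Override
        public void onMessage(String message) {
            request = message;
        }

        @Override
        public void onHalfClose() {
            call.close(request == null ? "INTERNAL" : "OK");
        }
    }

    /** Validating wrapper that stops forwarding once it has closed the call itself. */
    static final class ValidatingListener implements Listener {
        private final FakeCall call;
        private final Listener delegate;
        private final Predicate<String> isValid;
        private boolean closedByValidation;

        ValidatingListener(FakeCall call, Listener delegate, Predicate<String> isValid) {
            this.call = call;
            this.delegate = delegate;
            this.isValid = isValid;
        }

        @Override
        public void onMessage(String message) {
            if (closedByValidation) {
                return; // already rejected; ignore further messages
            }
            if (!isValid.test(message)) {
                closedByValidation = true;
                call.close("INVALID_ARGUMENT");
                return;
            }
            delegate.onMessage(message);
        }

        @Override
        public void onHalfClose() {
            if (closedByValidation) {
                return; // the call is already closed; do not let downstream close it again
            }
            delegate.onHalfClose();
        }
    }

    public static void main(String[] args) {
        FakeCall call = new FakeCall();
        Listener listener =
                new ValidatingListener(call, new DownstreamListener(call), m -> !m.isEmpty());
        listener.onMessage("");  // fails validation, so the call is closed once
        listener.onHalfClose();  // swallowed by the guard, so there is no second close
        System.out.println(call.closedWith); // prints [INVALID_ARGUMENT]
    }
}
```

Without the guard in onHalfClose, the downstream listener would call close() on the already closed stand-in call and hit the same "call already closed" exception. Whether the real listener also needs to guard other callbacks, such as onReady, depends on what the downstream listeners do, so treat that as something to check rather than a given.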