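I am trying to publish messages to a GCP Pub/Sub topic using the Spring Boot framework. Occasionally I get an error saying that a managed channel was not shut down properly, but I am not sure where I am supposed to close it.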
GCPPublisherConfig.java
@Configuration
@RequiredArgsConstructor
public class GCPPublisherConfig {

    private final AppProperties appProperties;

    @Bean
    public PublisherFactory publisherFactory(CredentialsProvider defaultCredentialsProvider) {
        DefaultPublisherFactory factory = new DefaultPublisherFactory(() -> appProperties.getPubsubProjectId());
        factory.setEnableMessageOrdering(true);
        factory.setCredentialsProvider(defaultCredentialsProvider);
        return factory;
    }

    @Bean
    public PubSubTemplate pubSubTemplate(
            PublisherFactory publisherFactory, SubscriberFactory subscriberFactory) {
        return new PubSubTemplate(publisherFactory, subscriberFactory);
    }
}
MessagePublisher.java
@Slf4j // provides the `log` field used below (Lombok)
@RequiredArgsConstructor
@Component
public class MessagePublisher {

    private final PubSubTemplate pubSubTemplate;
    private final AppProperties appProperties;
    private final Gson gson;

    public Mono<String> publishMessageToGcpTopic(Message message, String orderingKey) {
        log.info("publishing message to pubsub topic");
        PubsubMessage msgToPublish = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(gson.toJson(message)))
                .setOrderingKey(orderingKey)
                .build();
        return Mono.fromFuture(
                this.pubSubTemplate
                        .publish(appProperties.getPubsubTopicId(), msgToPublish)
                        .exceptionally(ex -> {
                            log.error("error when publishing message to gcp", ex);
                            return "0";
                        }));
    }
}
AppService.java
@Override
public Flux<Integer> getData() {
    // this is not the actual service logic. I have created this sample logic to reproduce the error.
    return Flux.range(1, 1000)
            .map(it -> {
                log.info("number - {}", it);
                return it;
            });
}
AppController.java
@GetMapping("/publish/messages")
public Mono<Boolean> testMessages() {
    return appService.getData()
            .flatMap(it -> {
                EdslMessage edslMessage = EdslMessage.builder().build();
                // orderingKey is a String, so the Integer from the Flux is converted explicitly
                messagePublisher.publishMessageToGcpTopic(edslMessage, String.valueOf(it));
                return Mono.just(true);
            })
            .switchIfEmpty(Mono.defer(() -> {
                log.warn("No entries found for date less than current date");
                return Mono.just(false);
            }))
            .reduce((a, b) -> a && b);
}
Error stack trace
[ctor-http-nio-3] i.g.i.ManagedChannelOrphanWrapper : *~*~*~ Previous channel ManagedChannelImpl{logId=571, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.
java.lang.RuntimeException: ManagedChannel allocation site
at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:102)
at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:60)
at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:51)
at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:631)
at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:297)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:391)
at com.google.api.gax.grpc.ChannelPool.<init>(ChannelPool.java:107)
at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:85)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:237)
at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:231)
at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:236)
at com.google.cloud.pubsub.v1.stub.GrpcPublisherStub.create(GrpcPublisherStub.java:203)
at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:201)
at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:91)
at com.google.cloud.pubsub.v1.Publisher$Builder.build(Publisher.java:881)
at com.google.cloud.spring.pubsub.support.DefaultPublisherFactory.createPublisher(DefaultPublisherFactory.java:186)
at com.google.cloud.spring.pubsub.core.publisher.PubSubPublisherTemplate.publish(PubSubPublisherTemplate.java:94)
at com.google.cloud.spring.pubsub.core.PubSubTemplate.publish(PubSubTemplate.java:120)
at org.ascension.swe.myaccount.careteambuilder.replay.MessagePublisher.publishMessageToGcpTopic(MessagePublisher.java:33)
at org.ascension.swe.myaccount.careteambuilder.replay.TestController.lambda$testMessages$1(TestController.java:54)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:83)
at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:57)
at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:293)
at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:474)
at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)
at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:431)
at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:651)
at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:276)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
at java.base/java.lang.Thread.run(Thread.java:833)
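You must explicitly shut down ManagedChannel instances once they are no longer needed. Shutting down a channel releases the resources it holds.
When using Spring Cloud GCP with Pub/Sub, you can explicitly shut down the PubSubTemplate and its associated channels when they are no longer needed. You can define a @PreDestroy method in the MessagePublisher bean to handle the shutdown, as follows: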
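@PreDestroy
public void shutdown() {
    log.info("Shutting down PubSubTemplate and its channels...");
    // Assumes the Spring Cloud GCP version in use exposes getPublisherFactory() on
    // PubSubTemplate and shutdown() on the returned factory; adjust to your version's API.
    pubSubTemplate.getPublisherFactory().shutdown();
}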
The @PreDestroy method is invoked when the Spring container shuts down and can be used for cleanup tasks such as closing resources.
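The gRPC warning in the stack trace asks for shutdown() (or shutdownNow()) followed by a wait until awaitTermination() returns true. As a minimal sketch of that ordering, the example below uses a plain java.util.concurrent.ExecutorService as a stand-in, since it has the same-named shutdown(), shutdownNow(), and awaitTermination(long, TimeUnit) methods as ManagedChannel. The class name GracefulShutdownSketch and the helper shutdownAndAwait are hypothetical and are not part of Spring Cloud GCP or gRPC.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class GracefulShutdownSketch {

    /**
     * Requests an orderly shutdown and waits for it to finish.
     * Falls back to shutdownNow() if the first wait times out.
     * Returns true only if the resource reached the terminated state.
     */
    static boolean shutdownAndAwait(ExecutorService resource, long timeout, TimeUnit unit) {
        resource.shutdown(); // stop accepting new work, let submitted work finish
        try {
            if (resource.awaitTermination(timeout, unit)) {
                return true;
            }
            resource.shutdownNow(); // cancel whatever is still running
            return resource.awaitTermination(timeout, unit);
        } catch (InterruptedException e) {
            resource.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        // Stand-in for a channel-like resource; not a real gRPC channel.
        ExecutorService resource = Executors.newSingleThreadExecutor();
        resource.submit(() -> System.out.println("in-flight work finished"));
        boolean terminated = shutdownAndAwait(resource, 5, TimeUnit.SECONDS);
        System.out.println("terminated = " + terminated);
    }
}
```

With a real channel or publisher, the same sequence would sit inside the @PreDestroy method. The 5-second timeout is only an illustrative value and should be chosen to fit how long in-flight publishes may take.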