How to fix the "Previous channel was not shutdown properly" error when publishing multiple messages to a GCP Pub/Sub topic


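I am trying to publish messages to a GCP Pub/Sub topic from a Spring Boot application. Occasionally I get an error saying that a managed channel was not shut down, but I am not sure where exactly I need to shut it down.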

GCPPublisherConfig.java

@Configuration
@RequiredArgsConstructor
public class GCPPublisherConfig {

    private final AppProperties appProperties;

    @Bean
    public PublisherFactory publisherFactory(CredentialsProvider defaultCredentialsProvider) {
        DefaultPublisherFactory factory = new DefaultPublisherFactory(() -> appProperties.getPubsubProjectId());
        factory.setEnableMessageOrdering(true);
        factory.setCredentialsProvider(defaultCredentialsProvider);
        return factory;
    }

    @Bean
    public PubSubTemplate pubSubTemplate(
            PublisherFactory publisherFactory, SubscriberFactory subscriberFactory) {
        return new PubSubTemplate(publisherFactory, subscriberFactory);
    }

}

MessagePublisher.java

@RequiredArgsConstructor
@Component
public class MessagePublisher {

    private final PubSubTemplate pubSubTemplate;
    private final AppProperties appProperties;
    private final Gson gson;

    public Mono<String> publishMessageToGcpTopic(Message message, String orderingKey){
        log.info("publishing message to pubsub topic");

        PubsubMessage msgToPublish = PubsubMessage.newBuilder()
                .setData(ByteString.copyFromUtf8(gson.toJson(message)))
                .setOrderingKey(orderingKey)
                .build();

        return Mono.fromFuture(
                this.pubSubTemplate
                .publish(appProperties.getPubsubTopicId(), msgToPublish)
                .exceptionally(ex -> {
                    log.error("error when publishing messaging to gcp", ex);
                    return "0";
                }));
    }
}

AppService.java

@Override
public Flux<Integer> getData() {
    // this is not the actual service logic. I have created this sample logic to reproduce the error.
    return Flux.range(1, 1000)
            .map(it -> {
                log.info("number - {}", it);
                return it;
            });
}

AppController.java

@GetMapping("/publish/messages")
public Mono<Boolean> testMessages() {

    return appService.getData()
            .flatMap(it -> {
                EdslMessage edslMessage = EdslMessage.builder().build();
                messagePublisher.publishMessageToGcpTopic(edslMessage, it);
                return Mono.just(true);
            })
            .switchIfEmpty(Mono.defer(() -> {
                log.warn("No entries found for date less than current date");
                return Mono.just(false);
            }))
            .reduce((a, b) -> a && b);
}

Error stack trace

[ctor-http-nio-3] i.g.i.ManagedChannelOrphanWrapper        : *~*~*~ Previous channel ManagedChannelImpl{logId=571, target=pubsub.googleapis.com:443} was not shutdown properly!!! ~*~*~*
    Make sure to call shutdown()/shutdownNow() and wait until awaitTermination() returns true.

java.lang.RuntimeException: ManagedChannel allocation site
    at io.grpc.internal.ManagedChannelOrphanWrapper$ManagedChannelReference.<init>(ManagedChannelOrphanWrapper.java:102)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:60)
    at io.grpc.internal.ManagedChannelOrphanWrapper.<init>(ManagedChannelOrphanWrapper.java:51)
    at io.grpc.internal.ManagedChannelImplBuilder.build(ManagedChannelImplBuilder.java:631)
    at io.grpc.internal.AbstractManagedChannelImplBuilder.build(AbstractManagedChannelImplBuilder.java:297)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createSingleChannel(InstantiatingGrpcChannelProvider.java:391)
    at com.google.api.gax.grpc.ChannelPool.<init>(ChannelPool.java:107)
    at com.google.api.gax.grpc.ChannelPool.create(ChannelPool.java:85)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.createChannel(InstantiatingGrpcChannelProvider.java:237)
    at com.google.api.gax.grpc.InstantiatingGrpcChannelProvider.getTransportChannel(InstantiatingGrpcChannelProvider.java:231)
    at com.google.api.gax.rpc.ClientContext.create(ClientContext.java:236)
    at com.google.cloud.pubsub.v1.stub.GrpcPublisherStub.create(GrpcPublisherStub.java:203)
    at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:201)
    at com.google.cloud.pubsub.v1.Publisher.<init>(Publisher.java:91)
    at com.google.cloud.pubsub.v1.Publisher$Builder.build(Publisher.java:881)
    at com.google.cloud.spring.pubsub.support.DefaultPublisherFactory.createPublisher(DefaultPublisherFactory.java:186)
    at com.google.cloud.spring.pubsub.core.publisher.PubSubPublisherTemplate.publish(PubSubPublisherTemplate.java:94)
    at com.google.cloud.spring.pubsub.core.PubSubTemplate.publish(PubSubTemplate.java:120)
    at org.ascension.swe.myaccount.careteambuilder.replay.MessagePublisher.publishMessageToGcpTopic(MessagePublisher.java:33)
    at org.ascension.swe.myaccount.careteambuilder.replay.TestController.lambda$testMessages$1(TestController.java:54)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
    at reactor.core.publisher.FluxRange$RangeSubscription.slowPath(FluxRange.java:156)
    at reactor.core.publisher.FluxRange$RangeSubscription.request(FluxRange.java:111)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
    at reactor.core.publisher.FluxFlatMap$FlatMapMain.onSubscribe(FluxFlatMap.java:371)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
    at reactor.core.publisher.FluxRange.subscribe(FluxRange.java:69)
    at reactor.core.publisher.FluxFromMonoOperator.subscribe(FluxFromMonoOperator.java:83)
    at reactor.core.publisher.FluxDeferContextual.subscribe(FluxDeferContextual.java:57)
    at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:165)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
    at reactor.core.publisher.MonoZip$ZipCoordinator.signal(MonoZip.java:293)
    at reactor.core.publisher.MonoZip$ZipInner.onNext(MonoZip.java:474)
    at reactor.core.publisher.MonoPeekTerminal$MonoTerminalPeekSubscriber.onNext(MonoPeekTerminal.java:180)
    at reactor.core.publisher.FluxDefaultIfEmpty$DefaultIfEmptySubscriber.onNext(FluxDefaultIfEmpty.java:122)
    at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onNext(FluxSwitchIfEmpty.java:74)
    at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:158)
    at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onNext(FluxContextWrite.java:107)
    at reactor.core.publisher.FluxMapFuseable$MapFuseableConditionalSubscriber.onNext(FluxMapFuseable.java:299)
    at reactor.core.publisher.FluxFilterFuseable$FilterFuseableConditionalSubscriber.onNext(FluxFilterFuseable.java:337)
    at reactor.core.publisher.Operators$BaseFluxToMonoOperator.completePossiblyEmpty(Operators.java:2071)
    at reactor.core.publisher.MonoCollect$CollectSubscriber.onComplete(MonoCollect.java:145)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
    at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
    at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
    at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:431)
    at reactor.netty.http.server.HttpServerOperations.onInboundNext(HttpServerOperations.java:651)
    at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at reactor.netty.http.server.HttpTrafficHandler.channelRead(HttpTrafficHandler.java:276)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
    at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
    at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
    at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
    at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
    at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
    at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
    at io.netty.channel.nio.AbstractNioByteChannel$NioByteUnsafe.read(AbstractNioByteChannel.java:166)
    at io.netty.channel.nio.NioEventLoop.processSelectedKey(NioEventLoop.java:788)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeysOptimized(NioEventLoop.java:724)
    at io.netty.channel.nio.NioEventLoop.processSelectedKeys(NioEventLoop.java:650)
    at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:562)
    at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
    at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
    at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
    at java.base/java.lang.Thread.run(Thread.java:833)
java spring-boot google-cloud-pubsub spring-cloud-gcp
1 Answer

You must explicitly shut down ManagedChannel instances when they are no longer needed. Shutting down a channel releases the resources it holds.
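The log message in the question asks for shutdown()/shutdownNow() followed by awaitTermination(). A minimal sketch of that sequence is shown below. It is not gRPC code: a JDK ExecutorService is used purely as a stand-in, on the assumption that the same three method names behave analogously on a ManagedChannel. The helper name shutdownGracefully and the timeout value are hypothetical.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch only: ExecutorService stands in for a gRPC ManagedChannel here,
// because both expose shutdown(), shutdownNow() and awaitTermination().
final class GracefulShutdownSketch {

    // Hypothetical helper: returns true if the resource reached the terminated state.
    static boolean shutdownGracefully(ExecutorService resource, long timeoutMillis) {
        resource.shutdown(); // stop accepting new work, let in-flight work finish
        try {
            if (resource.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS)) {
                return true;
            }
            resource.shutdownNow(); // timed out: cancel whatever is still running
            return resource.awaitTermination(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            resource.shutdownNow();
            Thread.currentThread().interrupt(); // preserve the interrupt for callers
            return false;
        }
    }

    public static void main(String[] args) {
        ExecutorService resource = Executors.newSingleThreadExecutor();
        resource.submit(() -> System.out.println("work done"));
        System.out.println("terminated: " + shutdownGracefully(resource, 1000));
    }
}
```

The point of the sketch is the ordering: request an orderly shutdown first, wait for it, and only force termination if the wait times out.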

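When using Spring Cloud GCP with Pub/Sub, you can explicitly shut down the PubSubTemplate and its associated channels once they are no longer needed. You can define a @PreDestroy method in the MessagePublisher bean to handle the shutdown, as follows: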

@PreDestroy
public void shutdown() {
    log.info("Shutting down PubSubTemplate and its channels...");
    pubSubTemplate.getPublisherFactory().shutdown();
}

The @PreDestroy method is called when the Spring container shuts down and can be used for cleanup tasks such as closing resources.
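The stack trace in the question shows a channel being built inside createPublisher during a publish call, so a shutdown hook helps most when channel-owning objects are also reused rather than created per message. The sketch below illustrates that idea in plain Java, assuming one client per topic is wanted. Every name in it (PublisherCacheSketch, FakePublisher, forTopic) is hypothetical and none of it is Spring Cloud GCP API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch only: caches one channel-owning client per topic and closes
// all of them from a single shutdown method.
final class PublisherCacheSketch {

    // Hypothetical stand-in for a client that owns a network channel.
    static final class FakePublisher implements AutoCloseable {
        final String topic;
        volatile boolean closed;

        FakePublisher(String topic) {
            this.topic = topic;
        }

        String publish(String payload) {
            if (closed) {
                throw new IllegalStateException("publisher for " + topic + " is closed");
            }
            return topic + ":" + payload;
        }

        @Override
        public void close() {
            closed = true; // a real client would release its channel here
        }
    }

    private final Map<String, FakePublisher> byTopic = new ConcurrentHashMap<>();

    // Creates the client on first use and returns the same instance afterwards.
    FakePublisher forTopic(String topic) {
        return byTopic.computeIfAbsent(topic, FakePublisher::new);
    }

    // In a Spring bean, this is the kind of method one would annotate with @PreDestroy.
    void shutdown() {
        byTopic.values().forEach(FakePublisher::close);
        byTopic.clear();
    }

    public static void main(String[] args) {
        PublisherCacheSketch cache = new PublisherCacheSketch();
        for (int i = 1; i <= 3; i++) {
            System.out.println(cache.forTopic("demo-topic").publish("message " + i));
        }
        cache.shutdown();
    }
}
```

With this shape, a thousand publishes to the same topic open one client, and the shutdown method has a complete list of what to close.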
