RabbitMQ channel shuts down with a channel error: unknown delivery tag

After every message I receive from RabbitMQ, I see the following error:

Channel shutdown: channel error; protocol method: #method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80) o.s.a.r.l.SimpleMessageListenerContainer - : Restarting Consumer@3d5a462b: tags=[{amq.ctag-zGA1v36aEmXxnG7bqBUcdg=asstMgmt.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://mqadmin@xx:xx:xx:xx:5672/assetmgmt,1165), conn: Proxy@5c87dd9e Shared Rabbit Connection: SimpleConnection@2ec6c89 [delegate=amqp://mqadmin@@xx:xx:xx:xx:5672/assetmgmt, localPort= port], acknowledgeMode=AUTO local queue size=0

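No messages are lost, and they are consumed and processed fine. However, as the log above shows (amqp://mqadmin@@xx:xx:xx:xx:5672/assetmgmt,1165), 1165 channels have been created, and it does not stop there: a new channel is created every time a message is consumed.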

Here is my configuration:

@Bean
@Qualifier("amConnectionFactory")
public ConnectionFactory connectionFactory() throws NoSuchAlgorithmException, KeyManagementException {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(env.getProperty("assetMgmt.rabbitmq.host"));
    connectionFactory.setPort(Integer.valueOf(env.getProperty("assetMgmt.port")));
    connectionFactory.setUsername(env.getProperty("assetMgmt.username"));
    connectionFactory.setPassword(env.getProperty("assetMgmt.password"));
    connectionFactory.setVirtualHost(env.getProperty("assetMgmt.virtual.host"));
    connectionFactory.setChannelCacheSize(5);
    //connectionFactory.setChannelCheckoutTimeout(100000);
    return connectionFactory;
}

@Bean
@Qualifier("umConnectionFactory")
public ConnectionFactory umConnectionFactory() throws NoSuchAlgorithmException, KeyManagementException {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(env.getProperty("spring.rabbitmq.host"));
    connectionFactory.setPort(Integer.valueOf(env.getProperty("spring.rabbitmq.port")));
    connectionFactory.setVirtualHost(env.getProperty("spring.rabbitmq.virtual-host"));
    connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
    connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
    connectionFactory.setChannelCacheSize(5);
    //connectionFactory.setChannelCheckoutTimeout(100000);
    return connectionFactory;
}

@Bean
public SimpleRabbitListenerContainerFactory assetContainerFactory(
        @Qualifier("connectionFactory") ConnectionFactory connectionFactory
) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(6);
    factory.setDefaultRequeueRejected(false);
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setChannelTransacted(true);
    rabbitTemplate.setReplyTimeout(60000);
    return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory umContainerFactory(
        @Qualifier("umConnectionFactory") ConnectionFactory umConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(umConnectionFactory);
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(6);
    factory.setDefaultRequeueRejected(false);
    RabbitTemplate rabbitTemplate = new RabbitTemplate(umConnectionFactory);
    rabbitTemplate.setChannelTransacted(true);
    rabbitTemplate.setReplyTimeout(60000);
    return factory;
}

I need two instances because we have to connect to two virtual hosts, one per instance. Below is the code that runs when we receive a message:

@RabbitListener(admin = "amqpAssetMgmtAdmin", queues = "${asstMgmt.queue.name}",
        containerFactory = "assetContainerFactory")
public void loadAssetMgmtData(Message msg, Channel channel) throws IOException {
    long tag = 0;
    try {
        UDDAO dao = new UDDAO();
        dao.setAssetMgmtRabbitTemplate(assetMgmtRabbitTemplate);
        Map<String, Object> headers = msg.getMessageProperties().getHeaders();
        String msgType = (String) headers.get("type");
        String payload = new String(msg.getBody());
        tag = msg.getMessageProperties().getDeliveryTag();
        if (msgType == null || msgType.isEmpty()) {
            log.error("Got message with empty message type, rejecting the message");
            channel.basicReject(tag, false);
        }
        // Process the message
        channel.basicAck(tag, false);
    } catch (JDBCConnectionException e) {
        log.error("DB exception so requeueing the message");
        log.error(e.getMessage());
        if (tag > 0) {
            channel.basicReject(tag, true);
        }
    } catch (Exception e) {
        if (msg.getMessageProperties().getRedelivered()) {
            log.error("Internal server error occurred. rejecting without requeueing ", e);
            if (tag > 0) {
                channel.basicReject(tag, false);
            }
        } else {
            if (tag > 0) {
                channel.basicReject(tag, true);
            }
            log.error("Internal server error occurred. rejecting and requeuing ", e);
        }
    }
}

Thank you for your time. Please help me understand what I am doing wrong.

java spring-boot rabbitmq
1 Answer

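Looking closely at the log, I noticed acknowledgeMode=AUTO. In AUTO mode the listener container acknowledges each message itself once the listener method returns, so my own call to channel.basicAck(tag, false) meant the same delivery tag was acknowledged twice. The broker rejected the second ack with 406 PRECONDITION_FAILED (unknown delivery tag), the channel was shut down, and the container restarted the consumer on a new channel. Since my code needs to acknowledge manually, I set factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); on the container factory, and that solved the problem.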
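To illustrate why a second ack for the same delivery tag closes the channel, here is a minimal sketch in plain Java. It is a toy model only, not the RabbitMQ client or Spring AMQP API: `ToyChannel`, `AckModeDemo`, and `consumeOne` are hypothetical names introduced for this example, and the model assumes the broker simply forgets a delivery tag once it has been acknowledged.

```java
import java.util.HashSet;
import java.util.Set;

// Toy stand-in for a single AMQP channel (hypothetical, NOT the real client API).
final class ToyChannel {
    private final Set<Long> unacked = new HashSet<>();
    private long nextTag = 1;
    private boolean open = true;
    private String closeReason = null;

    // The broker delivers a message and tracks its tag until it is acked.
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    // Acking a tag the broker no longer tracks closes the channel.
    void basicAck(long tag) {
        if (!open) {
            return; // nothing can be acked on a closed channel
        }
        if (!unacked.remove(tag)) {
            open = false;
            closeReason = "PRECONDITION_FAILED - unknown delivery tag " + tag;
        }
    }

    boolean isOpen() {
        return open;
    }

    String closeReason() {
        return closeReason;
    }
}

final class AckModeDemo {
    // Simulates one delivery. The listener always acks by hand; the container
    // adds its own ack afterwards only when containerAutoAcks is true.
    static ToyChannel consumeOne(boolean containerAutoAcks) {
        ToyChannel channel = new ToyChannel();
        long tag = channel.deliver();
        channel.basicAck(tag);     // manual ack inside the listener
        if (containerAutoAcks) {
            channel.basicAck(tag); // second ack, as in AUTO mode
        }
        return channel;
    }

    public static void main(String[] args) {
        ToyChannel auto = consumeOne(true);
        System.out.println("AUTO   open=" + auto.isOpen() + " reason=" + auto.closeReason());
        ToyChannel manual = consumeOne(false);
        System.out.println("MANUAL open=" + manual.isOpen() + " reason=" + manual.closeReason());
    }
}
```

In the toy model, the AUTO case ends with a closed channel and the same "unknown delivery tag 1" reason seen in the log, while the MANUAL case leaves the channel open. In the real configuration, the equivalent change is the one-line factory.setAcknowledgeMode(AcknowledgeMode.MANUAL) call placed in the container factory bean before it is returned.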
