I see the following error after every message I receive from RabbitMQ:
Channel shutdown: channel error; protocol method: #method
(reply-code=406, reply-text=PRECONDITION_FAILED - unknown delivery tag 1, class-id=60, method-id=80) o.s.a.r.l.SimpleMessageListenerContainer - Restarting Consumer@3d5a462b: tags=[{amq.ctag-zGA1v36aEmXxnG7bqBUcdg=asstMgmt.queue}], channel=Cached Rabbit Channel: AMQChannel(amqp://mqadmin@xx:xx:xx:xx:5672/assetmgmt,1165), conn: Proxy@5c87dd9e Shared Rabbit Connection: SimpleConnection@2ec6c89 [delegate=amqp://mqadmin@@xx:xx:xx:xx:5672/assetmgmt, localPort= port], acknowledgeMode=AUTO local queue size=0
No messages are lost; they are consumed and processed correctly. However, as the log above shows (amqp://mqadmin@xx:xx:xx:xx:5672/assetmgmt,1165), 1165 channels have been created, and it does not stop there: a new channel is created every time a message is consumed.
Here is my configuration:
@Bean
@Qualifier("amConnectionFactory")
public ConnectionFactory connectionFactory() throws NoSuchAlgorithmException, KeyManagementException {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(env.getProperty("assetMgmt.rabbitmq.host"));
    connectionFactory.setPort(Integer.valueOf(env.getProperty("assetMgmt.port")));
    connectionFactory.setUsername(env.getProperty("assetMgmt.username"));
    connectionFactory.setPassword(env.getProperty("assetMgmt.password"));
    connectionFactory.setVirtualHost(env.getProperty("assetMgmt.virtual.host"));
    connectionFactory.setChannelCacheSize(5);
    //connectionFactory.setChannelCheckoutTimeout(100000);
    return connectionFactory;
}

@Bean
@Qualifier("umConnectionFactory")
public ConnectionFactory umConnectionFactory() throws NoSuchAlgorithmException, KeyManagementException {
    CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
    connectionFactory.setHost(env.getProperty("spring.rabbitmq.host"));
    connectionFactory.setPort(Integer.valueOf(env.getProperty("spring.rabbitmq.port")));
    connectionFactory.setVirtualHost(env.getProperty("spring.rabbitmq.virtual-host"));
    connectionFactory.setUsername(env.getProperty("spring.rabbitmq.username"));
    connectionFactory.setPassword(env.getProperty("spring.rabbitmq.password"));
    connectionFactory.setChannelCacheSize(5);
    //connectionFactory.setChannelCheckoutTimeout(100000);
    return connectionFactory;
}

@Bean
public SimpleRabbitListenerContainerFactory assetContainerFactory(
        @Qualifier("connectionFactory") ConnectionFactory connectionFactory
) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(connectionFactory);
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(6);
    factory.setDefaultRequeueRejected(false);
    RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
    rabbitTemplate.setChannelTransacted(true);
    rabbitTemplate.setReplyTimeout(60000);
    return factory;
}

@Bean
public SimpleRabbitListenerContainerFactory umContainerFactory(
        @Qualifier("umConnectionFactory") ConnectionFactory umConnectionFactory) {
    SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
    factory.setConnectionFactory(umConnectionFactory);
    factory.setConcurrentConsumers(3);
    factory.setMaxConcurrentConsumers(6);
    factory.setDefaultRequeueRejected(false);
    RabbitTemplate rabbitTemplate = new RabbitTemplate(umConnectionFactory);
    rabbitTemplate.setChannelTransacted(true);
    rabbitTemplate.setReplyTimeout(60000);
    return factory;
}
I need 2 instances because we have to connect to 2 virtual hosts for different instances. Below is the code that runs once a message has been received:
@RabbitListener(admin = "amqpAssetMgmtAdmin", queues = "${asstMgmt.queue.name}",
        containerFactory = "assetContainerFactory")
public void loadAssetMgmtData(Message msg, Channel channel) throws IOException {
    long tag = 0;
    try {
        UDDAO dao = new UDDAO();
        dao.setAssetMgmtRabbitTemplate(assetMgmtRabbitTemplate);
        Map<String, Object> headers = msg.getMessageProperties().getHeaders();
        String msgType = (String) headers.get("type");
        String payload = new String(msg.getBody());
        tag = msg.getMessageProperties().getDeliveryTag();
        if (msgType == null || msgType.isEmpty()) {
            log.error("Got message with empty message type, rejecting the message");
            channel.basicReject(tag, false);
            // Already rejected: return so the basicAck below does not settle the same tag twice
            return;
        }
        //Process the message
        channel.basicAck(tag, false);
    } catch (JDBCConnectionException e) {
        // Database unavailable: put the message back on the queue
        log.error("DB exception so requeueing the message");
        log.error(e.getMessage());
        if (tag > 0) {
            channel.basicReject(tag, true);
        }
    } catch (Exception e) {
        if (msg.getMessageProperties().getRedelivered()) {
            // Second failure for this message: drop it instead of requeueing again
            log.error("Internal server error occurred. rejecting without requeueing ", e);
            if (tag > 0) {
                channel.basicReject(tag, false);
            }
        } else {
            // First failure: requeue once
            if (tag > 0) {
                channel.basicReject(tag, true);
            }
            log.error("Internal server error occurred. rejecting and requeuing ", e);
        }
    }
}
Thank you for your time. Please help me understand where I went wrong.
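Looking closely at the log, I noticed acknowledgeMode=AUTO. Whenever my code acknowledged a consumed message with channel.basicAck(tag, false), a warning was logged, the channel was shut down, and a new channel was created. Because my code needs to acknowledge messages manually, I set factory.setAcknowledgeMode(AcknowledgeMode.MANUAL); and that solved the problem.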
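To illustrate why the two acknowledgements collide, here is a minimal sketch in plain Java. It does not use the RabbitMQ client or Spring AMQP: FakeChannel and AckModeDemo are hypothetical names invented for this example, and the real broker reports the error asynchronously rather than by throwing from basicAck. The assumption it models is that in AUTO mode the container acknowledges the delivery itself after the listener returns, so a listener that has already acked the same tag causes a second ack on a tag the broker no longer knows.

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical stand-in for a broker-side channel; NOT the RabbitMQ client API. */
class FakeChannel {
    private final Set<Long> unacked = new HashSet<>();
    private long nextTag = 1;
    private boolean open = true;

    /** Delivers a message and returns its delivery tag. */
    long deliver() {
        long tag = nextTag++;
        unacked.add(tag);
        return tag;
    }

    /** Acks a tag; an unknown (already settled) tag closes the channel, like reply-code 406. */
    void basicAck(long tag) {
        if (!open) {
            throw new IllegalStateException("channel already closed");
        }
        if (!unacked.remove(tag)) {
            open = false;
            throw new IllegalStateException("PRECONDITION_FAILED - unknown delivery tag " + tag);
        }
    }

    boolean isOpen() {
        return open;
    }
}

class AckModeDemo {
    /** Runs one delivery and returns true if the channel is still open afterwards. */
    static boolean consume(boolean containerAlsoAcks) {
        FakeChannel channel = new FakeChannel();
        long tag = channel.deliver();
        channel.basicAck(tag);       // the listener acks manually
        if (containerAlsoAcks) {     // AUTO mode: the container acks again after the listener returns
            try {
                channel.basicAck(tag);
            } catch (IllegalStateException e) {
                System.out.println("Channel closed: " + e.getMessage());
            }
        }
        return channel.isOpen();
    }

    public static void main(String[] args) {
        System.out.println("AUTO + manual ack, channel open: " + consume(true));
        System.out.println("MANUAL only, channel open: " + consume(false));
    }
}
```

In this sketch the double ack closes the channel and the single ack leaves it open, which matches the symptom above: each consumed message costs one channel until the container is switched to AcknowledgeMode.MANUAL and the listener becomes the only party that acks.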