所以我有两个微服务通过 RabbitMQ 进行通信,其中一个向另一个微服务发送 RPC 请求(使用
RabbitTemplate#sendAndReceive
)。我注意到,如果经纪商停机几分钟,应用程序将无法恢复。
以下是我配置连接工厂和模板的方法:
@Bean
public AbstractConnectionFactory connectionFactory() {
return new CachingConnectionFactory(createRabbitConnectionFactory(properties));
}
private static com.rabbitmq.client.ConnectionFactory createRabbitConnectionFactory(RabbitProperties properties) {
final com.rabbitmq.client.ConnectionFactory connectionFactory = new com.rabbitmq.client.ConnectionFactory();
connectionFactory.setHandshakeTimeout(60000);
connectionFactory.setHost(properties.getHost());
connectionFactory.setPort(properties.getPort());
connectionFactory.setUsername(properties.getUsername());
connectionFactory.setPassword(properties.getPassword());
connectionFactory.setVirtualHost(properties.getVirtualhost());
return connectionFactory;
}
@Bean
public RabbitTemplate rabbitTemplate(
AbstractConnectionFactory connectionFactory
) {
final RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);
rabbitTemplate.setReplyTimeout(10000);
rabbitTemplate.setUserCorrelationId(true);
return rabbitTemplate;
}
这是我如何使用 RabbitTemplate:
final Message responseMessage = rabbitTemplate.sendAndReceive(
routingKey,
new Message(serializationService.toBytes(request),
MessagePropertiesBuilder
.newInstance()
.setContentType("my-content")
.setDeliveryMode(MessageDeliveryMode.NON_PERSISTENT)
.setCorrelationId(correlationData.getId())
.build()
),
correlationData
);
然后我要做的就是加载触发此 RPC 的 API 方法,然后关闭代理并等待几分钟。代理启动后,我看到很多这样的日志:
SimpleConsumer [queue=amq.rabbitmq.reply-to, index=1091, consumerTag=amq.ctag-1DtEyYlNIMSNowMMsoPYNQ identity=57919ab] started
然后所有新请求都会导致错误:
org.springframework.amqp.AmqpResourceNotAvailableException: The channelMax limit is reached. Try later.
在 RabbitMQ 界面中,我看到我的连接已创建 2047 个通道,所有通道都处于空闲状态。只有重新启动应用程序才能解决问题。
您认为这里发生了什么?我怎样才能让客户端自动恢复?
我使用 2.7.5 版本的
spring-boot-starter-amqp
(所以 spring-messaging
5.3.2 和 spring-rabbit
2.4.7)
我尝试过配置
channelCheckoutTimeout
:
cachingConnectionFactory.setChannelCheckoutTimeout(10000);
cachingConnectionFactory.setChannelCacheSize(200);
但是我又遇到了另一个错误和同样的情况:
org.springframework.amqp.AmqpTimeoutException: No available channels
找到问题所在了。 您的应用程序属性必须具有以下内容:
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
cache:
channel:
checkout-timeout: 30s
注意这一点
checkout-timeout: 30s
。
这配置了从 CachingConnectionFactory
打开的通道的限制:
/**
* Sets the channel checkout timeout. When greater than 0, enables channel limiting
* in that the {@link #channelCacheSize} becomes the total number of available channels per
* connection rather than a simple cache size. Note that changing the {@link #channelCacheSize}
* does not affect the limit on existing connection(s), invoke {@link #destroy()} to cause a
* new connection to be created with the new limit.
* <p>
* Since 1.5.5, also applies to getting a connection when the cache mode is CONNECTION.
* @param channelCheckoutTimeout the timeout in milliseconds; default 0 (channel limiting not enabled).
* @since 1.4.2
* @see #setConnectionLimit(int)
*/
public void setChannelCheckoutTimeout(long channelCheckoutTimeout) {
文档中的更多信息:https://docs.spring.io/spring-amqp/reference/amqp/connections.html#cachingconnectionfactory
从版本 1.4.2 开始,
有一个名为CachingConnectionFactory
的属性。当此属性大于零时,channelCheckoutTimeout
成为可在连接上创建的通道数量的限制。如果达到限制,调用线程将阻塞,直到通道可用或达到超时,在这种情况下会抛出channelCacheSize
。AmqpTimeoutException
您还可以调整缓存以符合您的期望:
spring:
rabbitmq:
cache:
channel:
checkout-timeout: 30s
size: 100
注意:
host/port
和凭证默认情况下是这样的。所以,不需要在 application.yml
中使用它们。