我有一个消费者,很多时候需要很长时间才能完成响应队列的过程。这是因为消费者管理大文件。所以,执行时间比队列的
consumer_timeout
长。这种情况使得队列永远保留消息。我们通过重启RabbitMQ服务解决了这个问题,但我想知道,如何避免这种情况?
这是我的消费者:
public void run() {
try {
Channel channel = connection.createChannel();
channel.basicQos(1);
Consumer consumer = new DefaultConsumer(channel) {
@Override
public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
byte[] body) throws IOException {
AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
.correlationId(properties.getCorrelationId()).build();
String response = "";
String inputMsgBody = null;
try {
inputMsgBody = new String(body, "UTF-8");
if (QWorkerUtil.checkAndUpdate(envelope, properties)) {
response = doJob(inputMsgBody);
} else {
QWorkerUtil.CallFailureApi(inputMsgBody);
}
} catch (Throwable t) {
QWorkerUtil.CallFailureApi(inputMsgBody);
logger.error(
"Exception or error occurred :"
+ inputMsgBody);
logger.error("Exception", t);
} finally {
channel.basicAck(envelope.getDeliveryTag(), false);
channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
}
}
};
channel.basicConsume(queueName, false, consumer);
} catch (IOException e) {
logger.error("Exception", e);
}
}
经过调查,我建议以下选项: