我正在使用camel-rabbitmq。这是我的路线定义
camelContext.addRoutes(new RouteBuilder() {
@Override
public void configure() throws Exception {
from("rabbitmq:TEST?queue=TEST&concurrentConsumers=5")
.routeId("jms")
.autoStartup(false)
.throttle(10)
.asyncDelayed()
.log("Consuming message ${body} to ${header.deliveryAddress}")
.process(new Processor() {
@Override
public void process(Exchange exchange) throws Exception {
System.out.println(atomicLong.decrementAndGet());
}
})
;
}
});
[当我将500条消息推送到此队列时,当停止和开始路由时,通道中的所有消息都将丢失,无论它们去向何方。
如果我使用&autoAck=false
配置相同的路由,则它可以正常工作,但会降低性能。为什么骆驼在不使用和不使用autoAck的情况下都无法提供相同的行为。
我在骆驼兔mq的rabbitmq消费者中进行以下更改后解决了我的问题
public void handleCancelOk(String consumerTag) {
// no work to do
log.info("Received cancelOk signal on the rabbitMQ channel");
**downLatch.countDown();**
}
@Override
protected void doStop() throws Exception {
if (channel == null) {
return;
}
this.requeueChannel=openChannel(consumer.getConnection());
if (tag != null && isChannelOpen()) {
channel.basicCancel(tag);
}
stopping=true;
downLatch.await();
try {
lock.acquire();
if (isChannelOpen()) {
channel.close();
}
} catch (TimeoutException e) {
log.error("Timeout occured");
throw e;
} catch (InterruptedException e1) {
log.error("Thread Interrupted!");
} finally {
lock.release();
}
}
通过执行此骆驼路线将使消息消耗并避免消息丢失。