骆驼Rabbitmq中重启时骆驼路线丢失消息

问题描述 投票:0回答:1

我正在使用camel-rabbitmq。这是我的路线定义

camelContext.addRoutes(new RouteBuilder() {
        @Override
        public void configure() throws Exception {
            from("rabbitmq:TEST?queue=TEST&concurrentConsumers=5")
            .routeId("jms")
            .autoStartup(false)
            .throttle(10)
            .asyncDelayed()
            .log("Consuming message ${body} to ${header.deliveryAddress}")
            .process(new Processor() {

                @Override
                public void process(Exchange exchange) throws Exception {
                        System.out.println(atomicLong.decrementAndGet());
                }
            })

            ;
        }
    }); 

[当我将500条消息推送到此队列时,当停止和开始路由时,通道中的所有消息都将丢失,无论它们去向何方。

如果我使用&autoAck=false配置相同的路由,则它可以正常工作,但会降低性能。为什么骆驼在不使用和不使用autoAck的情况下都无法提供相同的行为。

rabbitmq apache-camel rabbitmq-exchange
1个回答
0
投票

我在骆驼兔mq的rabbitmq消费者中进行以下更改后解决了我的问题

    public void handleCancelOk(String consumerTag) {
        // no work to do
        log.info("Received cancelOk signal on the rabbitMQ channel");
        **downLatch.countDown();**
    }
  @Override
    protected void doStop() throws Exception {
        if (channel == null) {
            return;
        }
        this.requeueChannel=openChannel(consumer.getConnection());
         if (tag != null && isChannelOpen()) {
            channel.basicCancel(tag);
        }
        stopping=true;
         downLatch.await();         

        try {
            lock.acquire();
            if (isChannelOpen()) {
                channel.close();
            }
        } catch (TimeoutException e) {
            log.error("Timeout occured");
            throw e;
        } catch (InterruptedException e1) {
            log.error("Thread Interrupted!");
        } finally {
            lock.release();
        }


    }

通过执行此骆驼路线将使消息消耗并避免消息丢失。

© www.soinside.com 2019 - 2024. All rights reserved.