RabbitMQ - 当消费者执行时间长于consumer_timeout时,消息不被处理

问题描述 投票:0回答:1

我有一个消费者,很多时候需要很长时间才能完成响应队列的过程。这是因为消费者管理大文件。所以,执行时间比队列的

consumer_timeout
长。这种情况使得队列永远保留消息。我们通过重启RabbitMQ服务解决了这个问题,但我想知道,如何避免这种情况?

这是我的消费者:

public void run() {
        try {
            Channel channel = connection.createChannel();

            channel.basicQos(1);

            Consumer consumer = new DefaultConsumer(channel) {
                @Override
                public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties,
                        byte[] body) throws IOException {
                    AMQP.BasicProperties replyProps = new AMQP.BasicProperties.Builder()
                            .correlationId(properties.getCorrelationId()).build();

                    String response = "";
                    String inputMsgBody = null;
                    try {
                        inputMsgBody = new String(body, "UTF-8");
                        if (QWorkerUtil.checkAndUpdate(envelope, properties)) {
                            response = doJob(inputMsgBody);
                        } else {
                            QWorkerUtil.CallFailureApi(inputMsgBody);
                        }
                        
                    } catch (Throwable t) {
                        QWorkerUtil.CallFailureApi(inputMsgBody);
                        logger.error(
                                "Exception or error occurred :"
                                        + inputMsgBody);
                        logger.error("Exception", t);
                    } finally {
                        channel.basicAck(envelope.getDeliveryTag(), false);
                        channel.basicPublish("", properties.getReplyTo(), replyProps, response.getBytes("UTF-8"));
                    }
                }
            };

            channel.basicConsume(queueName, false, consumer);

        } catch (IOException e) {
            logger.error("Exception", e);
        } 
    }
java rabbitmq
1个回答
0
投票

经过调查,我建议以下选项:

  • 将ack发送到当前队列
  • 当繁重的进程完成时创建一个新的队列/通知,并使用另一个线程检查进程是否失败或成功。
  • 将通知保存在内存对象中,并检查线程是否有进程失败或成功
  • 禁用consumer_timeout
© www.soinside.com 2019 - 2024. All rights reserved.