如何在nodejs中为rabbitmq添加重连逻辑

问题描述 投票:0回答:1

我已经在nodejs中为rabbitmq创建了一个侦听器,我想创建一个弹性连接,以便在断开连接时重新连接。

这是我当前的代码

const amqp = require('amqplib');

const rabbitmqServerUrl = 'amqp://localhost';
const queueName = 'best_queue';

let connection = null;
let channel = null;

async function setupConnection() {
  try {
    connection = await amqp.connect(rabbitmqServerUrl);
    connection.on('error', (err) => {
      if (err.message.includes('Connection closed')) {
        console.error('Connection closed, reconnecting...');
        setTimeout(setupConnection, 5000); // Retry connection after a delay
      } else {
        console.error('Connection error:', err.message);
      }
    });

    channel = await connection.createChannel();

    // Create a durable queue
    await channel.assertQueue(queueName, { durable: true });

    console.log('Connected to RabbitMQ');

    // Start the consumer
    startConsumer();
  } catch (error) {
    console.error('Error connecting to RabbitMQ:', error.message);

    // Retry connection after a delay
    setTimeout(setupConnection, 5000);
  }
}

function startConsumer() {
  if (!channel) {
    console.error('Channel is not available, skipping consumer start.');
    return;
  }

  channel.consume(queueName, async (msg) => {
    if (msg !== null) {
      try {
        // Process the message
        console.log('Received message:', msg.content.toString());

        // Simulate a processing delay (replace this with your actual processing logic)
        await new Promise((resolve) => setTimeout(resolve, 1000));

        // Acknowledge the message
        channel.ack(msg);
      } catch (err) {
        console.error('Error processing message:', err.message);
        // Handle message processing errors as needed
      }
    }
  });

  channel.on('close', () => {
    console.error('Channel closed, reconnecting...');
    setTimeout(startConsumer, 5000); // Restart the consumer after a delay
  });

  channel.on('error', (error) => {
    console.error('Channel error:', error.message);
    // Handle channel errors as needed
  });

  console.log('Consumer started');
}

setupConnection();

当我在rabbitmq接口中强制断开它时,它不会重新连接。 我不确定这在本地主机上是否不可能,或者我的代码中缺少某些内容。 感谢您的帮助

node.js rabbitmq
1个回答
0
投票

我刚刚尝试针对我自己的本地 RabbitMQ 运行您的代码。 最新的 RabbitMQ (3.12.6) 和 Node LTS (18.18.0)。

通过

IllegalOperationError
关闭 Node 应用程序的连接或通过
rabbitmqctl close_connection <{Node Process pid}> ""
停止 RabbitMQ 进程后尝试重新连接时,似乎失败并显示
rabbitmqctl stop

我发现用

setTimeout(startConsumer, 5000);
替换您的行
setTimeout(setupConnection, 5000)
解决了问题,并在断开连接后始终重新建立与 RabbitMQ 服务器的连接。

我想在您的用例中,在重新定义消费者逻辑之前重新建立 AMQP 连接和通道非常重要。让我知道你怎么样!

© www.soinside.com 2019 - 2024. All rights reserved.