我已经在nodejs中为rabbitmq创建了一个侦听器,我想创建一个弹性连接,以便在断开连接时重新连接。
这是我当前的代码
const amqp = require('amqplib');
const rabbitmqServerUrl = 'amqp://localhost';
const queueName = 'best_queue';
let connection = null;
let channel = null;
async function setupConnection() {
try {
connection = await amqp.connect(rabbitmqServerUrl);
connection.on('error', (err) => {
if (err.message.includes('Connection closed')) {
console.error('Connection closed, reconnecting...');
setTimeout(setupConnection, 5000); // Retry connection after a delay
} else {
console.error('Connection error:', err.message);
}
});
channel = await connection.createChannel();
// Create a durable queue
await channel.assertQueue(queueName, { durable: true });
console.log('Connected to RabbitMQ');
// Start the consumer
startConsumer();
} catch (error) {
console.error('Error connecting to RabbitMQ:', error.message);
// Retry connection after a delay
setTimeout(setupConnection, 5000);
}
}
function startConsumer() {
if (!channel) {
console.error('Channel is not available, skipping consumer start.');
return;
}
channel.consume(queueName, async (msg) => {
if (msg !== null) {
try {
// Process the message
console.log('Received message:', msg.content.toString());
// Simulate a processing delay (replace this with your actual processing logic)
await new Promise((resolve) => setTimeout(resolve, 1000));
// Acknowledge the message
channel.ack(msg);
} catch (err) {
console.error('Error processing message:', err.message);
// Handle message processing errors as needed
}
}
});
channel.on('close', () => {
console.error('Channel closed, reconnecting...');
setTimeout(startConsumer, 5000); // Restart the consumer after a delay
});
channel.on('error', (error) => {
console.error('Channel error:', error.message);
// Handle channel errors as needed
});
console.log('Consumer started');
}
setupConnection();
当我在rabbitmq接口中强制断开它时,它不会重新连接。 我不确定这在本地主机上是否不可能,或者我的代码中缺少某些内容。 感谢您的帮助
我刚刚尝试针对我自己的本地 RabbitMQ 运行您的代码。 最新的 RabbitMQ (3.12.6) 和 Node LTS (18.18.0)。
通过
IllegalOperationError
关闭 Node 应用程序的连接或通过 rabbitmqctl close_connection <{Node Process pid}> ""
停止 RabbitMQ 进程后尝试重新连接时,似乎失败并显示 rabbitmqctl stop
。
我发现用
setTimeout(startConsumer, 5000);
替换您的行 setTimeout(setupConnection, 5000)
解决了问题,并在断开连接后始终重新建立与 RabbitMQ 服务器的连接。
我想在您的用例中,在重新定义消费者逻辑之前重新建立 AMQP 连接和通道非常重要。让我知道你怎么样!