使用Node.js消费RabbitMQ消息时,我可以等待进程完成吗?

问题描述 投票:0回答:1

我对Node.js和ES6还是很陌生,这让我有些困惑。我试图让进程继续运行,使用RabbitMQ队列中的消息。它需要能够处理该消息(大约需要30-60秒),然后才能抓取下一条消息。目前,我拥有的代码将捕获所有可能的消息,然后尝试分叉进程。当队列中有3-5条消息时,这很好,但是对于20、50或100条消息,这将导致服务器内存不足。

我已经尝试使.consume()回调函数异步并将await添加到消息处理函数中。我尝试将await new Promise内的.consume()回调内包装processMessage。我尝试将await添加到调用channel.consume的行中。什么都不会改变行为。

#!/usr/bin/env node

const amqp = require('amqplib');

const consumeFromQueue = async (queue, isNoAck = false, durable = false, prefetch = null) => {
    const conn_str = "amqp://" + process.env.RABBITMQ_USERNAME + ":" + process.env.RABBITMQ_PASSWORD + "@" + process.env.RABBITMQ_HOST + "/development?heartbeat=60"
    const cluster = await amqp.connect(conn_str);
    const channel = await cluster.createChannel();
    await channel.assertQueue(queue,  { durable: durable, autoDelete: true });
    if (prefetch) {
        channel.prefetch(prefetch);
    }
    console.log(` [x] Waiting for messages in ${queue}. To exit press CTRL+C`)

    try {
        channel.consume(queue, message => {
            if (message !== null) {
                console.log(' [x] Received', message.content.toString());
                processMessage(message.content.toString());
                channel.ack(message);
                return null;
            } else {
                console.log(error, 'Queue is empty!')
                channel.reject(message);
            }
        }, {noAck: isNoAck});
    } catch (error) {
        console.log(error, 'Failed to consume messages from Queue!')
        cluster.close(); 
    }
}

exports.consumeFromQueue = consumeFromQueue;

作为一个旁注,如果我创建一个字符串数组并遍历字符串,当我在processMessage行中添加一个wait时,它将等待执行处理(30-60秒),然后处理下一个字符串。

(async () => {
    for (let i=0; i<urls.length; i++) {
        await processMessage(urls[i]);
    }
})();

因此,我基本上需要具有类似功能的东西,但要监听RabbitMQ中的队列。

javascript node.js rabbitmq es6-promise
1个回答
0
投票

如果要在任何给定时间限制使用者正在处理的消息数,请使用channel.prefetch()

给出的计数是通过可以等待确认的渠道;一旦计数未完成的消息,服务器将不会在此发送更多消息频道,直到一个或多个已被确认。

即,如果您只希望一次只处理一条消息,然后再转到下一条,则设置channel.prefetch(1)

© www.soinside.com 2019 - 2024. All rights reserved.