Node RabbitMQ消耗消息并对每个消息执行某项操作

问题描述 投票:0回答:1

我想使用来自Rabbitmq服务的消息,对于收到的每条消息,我希望对每条消息执行某项操作(例如:将该消息放入数据库中,处理该消息并通过RabbitMq通过另一个队列发送回复)。

当前我的RabbitMq消费者代码如下:

const all = require('bluebird').all;
const basename = require('path').basename;


function receive() {
    const severities = process.argv.slice(2);
    if (severities.length < 1) {
        console.warn('Usage: %s [info] [warning] [error]',
            basename(process.argv[1]));
        process.exit(1);
    }
    let config = {
        protocol: 'amqp',
        hostname: 'localhost',
        port: 5672,
        username: 'rumesh',
        password: 'password',
        locale: 'en_US',
        frameMax: 0,
        heartbeat: 0,
        vhost: '/',
    };
    amqp.connect(config).then(function (conn) {
        process.once('SIGINT', function () {
            conn.close();
        });
        return conn.createChannel().then(function (ch) {
            let queue = 'test';
            let exchange = 'test-exchange';
            let key = 'python-key';
            let exchange_type = 'direct';

            let ok = ch.assertExchange(exchange, exchange_type, {durable: true});

            ok = ok.then(function () {
                return ch.assertQueue(queue, { durable: true});
            });

            ok = ok.then(function (qok) {
                const queue = qok.queue;
                return all(severities.map(function (sev) {
                    ch.bindQueue(queue, exchange, sev,{durable: true});
                })).then(function () {
                    return queue;
                });
            });

            ok = ok.then(function (queue) {
                return ch.consume(queue, logMessage, {noAck: true});
            });
            return ok.then(function () {
                console.log(' [*] Waiting for logs. To exit press CTRL+C.');
            });

            function logMessage(msg) {
                console.log(" [x] %s:'%s'",
                    msg.fields.routingKey,
                    msg.content.toString());
            }
        });
    }).catch(console.warn);
}


module.exports = receive;```
node.js rabbitmq node-amqp node-amqplib
1个回答
0
投票
我建议您创建一个类似onNewMessage的处理函数,每次在队列中收到新消息时都会调用该函数。

您可以通过多种方式对消息进行编码,因为您可以通过AMQP发送二进制数据。

JSON绝对是一种发送消息的方式,在Node.js中处理非常方便。

这里有一些示例代码连接到服务器,然后发送和接收消息:

const amqp = require('amqplib'); const queue = 'test'; // Set your config here... let config = { protocol: 'amqp', hostname: 'localhost', port: 5672, username: 'rumesh', password: 'password', locale: 'en_US', frameMax: 0, heartbeat: 0, vhost: '/', }; async function start() { try { const conn = await createConnection(config); console.log("Connected to AMQP server."); let channel = await conn.createChannel(); await channel.assertQueue(queue, { durable: true}); startPollingForMessages(channel); startSendingMessages(channel); } catch (err) { console.error("start: Connection error:",err.message); } } async function createConnection(config) { const conn = await amqp.connect(config); conn.on("error", function(err) { console.error("Connection error:",err.message); }); conn.on("close", function() { console.error("Connection closed:", err.message); }); return conn; } function startSendingMessages(channel) { const SEND_INTERVAL = 5000; setInterval(() => { sendMessage(channel, queue, JSON.stringify({ timestamp: new Date().toISOString(), message: " Some message" })); }, SEND_INTERVAL); } async function sendMessage(channel, queue, messageContent) { console.log(`sendMessage: sending message: ${messageContent}...`); return channel.sendToQueue(queue, Buffer.from(messageContent)) } function startPollingForMessages(ch) { ch.consume(queue, (msg) => { onNewMessage(msg); ch.ack(msg); }); } function onNewMessage(msg) { // Do your database stuff or whatever here.... console.log("On new message:", msg.content.toString()) } start();

© www.soinside.com 2019 - 2024. All rights reserved.