Azure 服务总线:在 Node.js 中实现优先级队列(类似于 RabbitMQ)

问题描述 投票:0回答:1

我正在尝试使用 Node.js 在 Azure 服务总线中实现优先级队列,类似于我使用 RabbitMQ 实现的效果。在 RabbitMQ 中,我可以为消息分配优先级,确保首先处理高优先级消息。但是,我还没有找到使用 Node.js 在 Azure 服务总线中实现相同功能的方法。

我尝试过RabbitMQ并且满足了我的要求。附上RabbiMQ的代码片段

发送者.js

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

function bail(err, conn) {
  console.error(err);
  if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
  if (err !== null) return bail(err);
 
  // name of queue
  var q = 'hello';
  var msg = 'Hello World!';
  var priorityValue = 0;

  function on_channel_open(err, ch) {
    if (err !== null) return bail(err, conn);
    // maxPriority : max priority value supported by queue
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
      if (err !== null) return bail(err, conn);

      for(var index=1; index<=100; index++) {
             priorityValue = Math.floor((Math.random() * 10));
             msg = 'Hello World!' + ' ' + index + ' ' + priorityValue;
             ch.publish('', q, new Buffer(msg), {priority: priorityValue});
             console.log(" [x] Sent '%s'", msg);
      }

      ch.close(function() { conn.close(); });
    });
  }

  conn.createChannel(on_channel_open);
}

amqp.connect(on_connect);

reciever.js

#!/usr/bin/env node

var amqp = require('amqplib/callback_api');

function bail(err, conn) {
  console.error(err);
  if (conn) conn.close(function() { process.exit(1); });
}

function on_connect(err, conn) {
  if (err !== null) return bail(err);
  process.once('SIGINTunction() { conn.close(); });
  
  var q = 'hello';

  function on_channel_open(err, ch) {
    ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
      if (err !== null) return bail(err, conn);
      ch.consume(q, function(msg) { // message callback
        console.log(" [x] Received '%s'", msg.content.toString());
      }, {noAck: true}, function(_consumeOk) { // consume callback
        console.log(' [*] Waiting for messages. To exit press CTRL+C');
      });
    });
  }

  conn.createChannel(on_channel_open);
}

amqp.connect(on_connect);

期望有没有办法使用 Node.js 在 Azure 服务总线的单个队列中实现消息优先级? 如果不是,我可以使用哪些替代方法来实现类似的效果(例如,具有专用消费者的多个队列)?

任何相关的代码片段都会有帮助。 我知道 Azure 服务总线可能本身不支持优先级队列,但任何见解或解决方法将不胜感激。

其他详细信息: Node.js 版本:v20.13.1

node.js rabbitmq azureservicebus priority-queue
1个回答
0
投票

以下步骤将帮助您使用 Node.js 将 RabbitMQ 与 Azure 服务总线桥接:

  • 要安装 RabbitMQ,请从命令行或 PowerShell 运行以下命令:
choco install rabbitmq

启用 RabbitMQ Shovel 插件:

您可以使用此命令启用插件及其可视化界面:

rabbitmq-plugins enable rabbitmq_shovel_management

RabbitMQ Shovel Management

使用服务总线队列的 SAS 策略将 RabbitMQ 连接到 Azure 服务总线。创建 SAS 策略,如下图所示:

SAS Policy

复制主连接字符串并使用此参考链接将其转换为 RabbitMQ Shovel AMQP 连接字符串,如下图所示:

AMQP Connection String

现在在浏览器中打开 RabbitMQ 管理插件 http://localhost:1567 并转到

Admin -> Shovel Management
,您可以在其中添加新的铲子,它将负责将消息从 RabbitMQ 队列发送到您的 Azure 服务 Bus排队.

Shovel Management

发送消息之前请确保 Shovel Status 正在运行。发送和接收消息的代码取自此链接

发送消息:

import amqp from "amqplib";

const queue = "azure";
const text = {
  item_id: "macbook",
  text: "This is a sample message to send to the receiver to check the ordered item's availability.",
};

(async () => {
  let connection;
  try {
    connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();

    // Ensure the queue is declared with the correct durability setting
    await channel.assertQueue(queue, { durable: true });
    channel.sendToQueue(queue, Buffer.from(JSON.stringify(text)));
    console.log(" [x] Sent '%s'", text);
    await channel.close();
  } catch (err) {
    console.warn(err);
  } finally {
    if (connection) await connection.close();
  }
})();

Sending Messages

Receiving Messages

接收消息:

Message Content

import amqp from "amqplib";

const queue = "azure";

(async () => {
  try {
    const connection = await amqp.connect("amqp://localhost");
    const channel = await connection.createChannel();

    process.once("SIGINT", async () => {
      await channel.close();
      await connection.close();
    });

    await channel.assertQueue(queue, { durable: true }); // Change durable to true
    await channel.consume(
      queue,
      (message) => {
        if (message) {
          console.log(
            " [x] Received '%s'",
            JSON.parse(message.content.toString())
          );
        }
      },
      { noAck: true }
    );

    console.log(" [*] Waiting for messages. To exit press CTRL+C");
  } catch (err) {
    console.warn(err);
  }
})();

Receiving Messages

© www.soinside.com 2019 - 2024. All rights reserved.