我正在尝试使用 Node.js 在 Azure 服务总线中实现优先级队列,类似于我使用 RabbitMQ 实现的效果。在 RabbitMQ 中,我可以为消息分配优先级,确保首先处理高优先级消息。但是,我还没有找到使用 Node.js 在 Azure 服务总线中实现相同功能的方法。
我尝试过RabbitMQ并且满足了我的要求。附上RabbiMQ的代码片段
发送者.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
function on_connect(err, conn) {
if (err !== null) return bail(err);
// name of queue
var q = 'hello';
var msg = 'Hello World!';
var priorityValue = 0;
function on_channel_open(err, ch) {
if (err !== null) return bail(err, conn);
// maxPriority : max priority value supported by queue
ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
if (err !== null) return bail(err, conn);
for(var index=1; index<=100; index++) {
priorityValue = Math.floor((Math.random() * 10));
msg = 'Hello World!' + ' ' + index + ' ' + priorityValue;
ch.publish('', q, new Buffer(msg), {priority: priorityValue});
console.log(" [x] Sent '%s'", msg);
}
ch.close(function() { conn.close(); });
});
}
conn.createChannel(on_channel_open);
}
amqp.connect(on_connect);
reciever.js
#!/usr/bin/env node
var amqp = require('amqplib/callback_api');
function bail(err, conn) {
console.error(err);
if (conn) conn.close(function() { process.exit(1); });
}
function on_connect(err, conn) {
if (err !== null) return bail(err);
process.once('SIGINTunction() { conn.close(); });
var q = 'hello';
function on_channel_open(err, ch) {
ch.assertQueue(q, {durable: false, maxPriority: 10}, function(err, ok) {
if (err !== null) return bail(err, conn);
ch.consume(q, function(msg) { // message callback
console.log(" [x] Received '%s'", msg.content.toString());
}, {noAck: true}, function(_consumeOk) { // consume callback
console.log(' [*] Waiting for messages. To exit press CTRL+C');
});
});
}
conn.createChannel(on_channel_open);
}
amqp.connect(on_connect);
期望: 有没有办法使用 Node.js 在 Azure 服务总线的单个队列中实现消息优先级? 如果不是,我可以使用哪些替代方法来实现类似的效果(例如,具有专用消费者的多个队列)?
任何相关的代码片段都会有帮助。 我知道 Azure 服务总线可能本身不支持优先级队列,但任何见解或解决方法将不胜感激。
其他详细信息: Node.js 版本:v20.13.1
以下步骤将帮助您使用 Node.js 将 RabbitMQ 与 Azure 服务总线桥接:
choco install rabbitmq
启用 RabbitMQ Shovel 插件:
您可以使用此命令启用插件及其可视化界面:
rabbitmq-plugins enable rabbitmq_shovel_management
使用服务总线队列的 SAS 策略将 RabbitMQ 连接到 Azure 服务总线。创建 SAS 策略,如下图所示:
复制主连接字符串并使用此参考链接将其转换为 RabbitMQ Shovel AMQP 连接字符串,如下图所示:
现在在浏览器中打开 RabbitMQ 管理插件 http://localhost:1567 并转到
Admin -> Shovel Management
,您可以在其中添加新的铲子,它将负责将消息从 RabbitMQ 队列发送到您的 Azure 服务 Bus排队.
发送消息之前请确保 Shovel Status 正在运行。发送和接收消息的代码取自此链接。
发送消息:
import amqp from "amqplib";
const queue = "azure";
const text = {
item_id: "macbook",
text: "This is a sample message to send to the receiver to check the ordered item's availability.",
};
(async () => {
let connection;
try {
connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
// Ensure the queue is declared with the correct durability setting
await channel.assertQueue(queue, { durable: true });
channel.sendToQueue(queue, Buffer.from(JSON.stringify(text)));
console.log(" [x] Sent '%s'", text);
await channel.close();
} catch (err) {
console.warn(err);
} finally {
if (connection) await connection.close();
}
})();
接收消息:
import amqp from "amqplib";
const queue = "azure";
(async () => {
try {
const connection = await amqp.connect("amqp://localhost");
const channel = await connection.createChannel();
process.once("SIGINT", async () => {
await channel.close();
await connection.close();
});
await channel.assertQueue(queue, { durable: true }); // Change durable to true
await channel.consume(
queue,
(message) => {
if (message) {
console.log(
" [x] Received '%s'",
JSON.parse(message.content.toString())
);
}
},
{ noAck: true }
);
console.log(" [*] Waiting for messages. To exit press CTRL+C");
} catch (err) {
console.warn(err);
}
})();