我正在编写一个使用 amqplib 的 Channel#consume 方法的工作程序。我希望这个工作人员等待作业并在它们出现在队列中时立即处理它们。
我编写了自己的模块来抽象 ampqlib,以下是获取连接、设置队列和消费消息的相关函数:
const getConnection = function(host) {
return amqp.connect(host);
};
const createChannel = function(conn) {
connection = conn;
return conn.createConfirmChannel();
};
const assertQueue = function(channel, queue) {
return channel.assertQueue(queue);
};
const consume = Promise.method(function(channel, queue, processor) {
processor = processor || function(msg) { if (msg) Promise.resolve(msg); };
return channel.consume(queue, processor)
});
const setupQueue = Promise.method(function setupQueue(queue) {
const amqp_host = 'amqp://' + ((host || process.env.AMQP_HOST) || 'localhost');
return getConnection(amqp_host)
.then(conn => createChannel(conn)) // -> returns a `Channel` object
.tap(channel => assertQueue(channel, queue));
});
consumeJob: Promise.method(function consumeJob(queue) {
return setupQueue(queue)
.then(channel => consume(channel, queue))
});
我的问题是 Channel#consume 的奇怪签名。来自 http://www.squaremobius.net/amqp.node/channel_api.html#channel_consume:
#consume(queue, function(msg) {...}, [options, [function(err, ok) {...}]])
回调并不是魔法发生的地方,消息的处理实际上应该在第二个参数中进行,这会破坏承诺的流程。
这就是我计划使用它的方式:
return queueManager.consumeJob(queue)
.then(msg => {
// do some processing
});
但这不起作用。如果队列中没有消息,则承诺将被拒绝,然后如果队列中删除消息,则不会发生任何事情。如果有消息,则仅处理一条消息,然后工作线程会停止,因为它从 Channel#consume 调用中退出了“处理器”函数。
我该怎么办?我想保留queueManager抽象,以便我的代码更容易推理,但我不知道该怎么做......有任何指示吗?
正如 @idbehold 所说,Promise 只能解决一次。如果您想在消息传入时对其进行处理,除了使用此函数之外没有其他方法。 Channel#get只会检查队列一次然后返回;它不适用于您需要工人的场景。
作为一个选项 - 您可以将应用程序呈现为一些消息(或事件)的流。为此有一个名为 Highland 的图书馆。
你的代码应该是这样的(它不是一个完成的示例,但我希望它能说明这个想法):
let messageStream = _((push, next) => {
consume(queue, (msg) => {
push(null, msg)
})
)
// now you can operate with your stream in functional style
message.map((msg) => msg + 'some value').each((msg) => // do something with msg)
这种方法为您提供了许多用于同步和转换的原语。