我正在创建一个简单的聊天应用程序
{
//Creating a new connection
amqp.connect('amqp://localhost', (err, conn) => {
conn.createChannel(function(err, ch) {
ch.assertExchange(exchangeName, 'direct', { durable: false });
const routingKey = // some routing key;
wss.on('connection', (ws, req) => {
// Creating a new queue for the user
ch.assertQueue('', { exclusive: true, persist: true, durable: true }, (err, q) => {
// Binds with the routing key
ch.bindQueue(q.queue, exchangeName, routingKey);
ch.consume(q.queue, (msg) => {
if (ws.readyState === 1) {
ch.ack(msg);
ws.send(` [-] Received ${msg.content.toString()}`);
} else {
// What will come here so that it will be requeued to a new queue with the same routing id?
}
}, { noAck: false });
});
ws.on('message', (message) => {
ch.publish(exchangeName, routingKey, new Buffer(message));
console.log(`[x] Sent ${message} to the exchange with routingKey ${routingKey}`);
});
})
当所有用户都在与服务器连接的套接字时,这可以正常工作。我想要实现的是当套接字连接为用户而死时,如果他错过了基于路由密钥(特定于用户)的消息,那么每当他重新连接到不同的队列时,他应该能够再次接收这些消息。我觉得这是可能的,因为如果在任何队列中都没有确认消息,那么消息可以保持交换。但不确定如何实现它。
让我解决几个误解......
我想要实现的是当套接字连接为用户而死时,如果他错过了基于路由密钥(特定于用户)的消息,那么每当他重新连接到不同的队列时,他应该能够再次接收这些消息
您正在使用独占队列。根据定义,当关联的连接和通道消失时,这些队列将被删除。删除队列时,这些队列中的消息将丢失。
我觉得这是可能的,因为如果在任何队列中都没有确认消息,那么消息可以保持交换。
这不是RabbitMQ的工作方式。交换用于路由消息,队列用于存储消息。
以下评论不正确:
如果user1和user2在不同的主机上,显然他们应该建立不同的通道。 'channel'就像连接一样,如果两个通道消耗相同的队列,它们可以接收相同的消息。
在不同的主机上,可能在不同的进程中运行,user1
和user2
将各自建立自己的连接和渠道。两个消费者永远不会从同一队列收到相同的消息。
回到原始问题,您需要将队列声明为非独占且持久,并将其命名为当应用程序重新连接时,应用程序连接到同一队列。这样,应用程序可以使用队列中保留的消息。
RabbitMQ可以通过多种方式为您删除未使用的documented队列。
注意:RabbitMQ团队监控rabbitmq-users
mailing list,有时只回答StackOverflow上的问题。
消费者应该使用确认来消费消息,如果消息被传递给消费者并且在消费者连接丢失之前未被确认,则RabbitMQ将重新发送消息。你可以从RabbitMQ doc获得更多细节。
在您的情况下,当套接字连接断开时,您应该强制关闭通道,然后RabbitMQ将知道这些消息应该重新发送。
要确保在服务器重新启动时消息能够存活,请设置exchange'duper':ch.assertExchange(exchangeName, 'direct', { durable: false });