RabbitMQ,在节点中编写的pub子模型中重新排除被拒绝的消息

问题描述 投票:0回答:2

我正在创建一个简单的聊天应用程序

  1. 可以有多个用户和多个聊天室。
  2. 每个聊天室可以容纳多个用户。
  3. 服务器启动时建立rabbitMQ连接。
  4. 当用户连接到服务器时,将在rabbitMQ通道中打开套接字连接。
  5. 在通过套接字从客户端(用户)发送消息时,rabbitmq通道使用特定的路由密钥将其推送到交换机。
  6. 在消费时,如果套接字处于活动状态,则通过它发送并确认它。如果不是不承认并重新排队。

{

//Creating a new connection
amqp.connect('amqp://localhost', (err, conn) => {

conn.createChannel(function(err, ch) {

ch.assertExchange(exchangeName, 'direct', { durable: false });
const routingKey = // some routing key;    

wss.on('connection', (ws, req) => {
  // Creating a new queue for the user
  ch.assertQueue('', { exclusive: true, persist: true, durable: true }, (err, q) => {
    // Binds with the routing key
    ch.bindQueue(q.queue, exchangeName, routingKey);

    ch.consume(q.queue, (msg) => {
      if (ws.readyState === 1) {
        ch.ack(msg);
        ws.send(` [-] Received ${msg.content.toString()}`);
      } else {
        // What will come here so that it will be requeued to a new queue with the same routing id?
      }
    }, { noAck: false });
  });

  ws.on('message', (message) => {
    ch.publish(exchangeName, routingKey, new Buffer(message));
    console.log(`[x] Sent ${message} to the exchange with routingKey ${routingKey}`);
  });
}) 

当所有用户都在与服务器连接的套接字时,这可以正常工作。我想要实现的是当套接字连接为用户而死时,如果他错过了基于路由密钥(特定于用户)的消息,那么每当他重新连接到不同的队列时,他应该能够再次接收这些消息。我觉得这是可能的,因为如果在任何队列中都没有确认消息,那么消息可以保持交换。但不确定如何实现它。

node.js rabbitmq message-queue
2个回答
1
投票

让我解决几个误解......

我想要实现的是当套接字连接为用户而死时,如果他错过了基于路由密钥(特定于用户)的消息,那么每当他重新连接到不同的队列时,他应该能够再次接收这些消息

您正在使用独占队列。根据定义,当关联的连接和通道消失时,这些队列将被删除。删除队列时,这些队列中的消息将丢失。

我觉得这是可能的,因为如果在任何队列中都没有确认消息,那么消息可以保持交换。

这不是RabbitMQ的工作方式。交换用于路由消息,队列用于存储消息。

以下评论不正确:

如果user1和user2在不同的主机上,显然他们应该建立不同的通道。 'channel'就像连接一样,如果两个通道消耗相同的队列,它们可以接收相同的消息。

在不同的主机上,可能在不同的进程中运行,user1user2将各自建立自己的连接和渠道。两个消费者永远不会从同一队列收到相同的消息。

回到原始问题,您需要将队列声明为非独占且持久,并将其命名为当应用程序重新连接时,应用程序连接到同一队列。这样,应用程序可以使用队列中保留的消息。

RabbitMQ可以通过多种方式为您删除未使用的documented队列。


注意:RabbitMQ团队监控rabbitmq-users mailing list,有时只回答StackOverflow上的问题。


0
投票

消费者应该使用确认来消费消息,如果消息被传递给消费者并且在消费者连接丢失之前未被确认,则RabbitMQ将重新发送消息。你可以从RabbitMQ doc获得更多细节。

在您的情况下,当套接字连接断开时,您应该强制关闭通道,然后RabbitMQ将知道这些消息应该重新发送。

要确保在服务器重新启动时消息能够存活,请设置exchange'duper':ch.assertExchange(exchangeName, 'direct', { durable: false });

© www.soinside.com 2019 - 2024. All rights reserved.