RabbitMQ 和消息优先级

问题描述 投票:0回答:8

RabbitMQ有消息优先级的概念吗?我有一个问题,一些更重要的消息由于在队列中排在它们前面的不太重要的消息而被放慢。我想让高优先级的优先级高,移到队列的前面。

我知道我可以使用两个队列来近似这个,一个“快”队列和一个“慢”队列,但这看起来像是一个黑客。

有人知道使用 RabbitMQ 的更好解决方案吗?

rabbitmq
8个回答
91
投票

这个问题的答案已经过时了。从 RabbitMQ 3.5.0 开始,现在有对 AMQP 标准每条消息优先级的内核支持。 文档 包含所有血淋淋的细节,但简而言之:

  • 需要在创建队列的时候定义队列的优先级范围;
  • 没有设置优先级的消息优先级为 0;
  • 数字优先级高于队列上设置的最大优先级的消息将获得队列支持的最高优先级。

更多有趣的警告在文档中。非常值得一读。


20
投票

兔子没有优先级的概念,正如布赖恩简洁地指出的那样,前面的人先到达那里。 ;-)

我建议实施一组队列来满足您的特定消息传递需求,并让这些队列模拟您的优先级需求,例如,称它们为“MyQueueP1”、“MyQueueP2”等等,然后让我们的消费者检查P1 在 P2 之前(等等),然后首先从那里发出服务消息。

如果你有一个高优先级的消息,你可以通过合适的路由键将它发布到适当的优先级队列,瞧。

[更新] 检查这个问题: 在 FIFO 队列系统中,实现优先级消息传递的最佳方式是什么

[更新] 根据最近的 RabbitMQ 版本 3.5.0,这个答案现在已经过时了,应该被认为只对这个版本之前的版本有效。 https://stackoverflow.com/a/29068288/489888


10
投票

IIRC RabbitMQ 仍然使用 AMQP 协议版本 0.9.1(获取规范here)。规范明确提到了消息优先级:

Messages may have a priority level. A high priority message is sent ahead of lower     priority messages
waiting in the same message queue. When messages must be discarded in order to maintain a specific
service quality level the server will first discard low-priority messages.

和:

Note that in the presence of multiple readers from a queue, or client transactions, or use of priority fields,
or use of message selectors, or implementation-specific delivery optimisations the queue MAY NOT
exhibit true FIFO characteristics.

规范说优先级是必须的,所以我想 RabbitMQ 应该实现它,但您可能需要查阅它的文档。


6
投票

是的,RabbitMQ 支持优先级队列。

要使队列作为优先队列工作,请在声明队列时提供属性

x-max-priority

属性

x-max-priority
定义队列支持的最大优先级数。

在Java中,你可以这样做:

Map<String, Object> props = new HashMap<>();
props.put("x-max-priority", 10); // max priority number as 10
channel.queueDeclare(QUEUE_NAME, durable, false, false, props);

要发布特定优先级的消息,请执行以下操作:

String message = "My message with priority 7";
AMQP.BasicProperties.Builder basicProps = new AMQP.BasicProperties.Builder();
basicProps.contentType("text/plain")
            .priority(7);
channel.basicPublish("", QUEUE_NAME, basicProps.build(), message.getBytes());

0
投票

这里有一个 C# 代码示例,它定义了一个具有一系列优先级的队列:

using RabbitMQ.Client;
public void Setup()
{
    ConnectionFactory factory = new() { host = "", username = "", password = "" };
    var connection = factory.CreateConnection();
    var model = connection.CreateModel();
    var args = new Dictionary<string, object>
    {
        { "x-min-priority", 0 },
        { "x-max-priority", 9 }
    };
    model.QueueDeclare("Queue1", arguments: args);
}

这里有一个优先发送消息的函数:

private static void Send(IModel model, string queue, string message, byte priority)
{
    var body = Encoding.UTF8.GetBytes(message);

    var basicProperties = model.CreateBasicProperties();
    basicProperties.Priority = priority;

    model.BasicPublish(exchange: "",
                       routingKey: queue,
                       basicProperties: basicProperties,
                       body: body);
}

正如其他人所提到的,Rabbit 并没有严格遵守这些优先级。

如果您发送的消息超出了为队列定义的范围,它将被视为最小值或最大值。


-1
投票

我们可以通过从 https://www.rabbitmq.com/community-plugins.html 安装插件 rabbitmq_priority_queue 使 rabbitmq 成为分布式优先级队列。您必须下载插件 rabbitmq_priority_queue-3.3.x-72d20292.ez 并将其放入 rabbit mq 安装目录的 plugins 文件夹中。重新启动服务器。现在您可以按优先级将项目插入队列并相应地使用它,已将示例代码粘贴在如何轮询 RabbitMQ 以连续按优先级顺序获取消息?


-5
投票

RabbitMQ / AMQP 肯定有消息优先级的概念 - 队列头部的消息优先于它后面的消息,而那个消息优先于它后面的消息,依此类推,无穷无尽。

你能改变那个模型吗?没有! :)


-12
投票

如果它实现了优先级排序,它就不是 MQ。

MQ 是用于数据的电子邮件。在所有数据传输中,保存顺序是至关重要的。如果你改变顺序,删除发生在插入之前,更新就会乱序。一切正常。

你可能有一个有效的实现,有一些例外,但我发现大多数优先级队列的设计是因为人们对他们的系统架构和其中各部分的交互进行了肤浅的思考。保持事物的顺序几乎总是正确的做法,对于实体内和实体间的交互都是如此。

为了说事件 A 的优先级高于事件 B,这两个事件必须始终解耦。当发生这种情况时,人们想知道为什么它们完全存在于相同的队列结构中。话又说回来,如果它与有效载荷相关,则该有效载荷的计算工作也会影响系统的性能,因此尽早决定,即在使有效载荷有意义之前。

© www.soinside.com 2019 - 2024. All rights reserved.