RabbitMQ——有选择地从队列中检索消息

问题描述 投票:0回答:4

我是 RabbitMQ 的新手,想知道解决我正在考虑的这个问题的好方法。 我想创建一个订阅队列并仅提取满足特定条件的消息的服务;例如,如果消息中包含特定主题标头。

我仍在学习 RabbitMQ,并正在寻找有关如何解决此问题的技巧。 我的问题包括:消费者如何从队列中仅提取特定消息? 生产者如何在消息中设置主题标头(如果这是正确的术语?)

rabbitmq
4个回答
38
投票

RabbitMQ 非常适合这种情况。您有多种选择可以做您想做的事。我建议阅读文档以更好地理解。我建议你使用主题或直接交换。话题比较灵活。 事情是这样的。

生产者代码连接到 RabbitMQ Broker 并使用特定名称进行创建和交换。

生产者发布交换。每条发布的消息都将使用路由密钥发布。

消费者连接到 RabbitMQ 代理。

消费者创建队列

消费者将队列绑定到交换器,与生产者中定义的交换器相同。 绑定还包括该特定消费者所需的每条消息的路由键。

假设您正在发布日志消息。路由键可能类似于“log.info”、“log.warn”、“log.error”。 生产者发布的每条消息都将附加相关的路由密钥。 然后,您将有一个消费者发送所有错误消息并通过电子邮件发送,另一个消费者将所有错误消息写入文件。 因此,电子邮件发送器将使用路由键“log.error”定义从其队列到交换器的绑定。 这样,虽然交换器收到所有消息,但为电子邮件发送者定义的队列将仅包含错误消息。 文件记录器将定义一个绑定到同一交换的新的单独队列,并设置不同的路由键。 您可以为三个不同的路由键需要进行三个单独的绑定,或者仅使用通配符“log.*”来请求来自以 log 开头的交换的所有消息。

这是一个简单的示例,展示了如何实现您想要做的事情。

查看 here 获取代码示例,特别是第 5 号教程。


3
投票

建议充分利用rabbitmq的交换/路由。如果你确实想根据消息内容进行检查,下面的代码是一个可行的解决方案。

从队列中检索消息并检查,有选择地确认您感兴趣的消息。

拉一条消息

GetResponse resp = channel.basicGet(QUEUE_NAME, false);

确认一条消息

channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);

示例

import com.rabbitmq.client.*;

public class ReceiveLogs {
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws Exception {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        try(Connection connection = factory.newConnection();
            Channel channel = connection.createChannel();){

            channel.queueDeclare(QUEUE_NAME, true, false, false, null);

            // pull one message and ack manually and exit
            GetResponse resp = channel.basicGet(QUEUE_NAME, false);
            if( resp != null ){
                String message = new String(resp.getBody(), "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
                channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
            }
            System.out.println();
        }
    }
}

依赖

compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0'

-3
投票

要从 RabbitMQ 检索消息,我们需要首先连接 RabbitMQ 服务器

public WebClient GetRabbitMqConnection(string userName, string password)
{
    var client = new WebClient(); 
    client.Credentials = new NetworkCredential(userName, password);
    return client;
}

现在使用以下代码从 RabbitMQ 检索消息。

public string GetRabbitMQMessages(string domainName, string port, 
    string queueName, string virtualHost, WebClient client, string methodType)
{
    string messageResult = string.Empty;
    string strUri = "http://" + domainName + ":" + port + 
                    "/api/queues/" + virtualHost + "/";
    var data = client.DownloadString(strUri + queueName + "/");
    var queueInfo = JsonConvert.DeserializeObject<QueueInfo>(data);
    if (queueInfo == null || queueInfo.messages == 0)
               return string.Empty;
    if (methodType == "POST")
    {
        string postbody = "  
        {\"ackmode\":\"ack_requeue_true\",\"count\":
         \"$totalMessageCount\",\"name\":\"${DomainName}\",
         \"requeue\":\"false\",\"encoding\":\"auto\",\"vhost\" :
         \"${QueueName}\"}";
         postbody = postbody
                       .Replace("$totalMessageCount", queueInfo.messages.ToString())
                       .Replace("${DomainName}", domainName)
                       .Replace("${QueueName}", queueName);
         messageResult = client.UploadString(strUri + queueName + 
                          "/get", "POST", postbody);
    }
    return messageResult;
} 

我认为这将帮助您实现 RabbitMQ。


-4
投票

如果您想一次检索一条消息,请使用您的检索代码添加以下属性。

Boolean autoAck = false;
model.BasicConsume(Queuename, autoAck);
model.BasicGet("Queuename", false);
model.BasicGet("Queuename", false); 

通过添加 RabbitMQ 的此属性,您可以从队列中一条一条地检索消息。与 FIFO 标准相同

© www.soinside.com 2019 - 2024. All rights reserved.