我是 RabbitMQ 的新手,想知道解决我正在考虑的这个问题的好方法。 我想创建一个订阅队列并仅提取满足特定条件的消息的服务;例如,如果消息中包含特定主题标头。
我仍在学习 RabbitMQ,并正在寻找有关如何解决此问题的技巧。 我的问题包括:消费者如何从队列中仅提取特定消息? 生产者如何在消息中设置主题标头(如果这是正确的术语?)
RabbitMQ 非常适合这种情况。您有多种选择可以做您想做的事。我建议阅读文档以更好地理解。我建议你使用主题或直接交换。话题比较灵活。 事情是这样的。
生产者代码连接到 RabbitMQ Broker 并使用特定名称进行创建和交换。
生产者发布交换。每条发布的消息都将使用路由密钥发布。
消费者连接到 RabbitMQ 代理。
消费者创建队列
消费者将队列绑定到交换器,与生产者中定义的交换器相同。 绑定还包括该特定消费者所需的每条消息的路由键。
假设您正在发布日志消息。路由键可能类似于“log.info”、“log.warn”、“log.error”。 生产者发布的每条消息都将附加相关的路由密钥。 然后,您将有一个消费者发送所有错误消息并通过电子邮件发送,另一个消费者将所有错误消息写入文件。 因此,电子邮件发送器将使用路由键“log.error”定义从其队列到交换器的绑定。 这样,虽然交换器收到所有消息,但为电子邮件发送者定义的队列将仅包含错误消息。 文件记录器将定义一个绑定到同一交换的新的单独队列,并设置不同的路由键。 您可以为三个不同的路由键需要进行三个单独的绑定,或者仅使用通配符“log.*”来请求来自以 log 开头的交换的所有消息。
这是一个简单的示例,展示了如何实现您想要做的事情。
查看 here 获取代码示例,特别是第 5 号教程。
建议充分利用rabbitmq的交换/路由。如果你确实想根据消息内容进行检查,下面的代码是一个可行的解决方案。
从队列中检索消息并检查,有选择地确认您感兴趣的消息。
拉一条消息
GetResponse resp = channel.basicGet(QUEUE_NAME, false);
确认一条消息
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
示例
import com.rabbitmq.client.*;
public class ReceiveLogs {
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception {
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
try(Connection connection = factory.newConnection();
Channel channel = connection.createChannel();){
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
// pull one message and ack manually and exit
GetResponse resp = channel.basicGet(QUEUE_NAME, false);
if( resp != null ){
String message = new String(resp.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
channel.basicAck(resp.getEnvelope().getDeliveryTag(), false);
}
System.out.println();
}
}
}
依赖
compile group: 'com.rabbitmq', name: 'amqp-client', version: '5.8.0'
要从 RabbitMQ 检索消息,我们需要首先连接 RabbitMQ 服务器
public WebClient GetRabbitMqConnection(string userName, string password)
{
var client = new WebClient();
client.Credentials = new NetworkCredential(userName, password);
return client;
}
现在使用以下代码从 RabbitMQ 检索消息。
public string GetRabbitMQMessages(string domainName, string port,
string queueName, string virtualHost, WebClient client, string methodType)
{
string messageResult = string.Empty;
string strUri = "http://" + domainName + ":" + port +
"/api/queues/" + virtualHost + "/";
var data = client.DownloadString(strUri + queueName + "/");
var queueInfo = JsonConvert.DeserializeObject<QueueInfo>(data);
if (queueInfo == null || queueInfo.messages == 0)
return string.Empty;
if (methodType == "POST")
{
string postbody = "
{\"ackmode\":\"ack_requeue_true\",\"count\":
\"$totalMessageCount\",\"name\":\"${DomainName}\",
\"requeue\":\"false\",\"encoding\":\"auto\",\"vhost\" :
\"${QueueName}\"}";
postbody = postbody
.Replace("$totalMessageCount", queueInfo.messages.ToString())
.Replace("${DomainName}", domainName)
.Replace("${QueueName}", queueName);
messageResult = client.UploadString(strUri + queueName +
"/get", "POST", postbody);
}
return messageResult;
}
我认为这将帮助您实现 RabbitMQ。
如果您想一次检索一条消息,请使用您的检索代码添加以下属性。
Boolean autoAck = false;
model.BasicConsume(Queuename, autoAck);
model.BasicGet("Queuename", false);
model.BasicGet("Queuename", false);
通过添加 RabbitMQ 的此属性,您可以从队列中一条一条地检索消息。与 FIFO 标准相同