如何为以下场景(在.Net中设置RabbitMQ客户端/队列/交换?我有一台服务器(叫它为server1
),它正在运行许多服务(service1
,service2
...)和多个客户端(client1
,client2
,clientN
)。我希望将server1
的service1
生成的消息发送给正在侦听service1
消息的所有客户端。
我设法做到了,但我遇到了一个问题-显然,尽管客户端正在侦听交换/队列service1
并启用了AutoACK
时收到了邮件,但邮件并未删除-当下一封邮件到来时,旧邮件到达了再次发生,并且客户端Received
事件的触发次数与以前的消息一样多。
我需要服务器/生产者的最小示例(一个发送消息),以及基于某种路由密钥接收消息的客户端的示例。如果没有客户端在监听该特定的路由密钥,那么没关系,可以丢弃消息。服务器和客户端都预先知道路由密钥,因此特定的客户端“知道”他想从service2
接收消息。
当前设置是这个:
客户端代码(此代码侦听所有消息,这就是路由键为“#”的原因-一些客户端定义了路由键-service1
,依此类推。
servicesChannel = conn.CreateModel();
servicesChannel.ExchangeDeclare("services", ExchangeType.Fanout);
var queueName = servicesChannel.QueueDeclare().QueueName;
servicesChannel.QueueBind(queue: queueName, exchange: "services", routingKey: "#");
var consumer = new EventingBasicConsumer(servicesChannel);
consumer.Received += (model, ea) =>
{
// do bunch of stuff before
log.Trace("Acknowledging message " + ea.DeliveryTag);
// if we uncomment this, no more messages will arrive after 1st!
//servicesChannel.BasicAck(ea.DeliveryTag, false);
}
servicesChannel.BasicConsume(queue: queueName, autoAck: true, consumer: consumer);
和服务器的代码,推送消息:
servicesChannel= conn.CreateModel();
servicesChannel.ExchangeDeclare("services", ExchangeType.Fanout);
...
string message = Newtonsoft.Json.JsonConvert.SerializeObject(data, settings);
var body = Encoding.UTF8.GetBytes(message);
servicesChannel.BasicPublish(exchange: "databasechanges", routingKey: routingKey, basicProperties: null, body: body);
您正在搜索的是主题交流。
除了我对您的评论之外,还会发布到其他交易所。
您的交换声明必须为:
servicesChannel.ExchangeDeclare("services", type: "topic");
阅读更多内容,并在此处查看代码示例:https://www.rabbitmq.com/tutorials/tutorial-five-dotnet.html