使用topic交换,我希望具有以下功能的发布/订阅消息传递模式:
因此,我创建了2个控制台应用程序(发送和接收)以测试以上内容。
发送
static void Main(string[] args) { Console.WriteLine(" Type [exit] to exit."); Publisher publisher = new Publisher(); do { var userInput = Console.ReadLine(); if (userInput == "exit") { break; } publisher.SendMessageToBroker("localhost", "main", "user.update", userInput); } while (true); }
Publisher
public class Publisher { const string ExchangeType = "topic"; Dictionary<ulong, string> unConfirmedMessageTags = new Dictionary<ulong, string>(); public void SendMessageToBroker(string host, string exchangeName, string routingKey, string message) { var factory = new ConnectionFactory() { HostName = host }; using (var connection = factory.CreateConnection()) using (var channel = connection.CreateModel()) { channel.BasicAcks += (sender, ea) => OnBasicAcks(ea.Multiple, ea.DeliveryTag); channel.BasicNacks += (sender, ea) => OnBasicNacks(ea.Multiple, ea.DeliveryTag); channel.ConfirmSelect(); channel.ExchangeDeclare(exchangeName, ExchangeType); var body = Encoding.UTF8.GetBytes(message); var properties = channel.CreateBasicProperties(); properties.Persistent = true; unConfirmedMessageTags.TryAdd(channel.NextPublishSeqNo, message); channel.BasicPublish(exchange: exchangeName, routingKey: routingKey, basicProperties: properties, body: body); Console.WriteLine(" [x] Sent {0}", message); } } private void OnBasicNacks(bool multiple, ulong deliveryTag) { if (multiple) { Console.WriteLine("Messages with delivery tag LESS THAN {0} have been LOST and must be resent.", deliveryTag); } else { Console.WriteLine("Message with delivery tag {0} has been LOST and must be resent.", deliveryTag); } } private void OnBasicAcks(bool multiple, ulong deliveryTag) { if (multiple) { var confirmed = unConfirmedMessageTags.Where(k => k.Key <= deliveryTag); foreach (var entry in confirmed) { unConfirmedMessageTags.Remove(entry.Key); Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", entry.Key); } } else { unConfirmedMessageTags.Remove(deliveryTag); Console.WriteLine("Message with delivery tag {0} has been confirmed and deleted.", deliveryTag); } } }
}
接收
程序中的OnBasicAcks仅针对第一条消息被调用一次。static void Main(string[] args) { const string ExchangeName = "main"; const string QueueName = "q1"; const string ExchangeType = "topic"; const string RoutingKey = "user.update"; var factory = new ConnectionFactory() { HostName = "localhost" }; using(var connection = factory.CreateConnection()) using(var channel = connection.CreateModel()) { channel.ExchangeDeclare(ExchangeName, ExchangeType); channel.QueueDeclare(queue: QueueName, durable: true, autoDelete: false, exclusive: false, arguments: null); channel.QueueBind(QueueName, ExchangeName, RoutingKey); //channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false); var consumer = new EventingBasicConsumer(channel); consumer.Received += (model, ea) => Basic_Ack(channel, ea.DeliveryTag, ea.Body); channel.BasicConsume(queue: QueueName, autoAck: false, consumer: consumer); Console.WriteLine(" Press [enter] to exit."); Console.ReadLine(); } } private static void Basic_Ack(IModel channel, ulong deliveryTag, ReadOnlyMemory<byte> body) { var message = Encoding.UTF8.GetString(body.ToArray()); Console.WriteLine(" [x] Received {0}", message); Thread.Sleep(2000); channel.BasicAck(deliveryTag: deliveryTag, multiple: false); Console.WriteLine(" [x] Processed {0}", message); } }
问题是我的Send
[使用主题交换,我希望具有以下功能的发布/订阅消息传递模式:实施“发布者确认”。让消费者将每个消息确认为...
对于其他可能遇到此问题的人,我正在为每个发布discouraged打开一个连接和通道(虚拟连接):