我正在尝试使用 RabbitMQ.Client 库在 C# 中使用 RabbitMQ 消息并获取消息列表并在返回数组之前将其添加到数组中。在此之后,我希望处理这个数组并将它发送到一个新队列。所以我的第一种方法如下:
public async Task<List<string>> ProcessMessages()
{
var factory = new ConnectionFactory { HostName = "local"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "myqueue");
var consumer = new EventingBasicConsumer(channel);
var messages= new List<string>();
consumer.Received += async (model, ea) =>
{
var body = ea.Body.ToString();
// Do something with the to the body...
messages.Add(body);
};
channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);
return messages;
}
返回的消息数组似乎总是空的。谁能帮我理解为什么?
我尝试的另一种方法是在现有的 Received 处理程序中执行此操作:
public async Task ProcessMessages()
{
var factory = new ConnectionFactory { HostName = "local" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "myqueue");
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToString();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "NewQueue1",
durable: true,
exclusive: false,
autoDelete: false,
arguments: null);
var newMsgBytes[] = new Byte[] // Do some processing of message and send new message to new queue
channel.BasicPublish(exchange: string.Empty,
routingKey: "NewQueue1",
basicProperties: null,
body: newMsgBytes);
};
channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);
}
这似乎也不起作用,因为新消息没有发送到队列。有什么建议吗?谢谢
尝试以这种方式阅读(而不是
ea.Body.ToString()
,使用ea.Body.ToArray()
)
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($" [x] Received {message}");
};
更多细节在这里https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html
在您的第一个示例中,您没有向
messages
列表(正在返回)添加任何内容,只有receivedQMessages
。除非您在其他地方修改messages
,否则这可能就是为什么您没有在“队列中”看到任何东西的原因。
其次,请注意,如果您将
autoAck
设置为true,则消息将在您收到后被删除。
你应该将 eventArgs 主体转换为数组 这是简单的消费者:
var factory = new ConnectionFactory {
HostName = "localhost"
};
//Create the RabbitMQ connection using connection factory details
var connection = factory.CreateConnection();
var channel = connection.CreateModel();
//declare the queue after mentioning name and a few property related to that
channel.QueueDeclare("myqueue", exclusive: false);
//Set Event object which listen message from chanel which is sent by producer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, eventArgs) => {
var body = eventArgs.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
Console.WriteLine($ "myqueue message received: {message}");
};
//read the message
channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer);
在第一种方法中:
Received
事件是异步的,在单独的线程上运行。这意味着您的代码将在 Received
事件完成将消息添加到 messages
列表之前继续执行。因此,messages
列表在返回时将始终为空。ConcurrentQueue<T>
这样的线程安全集合来存储消息。 public async Task<List<string>> ProcessMessages()
{
var factory = new ConnectionFactory { HostName = "local" };
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "myqueue");
var messages = new ConcurrentQueue<string>();
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
var message = Encoding.UTF8.GetString(body);
// Do something with the message...
messages.Enqueue(message);
};
channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);
return messages.ToList();
}