RabbitMQ.Client消费并发送到另一个队列

问题描述 投票:0回答:4

我正在尝试使用 RabbitMQ.Client 库在 C# 中使用 RabbitMQ 消息并获取消息列表并在返回数组之前将其添加到数组中。在此之后,我希望处理这个数组并将它发送到一个新队列。所以我的第一种方法如下:

public async Task<List<string>> ProcessMessages() 
{
var factory = new ConnectionFactory { HostName = "local"};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();

channel.QueueDeclare(queue: "myqueue");

var consumer = new EventingBasicConsumer(channel);

var messages= new List<string>();

consumer.Received += async (model, ea) =>
{
    var body = ea.Body.ToString();

    // Do something with the to the body...

    messages.Add(body);

};

channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);

return messages;
}

返回的消息数组似乎总是空的。谁能帮我理解为什么?

我尝试的另一种方法是在现有的 Received 处理程序中执行此操作:

public async Task ProcessMessages()
{
    var factory = new ConnectionFactory { HostName = "local" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.QueueDeclare(queue: "myqueue");

    var consumer = new EventingBasicConsumer(channel);

    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToString();
        using var channel = connection.CreateModel();

        channel.QueueDeclare(queue: "NewQueue1",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: null);


        var newMsgBytes[] = new Byte[] // Do some processing of message and send new  message to new queue

            channel.BasicPublish(exchange: string.Empty,
                    routingKey: "NewQueue1",
                    basicProperties: null,
                    body: newMsgBytes);

    };

    channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);

}

这似乎也不起作用,因为新消息没有发送到队列。有什么建议吗?谢谢

c# rabbitmq rabbitmq.client
4个回答
0
投票

尝试以这种方式阅读(而不是

ea.Body.ToString()
,使用
ea.Body.ToArray()

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var body = ea.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($" [x] Received {message}");
};

更多细节在这里https://www.rabbitmq.com/tutorials/tutorial-one-dotnet.html


0
投票

在您的第一个示例中,您没有向

messages
列表(正在返回)添加任何内容,只有
receivedQMessages
。除非您在其他地方修改
messages
,否则这可能就是为什么您没有在“队列中”看到任何东西的原因。

其次,请注意,如果您将

autoAck
设置为true,则消息将在您收到后被删除。


0
投票

你应该将 eventArgs 主体转换为数组 这是简单的消费者:

var factory = new ConnectionFactory {
HostName = "localhost"
};
//Create the RabbitMQ connection using connection factory details 
var connection = factory.CreateConnection();

var channel = connection.CreateModel();
//declare the queue after mentioning name and a few property related to that
channel.QueueDeclare("myqueue", exclusive: false);
//Set Event object which listen message from chanel which is sent by producer
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, eventArgs) => {
    var body = eventArgs.Body.ToArray();
    var message = Encoding.UTF8.GetString(body);
    Console.WriteLine($ "myqueue message received: {message}");
};
//read the message
channel.BasicConsume(queue: "myqueue", autoAck: true, consumer: consumer);

0
投票

在第一种方法中:

Received
事件是异步的,在单独的线程上运行。这意味着您的代码将在
Received
事件完成将消息添加到
messages
列表之前继续执行。因此,
messages
列表在返回时将始终为空。
使用像
ConcurrentQueue<T>
这样的线程安全集合来存储消息。
像这样:

public async Task<List<string>> ProcessMessages() 
{
    var factory = new ConnectionFactory { HostName = "local" };
    using var connection = factory.CreateConnection();
    using var channel = connection.CreateModel();

    channel.QueueDeclare(queue: "myqueue");

    var messages = new ConcurrentQueue<string>();

    var consumer = new EventingBasicConsumer(channel);

    consumer.Received += (model, ea) =>
    {
        var body = ea.Body.ToArray();
        var message = Encoding.UTF8.GetString(body);

        // Do something with the message...

        messages.Enqueue(message);
    };

    channel.BasicConsume(queue: "myqueue", autoAck: acknowledge, consumer: consumer);

    return messages.ToList();
}
© www.soinside.com 2019 - 2024. All rights reserved.