RabbitMQ接收消息

问题描述 投票:0回答:1

我开始使用 RabbitMQ,并且我创建了以下代码作为教程一训练。

成功发送

message
后,我无法收到回信。

如果在调试时我在每个步骤等待几分钟,则会显示

message
,因此问题似乎是在某处缺少
await
(可能在
consumer.Received
附近)。我缺少什么?据我了解,
EventingBasicConsumer
应该处理这种异步通信。

public ActionResult TestSendMessage(string message)
{
    try
    {
        var factory = new ConnectionFactory
        {
            HostName = _HostName,
            Port = _Port,
            UserName = _UserName,
            Password = _Password,
        };

        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: "hello",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        var body = Encoding.UTF8.GetBytes(message);
        channel.BasicPublish(exchange: string.Empty,
            routingKey: "hello",
            basicProperties: null,
            body: body);

        return Json(new
        {
            success = true,
            responseText = "Sent correctly"
        }, JsonRequestBehavior.AllowGet);

    }
    catch (Exception ex)
    {
        return Json(new
        {
            success = false,
            responseText = "Error: " + ex.Message
        }, JsonRequestBehavior.AllowGet);
    }
}

public ActionResult TestReceiveMessage()
{
    try
    {
        var factory = new ConnectionFactory
        {
            HostName = _HostName,
            Port = _Port,
            UserName = _UserName,
            Password = _Password,
        };

        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: "hello",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        string receivedMessage = null;

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            receivedMessage = Encoding.UTF8.GetString(body);
        };

        channel.BasicConsume(queue: "hello",
            autoAck: true,
            consumer: consumer);

        return Json(new
        {
            success = true,
            responseText = receivedMessage
        }, JsonRequestBehavior.AllowGet);
    }
    catch (Exception ex)
    {
        return Json(new
        {
            success = false,
            responseText = "Error: " + ex.Message
        }, JsonRequestBehavior.AllowGet);
    }
}
rabbitmq .net-4.8
1个回答
0
投票

问题似乎是教程指的是

localhost
连接,在我的例子中,我已更改为外部VM服务器。

我添加了一个

TaskCompletionSource
任务,如下所示,然后我可以
await Task.WhenAny
进行超时比较。

另外,我确实注意到

RabbitMQ
官方论坛是一个Google Group,如果有人感兴趣的话。

public async Task<ActionResult> TestReceiveMessage()
{
    try
    {
        var factory = new ConnectionFactory
        {
            HostName = _HostName,
            Port = _Port,
            UserName = _UserName,
            Password = _Password,
        };

        using var connection = factory.CreateConnection();
        using var channel = connection.CreateModel();
        channel.QueueDeclare(queue: "hello",
            durable: false,
            exclusive: false,
            autoDelete: false,
            arguments: null);

        var consumer = new EventingBasicConsumer(channel);
        string receivedMessage = null;

        //Aux: check if message was received
        var messageReceived = new TaskCompletionSource<bool>();

        consumer.Received += (model, ea) =>
        {
            var body = ea.Body.ToArray();
            receivedMessage = Encoding.UTF8.GetString(body);

            //Aux: confirm message received
            messageReceived.SetResult(true);
        };

        channel.BasicConsume(queue: "hello",
            autoAck: true,
            consumer: consumer);

        //Await message or timeout
        var timeout = 30; //Seconds
        await Task.WhenAny(
            messageReceived.Task,
            Task.Delay(TimeSpan.FromSeconds(timeout)
            ));

        if (!messageReceived.Task.IsCompleted)
        {
            return Json(new
            {
                success = false,
                responseText = "Timeout",
            }, JsonRequestBehavior.AllowGet);
        }

        return Json(new
        {
            success = true,
            responseText = receivedMessage
        }, JsonRequestBehavior.AllowGet);
    }
    catch (Exception ex)
    {
        return Json(new
        {
            success = false,
            responseText = "Error: " + ex.Message
        }, JsonRequestBehavior.AllowGet);
    }
}
© www.soinside.com 2019 - 2024. All rights reserved.