我开始使用 RabbitMQ,并且我创建了以下代码作为教程一训练。
成功发送
message
后,我无法收到回信。
如果在调试时我在每个步骤等待几分钟,则会显示
message
,因此问题似乎是在某处缺少 await
(可能在 consumer.Received
附近)。我缺少什么?据我了解,EventingBasicConsumer
应该处理这种异步通信。
public ActionResult TestSendMessage(string message)
{
try
{
var factory = new ConnectionFactory
{
HostName = _HostName,
Port = _Port,
UserName = _UserName,
Password = _Password,
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var body = Encoding.UTF8.GetBytes(message);
channel.BasicPublish(exchange: string.Empty,
routingKey: "hello",
basicProperties: null,
body: body);
return Json(new
{
success = true,
responseText = "Sent correctly"
}, JsonRequestBehavior.AllowGet);
}
catch (Exception ex)
{
return Json(new
{
success = false,
responseText = "Error: " + ex.Message
}, JsonRequestBehavior.AllowGet);
}
}
public ActionResult TestReceiveMessage()
{
try
{
var factory = new ConnectionFactory
{
HostName = _HostName,
Port = _Port,
UserName = _UserName,
Password = _Password,
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
string receivedMessage = null;
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
receivedMessage = Encoding.UTF8.GetString(body);
};
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
return Json(new
{
success = true,
responseText = receivedMessage
}, JsonRequestBehavior.AllowGet);
}
catch (Exception ex)
{
return Json(new
{
success = false,
responseText = "Error: " + ex.Message
}, JsonRequestBehavior.AllowGet);
}
}
问题似乎是教程指的是
localhost
连接,在我的例子中,我已更改为外部VM服务器。
我添加了一个
TaskCompletionSource
任务,如下所示,然后我可以await Task.WhenAny
进行超时比较。
另外,我确实注意到
RabbitMQ
官方论坛是一个Google Group,如果有人感兴趣的话。
public async Task<ActionResult> TestReceiveMessage()
{
try
{
var factory = new ConnectionFactory
{
HostName = _HostName,
Port = _Port,
UserName = _UserName,
Password = _Password,
};
using var connection = factory.CreateConnection();
using var channel = connection.CreateModel();
channel.QueueDeclare(queue: "hello",
durable: false,
exclusive: false,
autoDelete: false,
arguments: null);
var consumer = new EventingBasicConsumer(channel);
string receivedMessage = null;
//Aux: check if message was received
var messageReceived = new TaskCompletionSource<bool>();
consumer.Received += (model, ea) =>
{
var body = ea.Body.ToArray();
receivedMessage = Encoding.UTF8.GetString(body);
//Aux: confirm message received
messageReceived.SetResult(true);
};
channel.BasicConsume(queue: "hello",
autoAck: true,
consumer: consumer);
//Await message or timeout
var timeout = 30; //Seconds
await Task.WhenAny(
messageReceived.Task,
Task.Delay(TimeSpan.FromSeconds(timeout)
));
if (!messageReceived.Task.IsCompleted)
{
return Json(new
{
success = false,
responseText = "Timeout",
}, JsonRequestBehavior.AllowGet);
}
return Json(new
{
success = true,
responseText = receivedMessage
}, JsonRequestBehavior.AllowGet);
}
catch (Exception ex)
{
return Json(new
{
success = false,
responseText = "Error: " + ex.Message
}, JsonRequestBehavior.AllowGet);
}
}