RabbitMQ客户端如何判断何时失去与服务器的连接?

问题描述 投票:10回答:2

如果我连接到RabbitMQ并使用EventingBasicConsumer监听事件,我如何判断是否已与服务器断开连接?

我知道有一个Shutdown事件,但如果我拔掉网线来模拟故障,它就不会启动。

我也试过ModelShutdown事件,和模型上的CallbackException,但似乎都没有用。

RabbitMQ中还内置了HeartBeat功能。服务器在配置文件中指定了它。它的默认值是10分钟,但当然你可以更改。

客户端也可以通过在ConnectionFactory实例上设置RequestedHeartbeat值来请求不同的心跳间隔。

rabbitmq
2个回答
6
投票

我猜你使用的是C#库?但即使如此,我认为其他的库也有类似的事件)。

你可以做以下工作。

public class MyRabbitConsumer
{
  private IConnection connection;

  public void Connect()
  {
    connection = CreateAndOpenConnection();
    connection.ConnectionShutdown += connection_ConnectionShutdown;
  }

  public IConnection CreateAndOpenConnection() { ... }

  private void connection_ConnectionShutdown(IConnection connection, ShutdownEventArgs reason)
  {

  }
}

0
投票

这是个例子,但标明的答案让我想到了这个。

var factory = new ConnectionFactory
{
    HostName = "MY_HOST_NAME",
    UserName = "USERNAME",
    Password = "PASSWORD",
    RequestedHeartbeat = 30
};

using (var connection = factory.CreateConnection())
{
    connection.ConnectionShutdown += (o, e) =>
    {                       
        //handle disconnect                            
    };

    using (var model = connection.CreateModel())
    {
        model.ExchangeDeclare(EXCHANGE_NAME, "topic");
        var queueName = model.QueueDeclare();

        model.QueueBind(queueName, EXCHANGE_NAME, "#"); 

        var consumer = new QueueingBasicConsumer(model);
        model.BasicConsume(queueName, true, consumer);

        while (!stop)
        {
            BasicDeliverEventArgs args;                       
            consumer.Queue.Dequeue(5000, out args);

            if (stop) return;

            if (args == null) continue;
            if (args.Body.Length == 0) continue;

            Task.Factory.StartNew(() =>
            {
                //Do work here on different thread then this one
            }, TaskCreationOptions.PreferFairness);
        }
    }
}

关于这个要注意的几件事。

我用#来做题目。这抓住了一切。通常你要用一个主题来限制。

我设置了一个名为 "stop "的变量来决定进程何时结束。你会注意到循环永远运行,直到该变量为真。

如果没有新消息,Dequeue会等待5秒,然后不获取数据就离开。这是为了确保我们监听到那个停止变量,并在某个时刻真正退出。根据自己的喜好来改变这个值。

当有消息进来时,我会在一个新的线程上生成处理代码。当前的线程被保留用于监听rabbitmq消息,如果一个处理程序处理时间太长,我不希望它拖慢其他消息。你可能需要也可能不需要,这取决于你的实现。然而要小心编写处理消息的代码。如果它需要一分钟的时间来运行,而你收到的消息是在亚秒级的时间,你就会耗尽内存,或者至少会有严重的性能问题。

© www.soinside.com 2019 - 2024. All rights reserved.