Listener does not consume messages after the connection to the host is re-established


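Let me start by saying that I have only just begun working with AMQP.

I want to consume messages from a queue. I use Spring's library (spring-boot-starter-amqp) to make things easier. I have a listener class with a method annotated with @RabbitListener, which is where the queue is set. Everything else is configured through properties: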

    rabbitmq:
      username: user
      password: password
      virtual-host: virtual-host
      port: 5672
      host: host
      queue: _316_
      listener:
        simple:
          retry:
            enabled: true
            initial-interval: 1000
            max-attempts: 8
            max-interval: 10000
            multiplier: 2.0
            stateless: true
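
A side note on the retry block above: as far as I know, the `listener.simple.retry.*` properties configure retries of message processing inside the listener (through Spring Retry), not reconnection to the broker, so they probably have no effect on the problem described below. As a rough illustration of what those values mean, here is a minimal sketch that computes the implied waits. It assumes the usual exponential backoff semantics (each wait is the previous one times `multiplier`, capped at `max-interval`, with `max-attempts` counting the first try). The class and method names are made up for this sketch and are not part of Spring.

```java
import java.util.ArrayList;
import java.util.List;

public class RetryBackoffSketch {

    // Returns the waits (in ms) between consecutive attempts, assuming
    // exponential backoff capped at maxInterval. maxAttempts is assumed to
    // include the first try, so there are maxAttempts - 1 waits.
    public static List<Long> delays(long initialInterval, double multiplier,
                                    long maxInterval, int maxAttempts) {
        List<Long> result = new ArrayList<>();
        long next = initialInterval;
        for (int attempt = 1; attempt < maxAttempts; attempt++) {
            result.add(Math.min(next, maxInterval));
            // Grow the wait for the following attempt, never past the cap.
            next = (long) Math.min(next * multiplier, (double) maxInterval);
        }
        return result;
    }

    public static void main(String[] args) {
        // Values taken from the configuration in the question.
        System.out.println(delays(1000, 2.0, 10000, 8));
        // prints [1000, 2000, 4000, 8000, 10000, 10000, 10000]
    }
}
```

Under these assumptions a failing message would be retried for roughly 45 seconds in total before the retries are exhausted.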

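Everything works fine until I make the host temporarily unavailable. When that happens, the connection is dropped and the application tries to re-establish it. Once the connection is re-established, the listener does not start consuming messages again. After restarting the application everything is fine, but I am sure it can be configured somehow so that the consumer keeps being restarted, or at least tries to restart once the connection is back (that is at least what I would expect).

After the disconnect, the following can be found in the logs: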

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-PgBSeymWBfsghwdUYr5asA (_316_); Consumer@22ead351: tags=[[amq.ctag-PgBSeymWBfsghwdUYr5asA]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer WARN Consumer raised exception, processing can restart if the connection factory supports it. Exception summary: org.springframework.amqp.rabbit.support.ConsumerCancelledException

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@22ead351: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@39549f33 Shared Rabbit Connection: SimpleConnection@6f731759 [delegate=amqp://user@host, localPort= 36678], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host:5672]

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.start(BlockingQueueConsumer.java:564)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.initialize(SimpleMessageListenerContainer.java:1201)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1046)\n\tat java.base/java.lang.Thread.run(Thread.java:835)\nCaused by: org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer","message":"Consumer received fatal exception on startup

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Waiting for workers to finish.

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Successfully waited for workers to finish.

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat 

Then connection attempts are made, and we are in a loop:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.AmqpAuthenticationException: com.rabbitmq.client.AuthenticationFailureException: ACCESS_REFUSED 

Until the connection is re-established:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Attempting to connect to: [host]

org.springframework.amqp.rabbit.connection.CachingConnectionFactory INFO Created new connection: rabbitConnectionFactory#69d3cf7e:16/SimpleConnection@3931e0ad [delegate=amqp://user@host, localPort= 50574]

And then nothing happens: no messages are consumed.

UPDATE: I followed the suggestion and turned on DEBUG logging.

When the application starts, we are:

  1. Starting the listener container

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Starting Rabbit listener container.

  2. Creating the connection

  3. Starting the consumer

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@3daf03d8: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

  4. Creating the channel and starting to consume

org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Creating cached Rabbit Channel from AMQChannel(amqp://user@host,1)

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG ConsumeOK: Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Started on queue '_316_' with tag amq.ctag-uG8_iXcNaknFjBIGM-91Tg: Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Storing delivery for consumerTag: 'amq.ctag-uG8_iXcNaknFjBIGM-91Tg' with deliveryTag: '1' in Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Received message: (Body:'[B@4b817fae(byte[117])' MessageProperties [headers={}, contentLength=0, redelivered=true, receivedExchange=, receivedRoutingKey=_316_, deliveryTag=1, consumerTag=amq.ctag-uG8_iXcNaknFjBIGM-91Tg, consumerQueue=_316_])

This goes on until the connection is dropped:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer WARN Cancel received for amq.ctag-uG8_iXcNaknFjBIGM-91Tg (_316_); Consumer@3daf03d8: tags=[[amq.ctag-uG8_iXcNaknFjBIGM-91Tg]], channel=Cached Rabbit Channel: AMQChannel(amqp:///user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp:///user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

The channel is shut down, and the user has somehow been deleted, as the log says:

org.springframework.amqp.rabbit.connection.CachingConnectionFactory ERROR Channel shutdown: connection error; protocol method: #method<connection.close>(reply-code=320, reply-text=CONNECTION_FORCED - user 'user' is deleted, class-id=0, method-id=0)

There is a problem with the connection driver, and then an exception is thrown:

com.rabbitmq.client.impl.ForgivingExceptionHandler WARN An unexpected connection driver error occured (Exception message: Connection reset)

org.springframework.amqp.rabbit.support.ConsumerCancelledException: null\n\tat org.springframework.amqp.rabbit.listener.BlockingQueueConsumer.nextMessage(BlockingQueueConsumer.java:499)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:859)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1142)\n\tat org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1048)\n\tat java.base/java.lang.Thread.run(Thread.java:835

The consumer raised an exception, and the log says processing can restart:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Consumer raised exception, processing can restart if the connection factory supports it

A restart happens:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer INFO Restarting Consumer@3daf03d8: tags=[[]], channel=Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906], acknowledgeMode=AUTO local queue size=0

The channel is being closed:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Closing Rabbit Channel: Cached Rabbit Channel: AMQChannel(amqp://user@host,1), conn: Proxy@437bd805 Shared Rabbit Connection: SimpleConnection@49fdbe2b [delegate=amqp://user@host, localPort= 37906]

org.springframework.amqp.rabbit.connection.CachingConnectionFactory DEBUG Closing cached Channel: AMQChannel(amqp://user@host,1)

A new consumer is started:

org.springframework.amqp.rabbit.listener.BlockingQueueConsumer DEBUG Starting consumer Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

We try to connect, which ends in a WARN and an authentication failure (because the earlier log said the user was deleted?):

An unexpected connection driver error occured (Exception message: Socket closed)

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure\n\tat

ACCESS_REFUSED - Login was refused using authentication mechanism PLAIN. For details see the broker logfile.\n\tat org.springframework.amqp.rabbit.support.RabbitExceptionTranslator.convertRabbitAccessException(RabbitExceptionTranslator.java:65)\n\tat

The consumer that was trying to start:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Consumer received fatal exception on startup

And it (the consumer) is cancelled:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Cancelling Consumer@2560313a: tags=[[]], channel=null, acknowledgeMode=AUTO local queue size=0

The channel is closed and the container is stopping:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer ERROR Stopping container from aborted consumer

Then the container is shutting down:

org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer DEBUG Shutting down Rabbit listener container

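We wait for the workers to finish, which succeeds, and then we try to connect again; the same "Socket closed" warning is logged over and over.

Then the host is brought back and the connection is re-established. A cached Rabbit Channel is created, and nothing else happens.

My guess is that the container was shut down and never brought back to life, so there is no consumer.

What works

I created a class with a "listening" method that accepts a ListenerContainerConsumerFailedEvent. The class holds a RabbitListenerEndpointRegistry (a bean that Boot conveniently creates for me), and whenever the method is called I check whether the listenerContainer is running; if it is not, I start it (the check is most likely redundant).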

    @EventListener
    public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
        // Look up the container that backs the @RabbitListener method by its id.
        var listenerContainer = rabbitListenerEndpointRegistry.getListenerContainer(MessageListener.RABBIT_LISTENER_ID);
        // Start the container again if the failure left it stopped.
        if (!listenerContainer.isRunning()) {
            listenerContainer.start();
        }
    }

java spring-boot rabbitmq spring-rabbitmq
1 answer
2 votes

org.springframework.amqp.rabbit.listener.exception.FatalListenerStartupException: Authentication failure

FatalListenerStartupException

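Authentication failures are considered fatal and the container is stopped immediately; such a condition is unlikely to correct itself automatically.

Deleting a user that is currently in use is a rather unusual scenario.

You can use an ApplicationListener bean or an @EventListener method to listen for ListenerContainerConsumerTerminatedEvent and try to restart the container after some time.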
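
The "after some time" part could be sketched roughly as follows. This is plain Java with no Spring dependency, so that it runs on its own: `Restartable` is a hypothetical stand-in for the two container methods used in the question (`isRunning()` and `start()`), and `onContainerStopped()` is the method one would call from the event listener. The class name and the delay value are illustrative assumptions, not anything prescribed by Spring AMQP.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

public class DelayedRestartSketch {

    // Hypothetical stand-in for the listener container; in the real
    // application this would be the container looked up from the
    // RabbitListenerEndpointRegistry.
    public interface Restartable {
        boolean isRunning();
        void start();
    }

    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor();
    private final Restartable container;
    private final long delayMillis;

    public DelayedRestartSketch(Restartable container, long delayMillis) {
        this.container = container;
        this.delayMillis = delayMillis;
    }

    // Intended to be called from the event listener method: instead of
    // restarting right away, schedule one restart attempt after a delay.
    public void onContainerStopped() {
        scheduler.schedule(this::restartIfStopped, delayMillis, TimeUnit.MILLISECONDS);
    }

    private void restartIfStopped() {
        if (!container.isRunning()) {
            container.start();
        }
    }

    public void shutdown() {
        scheduler.shutdownNow();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicBoolean running = new AtomicBoolean(false);
        Restartable fake = new Restartable() {
            @Override public boolean isRunning() { return running.get(); }
            @Override public void start() { running.set(true); }
        };
        DelayedRestartSketch sketch = new DelayedRestartSketch(fake, 100);
        sketch.onContainerStopped();
        Thread.sleep(500); // give the scheduled task time to run
        System.out.println("running = " + running.get());
        sketch.shutdown();
    }
}
```

The sketch makes a single attempt per call. Whether a restart that fails again leads to another attempt depends on a new event being published and handled, which is worth verifying against the Spring AMQP version in use.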
