使用rabbitmq的Spring Boot故障转移重试器:无法调用目标方法

问题描述 投票:0回答:1

我正在尝试使用RabbitMQ实现故障转移重试功能。

因此,要求我从队列中获取一条消息并进行处理。如果发生任何问题,则该消息需要重试3次。

任何人都可以帮助我解决此问题吗?

这是我的代码

POJO

package com.poc.failoverretryer.model;

import java.io.Serializable;

/**
 * Simple serializable value holder describing a user.
 *
 * <p>All three attributes are plain strings and may be {@code null}; no
 * validation is performed by the constructor or the setters.
 */
public class User implements Serializable {

    private String firstname;
    private String lastname;
    private String dob;

    /**
     * Creates a user with the given attributes.
     *
     * @param firstname the first name
     * @param lastname  the last name
     * @param dob       the date of birth, kept as an unparsed string
     */
    public User(String firstname, String lastname, String dob) {
        this.firstname = firstname;
        this.lastname = lastname;
        this.dob = dob;
    }

    // --- firstname -------------------------------------------------------

    /** @return the first name */
    public String getFirstname() {
        return this.firstname;
    }

    /** @param firstname the new first name */
    public void setFirstname(String firstname) {
        this.firstname = firstname;
    }

    // --- lastname --------------------------------------------------------

    /** @return the last name */
    public String getLastname() {
        return this.lastname;
    }

    /** @param lastname the new last name */
    public void setLastname(String lastname) {
        this.lastname = lastname;
    }

    // --- dob -------------------------------------------------------------

    /** @return the date of birth as stored */
    public String getDob() {
        return this.dob;
    }

    /** @param dob the new date of birth */
    public void setDob(String dob) {
        this.dob = dob;
    }

    /**
     * Renders the three attributes, each wrapped in single quotes.
     *
     * @return a string of the form {@code User{firstname='..', lastname='..', dob='..'}}
     */
    @Override
    public String toString() {
        return String.format(
                "User{firstname='%s', lastname='%s', dob='%s'}",
                firstname, lastname, dob);
    }
}

配置文件

package com.poc.failoverretryer.config;

import com.poc.failoverretryer.model.User;
import com.poc.failoverretryer.receiver.Receiver;
import com.poc.failoverretryer.receiver.ReceiverTwo;
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer;
import org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter;
import org.springframework.amqp.support.converter.Jackson2JsonMessageConverter;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;

import java.io.Serializable;

/**
 * Declares the RabbitMQ queues, exchanges, bindings and listener containers
 * used by the application.
 *
 * <p>Topology as configured below:
 * <ul>
 *   <li>two durable incoming queues whose dead-letter exchange is
 *       {@code DEAD_LETTER_EXCHANGE_NAME};</li>
 *   <li>two durable dead-letter queues carrying an {@code x-message-ttl} of
 *       {@code DELAYED_RETRY_TIME} and whose own dead-letter exchange is
 *       {@code EXCHANGE_NAME};</li>
 *   <li>one durable parking-lot queue with no binding declared here;</li>
 *   <li>one listener container per incoming queue, each driven by a
 *       {@link MessageListenerAdapter}.</li>
 * </ul>
 * All bindings use the incoming queue name as routing key.
 */
@Configuration
public class RMQConfiguration implements Serializable {

    @Value("${data.incoming.x}")
    String EXCHANGE_NAME;
    @Value("${data.dl.x}")
    String DEAD_LETTER_EXCHANGE_NAME;
    @Value("${incoming.queue}")
    String INCOMING_QUEUE_NAME;
    @Value("${data.incoming.two}")
    String INCOMING_QUEUE_TWO_NAME;
    @Value("${data.dl}")
    String DEAD_LETTER_QUEUE_NAME;
    @Value("${data.dl.two}")
    String DEAD_LETTER_QUEUE_TWO_NAME;
    @Value("${data.parking}")
    String PARKING_LOT_QUEUE_NAME;
    @Value("${retry.delay:3000}")
    public long DELAYED_RETRY_TIME;
    @Value("${max.retry:3}")
    public int MAX_RETRY_ATTEMPTS;

    // ------------------------------------------------------------------
    // Queues
    // ------------------------------------------------------------------

    /** First incoming queue (primary {@link Queue} bean). */
    @Primary
    @Bean
    Queue queue() {
        return incomingQueue(INCOMING_QUEUE_NAME);
    }

    /** Second incoming queue. */
    @Bean
    Queue queueTwo() {
        return incomingQueue(INCOMING_QUEUE_TWO_NAME);
    }

    /** Delay queue paired with the first incoming queue. */
    @Bean
    Queue deadLetterQueue() {
        return delayQueue(DEAD_LETTER_QUEUE_NAME);
    }

    /** Delay queue paired with the second incoming queue. */
    @Bean
    Queue deadLetterTwoQueue() {
        return delayQueue(DEAD_LETTER_QUEUE_TWO_NAME);
    }

    /** Durable parking-lot queue. */
    @Bean
    Queue parkingLotQueue() {
        return new Queue(PARKING_LOT_QUEUE_NAME, true);
    }

    // ------------------------------------------------------------------
    // Exchanges
    // ------------------------------------------------------------------

    /** Direct exchange in front of the incoming queues. */
    @Bean
    DirectExchange exchange() {
        return new DirectExchange(EXCHANGE_NAME);
    }

    /** Direct exchange in front of the dead-letter queues. */
    @Bean
    DirectExchange deadLetterExchange() {
        return new DirectExchange(DEAD_LETTER_EXCHANGE_NAME);
    }

    // ------------------------------------------------------------------
    // Bindings
    // ------------------------------------------------------------------

    @Bean
    Binding binding(@Qualifier("queue") Queue queue, @Qualifier("exchange") DirectExchange exchange) {
        return bindWithKey(queue, exchange, INCOMING_QUEUE_NAME);
    }

    @Bean
    Binding bindingTwo(@Qualifier("queueTwo") Queue queue, @Qualifier("exchange") DirectExchange exchange) {
        return bindWithKey(queue, exchange, INCOMING_QUEUE_TWO_NAME);
    }

    @Bean
    Binding deadLetterBinding(@Qualifier("deadLetterQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return bindWithKey(queue, exchange, INCOMING_QUEUE_NAME);
    }

    @Bean
    Binding deadLetterTwoBinding(@Qualifier("deadLetterTwoQueue") Queue queue, @Qualifier("deadLetterExchange") DirectExchange exchange) {
        return bindWithKey(queue, exchange, INCOMING_QUEUE_TWO_NAME);
    }

    // ------------------------------------------------------------------
    // Listener containers and adapters
    // ------------------------------------------------------------------

    @Bean
    @Qualifier("myRabbitListenerContainer")
    SimpleMessageListenerContainer myRabbitListenerContainer(ConnectionFactory connectionFactory, @Qualifier("messageListenerAdapter") MessageListenerAdapter listenerAdapter) {
        return listenerContainer(connectionFactory, INCOMING_QUEUE_NAME, listenerAdapter);
    }

    @Bean
    @Qualifier("myRabbitListenerTwoContainer")
    SimpleMessageListenerContainer myRabbitListenerTwoContainer(ConnectionFactory connectionFactory, @Qualifier("messageListenerTwoAdapter") MessageListenerAdapter listenerAdapter) {
        return listenerContainer(connectionFactory, INCOMING_QUEUE_TWO_NAME, listenerAdapter);
    }

    /** Adapter that targets {@code receiveMessage} on {@link Receiver} (primary adapter bean). */
    @Primary
    @Bean
    MessageListenerAdapter messageListenerAdapter(Receiver receiver) {
        final String handlerMethod = "receiveMessage";
        return new MessageListenerAdapter(receiver, handlerMethod);
    }

    /** Adapter that targets {@code receiveMessageTwo} on {@link ReceiverTwo}. */
    @Bean
    MessageListenerAdapter messageListenerTwoAdapter(ReceiverTwo receiverTwo) {
        final String handlerMethod = "receiveMessageTwo";
        return new MessageListenerAdapter(receiverTwo, handlerMethod);
    }

    // ------------------------------------------------------------------
    // Private construction helpers (not beans)
    // ------------------------------------------------------------------

    /** Builds a durable queue that dead-letters into {@code DEAD_LETTER_EXCHANGE_NAME}. */
    private Queue incomingQueue(String queueName) {
        return QueueBuilder.durable(queueName)
                .deadLetterExchange(DEAD_LETTER_EXCHANGE_NAME)
                .build();
    }

    /**
     * Builds a durable queue with a message TTL of {@code DELAYED_RETRY_TIME}
     * that dead-letters into {@code EXCHANGE_NAME}.
     */
    private Queue delayQueue(String queueName) {
        return QueueBuilder.durable(queueName)
                .deadLetterExchange(EXCHANGE_NAME)
                .withArgument("x-message-ttl", DELAYED_RETRY_TIME)
                .build();
    }

    /** Binds {@code source} to {@code target} under the given routing key. */
    private Binding bindWithKey(Queue source, DirectExchange target, String routingKey) {
        return BindingBuilder.bind(source).to(target).with(routingKey);
    }

    /** Creates a container consuming {@code queueName} and dispatching to {@code adapter}. */
    private SimpleMessageListenerContainer listenerContainer(ConnectionFactory factory, String queueName, MessageListenerAdapter adapter) {
        SimpleMessageListenerContainer result = new SimpleMessageListenerContainer();
        result.setConnectionFactory(factory);
        result.setQueueNames(queueName);
        result.setMessageListener(adapter);
        return result;
    }
}

**以及监听器(Listener)**

package com.poc.failoverretryer.receiver;

import com.poc.failoverretryer.config.RMQConfiguration;
import com.poc.failoverretryer.model.User;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.AmqpRejectAndDontRequeueException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.amqp.rabbit.annotation.RabbitHandler;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Map;
import java.util.Optional;

@Component
public class Receiver {
    Logger logger = LoggerFactory.getLogger(Receiver.class);
    @Autowired
    private RabbitTemplate rabbitTemplate;
    @Value("${max.retry:3}")
    public int MAX_RETRY_ATTEMPTS;

    @Value("${data.parking}")
    String PARKING_LOT_QUEUE_NAME;

    public void receiveMessage(Message message, @Header(required = false, name = "x-death") List<Map<String, Object>> xDeath) throws InterruptedException {
        System.out.println("Received <" + message + ">");

        logger.info(String.valueOf(xDeath));

        if (message == null) {
            System.out.println("The message is null");
            return;
        }
        try {
            process(message.getBody());
        } catch (Exception e) {
            if (checkIfNeedToRetry(xDeath))
                throw new AmqpRejectAndDontRequeueException("Failed to process message");
            else
                rabbitTemplate.convertAndSend(PARKING_LOT_QUEUE_NAME, message);
        }
    }

    private void process(byte[] message) throws Exception {
        throw new Exception("Test");
    }

    private boolean checkIfNeedToRetry(List<Map<String, Object>> xDeath) {
        long retryCount = 0L;
        if (xDeath != null) {
            Optional<Long> count = xDeath.stream()
                    .flatMap(m -> m.entrySet().stream())
                    .filter(e -> e.getKey().equals("count"))
                    .findFirst().map(e -> (Long) e.getValue());

            retryCount = count.map(Long::longValue).orElse(retryCount);
//            if (count.isPresent()) {
//                retryCount = count.get().longValue();
//                System.out.println("Retry: " + retryCount);
//            } else {
//
//            }
        }

        System.out.println(retryCount);

        if (retryCount <= MAX_RETRY_ATTEMPTS) {
            System.out.println("Retrying");

            return true;
        } else {
            System.out.println("exceed max retry " + retryCount + " -> sending to parking lot");
            return false;
        }
    }
}

运行此代码时,我得到以下错误

org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to invoke target method 'receiveMessage' with argument type = [class com.poc.failoverretryer.model.User], value = [{User{firstname='Jean Elisee', lastname='YAO', dob='13/10/1993'}}]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:405) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:293) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1579) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1486) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1477) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1421) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1284) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1190) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_211]
Caused by: java.lang.NoSuchMethodException: com.poc.failoverretryer.receiver.Receiver.receiveMessage(com.poc.failoverretryer.model.User)
    at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_211]
    at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:184) ~[spring-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:383) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    ... 12 common frames omitted

2020-05-08 02:58:29.900  WARN 17643 --- [enerContainer-1] ingErrorHandler$DefaultExceptionStrategy : Fatal message conversion error; message rejected; it will be dropped or routed to a dead letter exchange, if so configured: (Body:'[B@1a07a33(byte[148])' MessageProperties [headers={}, contentType=application/x-java-serialized-object, contentLength=0, receivedDeliveryMode=PERSISTENT, priority=0, redelivered=false, receivedExchange=, receivedRoutingKey=data.incoming.q, deliveryTag=1, consumerTag=amq.ctag-m3Q0U6oWC1ejy4TrK16WaA, consumerQueue=data.incoming.q])
2020-05-08 02:58:29.901 ERROR 17643 --- [enerContainer-1] o.s.a.r.l.SimpleMessageListenerContainer : Execution of Rabbit message listener failed, and the error handler threw an exception

org.springframework.amqp.AmqpRejectAndDontRequeueException: Error Handler converted exception to fatal
    at org.springframework.amqp.rabbit.listener.ConditionalRejectingErrorHandler.handleError(ConditionalRejectingErrorHandler.java:116) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeErrorHandler(AbstractMessageListenerContainer.java:1383) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.handleListenerException(AbstractMessageListenerContainer.java:1667) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1442) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:963) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:913) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:81) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1284) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1190) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at java.lang.Thread.run(Thread.java:748) [na:1.8.0_211]
Caused by: org.springframework.amqp.rabbit.support.ListenerExecutionFailedException: Failed to invoke target method 'receiveMessage' with argument type = [class com.poc.failoverretryer.model.User], value = [{User{firstname='Jean Elisee', lastname='YAO', dob='13/10/1993'}}]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:405) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.onMessage(MessageListenerAdapter.java:293) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1579) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1498) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1486) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1477) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1421) [spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    ... 6 common frames omitted
Caused by: java.lang.NoSuchMethodException: com.poc.failoverretryer.receiver.Receiver.receiveMessage(com.poc.failoverretryer.model.User)
    at java.lang.Class.getMethod(Class.java:1786) ~[na:1.8.0_211]
    at org.springframework.util.MethodInvoker.prepare(MethodInvoker.java:184) ~[spring-core-5.2.4.RELEASE.jar:5.2.4.RELEASE]
    at org.springframework.amqp.rabbit.listener.adapter.MessageListenerAdapter.invokeListenerMethod(MessageListenerAdapter.java:383) ~[spring-rabbit-2.2.5.RELEASE.jar:2.2.5.RELEASE]
    ... 12 common frames omitted

java spring-boot rabbitmq spring-rabbitmq
1个回答
2
投票
public void receiveMessage(Message message, @Header(required = false, name = "x-death") List<Map<String, Object>> xDeath) throws InterruptedException {

您不能将@Header与旧式的MessageListenerAdapter(侦听器适配器)一起使用;@Header注解只能用于@RabbitListener方法。

反正也没有必要这样做;您可以直接从消息属性中获取x-death标头:

message.getMessageProperties().getXDeathHeader();

https://docs.spring.io/spring-amqp/api/org/springframework/amqp/core/MessageProperties.html#getXDeathHeader--

© www.soinside.com 2019 - 2024. All rights reserved.