使用Spring上下文进行的集成测试无法接收RabbitMQ消息。

问题描述 投票:0回答:1

RabbitMQ MessageConsumer没有收到发布的消息

我正在写一个集成测试,它应该从队列中接收消息并进行处理。但消费者根本没有收到消息。

当我手动注入依赖关系时-- 无Spring语境,工作正常。但是当我使用SpringContext时,消费者并没有收到消息。

在我使用SpringContext的时候,消费者没有收到消息。SpringInfraConfig.class 从环境变量中加载值。为了 "emule "环境,我使用了一个类 EnvironmentVariables本馆. 环境变量加载正常--检查运行调试。

注意 当我提到 没有SpringContext也能正常工作我也没有使用环境库。

为了将消息发布到RabbitMQ队列中,我在测试方法上 "手动 "进行。消息发布得很好。在调用真正的测试类之前,我写了一个consming测试代码。这是一个简单的原始消费者,覆盖了 DefaultConsumer#handleDelivery 带着 sysout 来打印收到的信息。工作。

当我使用我真正的测试类-- MessageConsumerServiceImpl.class 它只是记录开始从队列中消耗,测试结束。

当我调试并进入所有方法时,一些非常奇怪的事情发生了--收到消息后,在处理过程中,它没有完成所有的调用就结束了--。停滞不前但也没有抛出任何错误。

另一个奇怪的事情是--启用RabbitMQ管理插件后,没有队列、交换、通道,甚至没有打开连接。我在debug运行时检查了这一点,同时停止到一个断点。

SpringConfig类

@Import({SpringCoreConfig.class})
@ComponentScan({"br.com.fulltime.fullarm.fullcam.integration.infra",     "br.com.fulltime.fullarm.cross.cutting", "br.com.fulltime.fullarm.infrastructure.commons"})
@Configuration
public class SpringInfraConfig {

@Bean
public FInfraSettings getFInfraSettings() {
    Map<String, String> fInfraMap = new HashMap<>();
    fInfraMap.put("F_INFRA_RABBIT_HOST", "f_infra_rabbit_host");
    fInfraMap.put("F_INFRA_EXCHANGE", "f_infra_exchange");
    fInfraMap.put("F_INFRA_QUEUE", "f_infra_queue");
    fInfraMap.put("F_INFRA_PROCESS_ID", "f_infra_process_id");
    fInfraMap.put("F_INFRA_DESCRIPTION", "f_infra_description");
    fInfraMap.put("F_INFRA_TEXT", "f_infra_text");
    fInfraMap.put("F_INFRA_TAG", "f_infra_tag");
    fInfraMap.put("F_INFRA_WARNING_TIME", "f_infra_warning_time");
    fInfraMap.put("F_INFRA_CRITICAL_TIME", "f_infra_critical_time");

    return new FInfraSettings(
            getEnv("f_infra_run", "false").asBoolean(),
            getEnv("f_infra_ka_time", "1").asInt(),
            fInfraMap);
}

@Bean
public ApplicationSettings getApplicationSettings() {
    return new ApplicationSettings(
            getEnv("process_name", "FullArm-FullCam Integration").asString(),
            getEnv("process_version", "DEFAULT-1.0.0").asString());
}

@Bean
public PushoverSettings getPushoverSettings() {
    return new PushoverSettings(
            getEnv("pushover_api", "invalido").asString(),
            getEnv("pushover_user_id", "invalido").asString(),
            getEnv("pushover_run", "false").asBoolean());

}

@Bean
public RabbitMQSettings getRabbitMQSettings() {
    return new RabbitMQSettings(
            new RabbitConnectionInfo(
                    getEnv("rabbitmq_host", "127.0.0.1").asString(),
                    getEnv("rabbitmq_port", "5672").asInt(),
                    getEnv("rabbitmq_virtual_host", "/").asString(),
                    getEnv("rabbitmq_username", "guest").asString(),
                    getEnv("rabbitmq_password", "guest").asString()),
            new RabbitConnectionInfo(
                    getEnv("rabbitmq_fullcam_host", "127.0.0.1").asString(),
                    getEnv("rabbitmq_fullcam_port", "5672").asInt(),
                    getEnv("rabbitmq_fullcam_virtual_host", "/").asString(),
                    getEnv("rabbitmq_fullcam_username", "guest").asString(),
                    getEnv("rabbitmq_fullcam_password", "guest").asString()),
            new RabbitQueueInfo(
                    getEnv("rabbitmq_consumer_fullarm_queue", "fcomQueConsumerFullCam").asString(),
                    getEnv("rabbitmq_consumer_fullarm_exc", "fcomExcConsumer").asString(),
                    getEnv("rabbitmq_consumer_fullarm_rk", "fcomRKConsumerFullCam").asString()),
            new RabbitQueueInfo(
                    getEnv("rabbitmq_consumer_fullcam_queue", "foo").asString(),
                    getEnv("rabbitmq_consumer_fullcam_exc", "foo").asString(),
                    getEnv("rabbitmq_consumer_fullcam_rk", "foo").asString()),
            new RabbitQueueInfo(
                    getEnv("rabbitmq_publish_fullcam_queue", "fullcamRequest").asString(),
                    getEnv("rabbitmq_publish_fullcam_exc", "fullcamRequestExc").asString(),
                    getEnv("rabbitmq_consumer_fullarm_rk", "fullcamRequestRK").asString()));
}

@Bean
public RedisSettings getRedisSettings() {
    return new RedisSettings(
            getEnv("redis_host", "localhost").asString(),
            getEnv("redis_port", "6379").asInt(),
            getEnv("redis_password", "123456").asString());
}

@Bean
public Connection getConnection() {
    try {
        return RabbitConnectionFactory.create(getRabbitMQSettings().getConnectionInfo());
    } catch (IOException | TimeoutException e) {
        throw new ShutdownException(e);
    }
}

@Bean
public Logging getLogging() {
    return new DefaultLogger();
}

MessageConsumerServiceImpl类

@Component
public class MessageConsumerServiceImpl implements MessageConsumerService {

private final Connection rabbitMQConnection;
private final MessageConsumerFactory consumerFactory;
private final RabbitMQSettings mqSettings;
private final ShutdownService shutdownService;
private final Logging logger;

@Inject
public MessageConsumerServiceImpl(Connection rabbitMQConnection,
                                  MessageConsumerFactory consumerFactory,
                                  RabbitMQSettings mqSettings,
                                  ShutdownService shutdownService,
                                  Logging logger) {
    this.rabbitMQConnection = rabbitMQConnection;
    this.consumerFactory = consumerFactory;
    this.mqSettings = mqSettings;
    this.shutdownService = shutdownService;
    this.logger = logger;
}

@Override
public void startListening() {
    try {
        RabbitQueueInfo commandQueInfo = mqSettings.getRabbitMQFullArmConsumerQueue();
        final String queue = commandQueInfo.getQueue();

        Channel channel = rabbitMQConnection.createChannel();
        channel.queueDeclare(queue, true, false, false, null);
        MessageConsumer commandConsumer = consumerFactory.create(channel);

        logger.info("[MESSAGE-CONSUMER] - Consumindo da fila: {}", queue);
        channel.basicConsume(queue, commandConsumer);

    } catch (IOException e) {
        logger.error("[MESSAGE-CONSUMER] - ShutdownException", e);
        shutdownService.shutdown(e);
    }
}

集成测试类

public class MessageConsumerServiceImplIntegrationTest {

private static final Integer RABBITMQ_PORT = 5672;
private static final String RABBITMQ_EXC = "fcomExcConsumer";
private static final String RABBITMQ_QUEUE = "fcomQueFullcamIntegration";
private static final String RABBITMQ_RK = "fcomRKConsumerFullCam";
private static final String REDIS_PASSWORD = "123456";
private static final int REDIS_PORT = 6379;

public static RabbitMQContainer rabbitMqContainer;
public static GenericContainer redisContainer;

static {
    redisContainer = new GenericContainer<>("redis:5.0.3-alpine")
            .withExposedPorts(REDIS_PORT)
            .withCommand("redis-server --requirepass " + REDIS_PASSWORD)
            .waitingFor(Wait.forListeningPort());
    redisContainer.start();
}

static {
    rabbitMqContainer = new RabbitMQContainer()
            .withExposedPorts(RABBITMQ_PORT)
            .withExposedPorts(15672)
            .withUser("guest", "guest")
            .withVhost("/")
            .waitingFor(Wait.forListeningPort());
    rabbitMqContainer.start();
}

@Rule
public final EnvironmentVariables environmentVariables = new EnvironmentVariables()
        .set("rabbitmq_host", rabbitMqContainer.getContainerIpAddress())
        .set("rabbitmq_port", String.valueOf(rabbitMqContainer.getMappedPort(RABBITMQ_PORT)))
        .set("rabbitmq_virtual_host", "/")
        .set("rabbitmq_username", "guest")
        .set("rabbitmq_password", "guest")

        .set("rabbitmq_fullcam_host", rabbitMqContainer.getContainerIpAddress())
        .set("rabbitmq_fullcam_port", String.valueOf(rabbitMqContainer.getMappedPort(RABBITMQ_PORT)))
        .set("rabbitmq_fullcam_virtual_host", "/")
        .set("rabbitmq_fullcam_username", "guest")
        .set("rabbitmq_fullcam_password", "guest")

        .set("rabbitmq_publish_fullcam_queue", "Fullarm.Request")
        .set("rabbitmq_publish_fullcam_exc", "fcomExcFullcam")
        .set("rabbitmq_publish_fullcam_rk", "fcomRKFullcamRequest")

        .set("rabbitmq_consumer_fullarm_queue", RABBITMQ_QUEUE)
        .set("rabbitmq_consumer_fullarm_exc", RABBITMQ_EXC)
        .set("rabbitmq_consumer_fullarm_rk", RABBITMQ_RK)

        .set("rabbitmq_consumer_fullcam_queue", "Fullarm.Reponse")
        .set("rabbitmq_consumer_fullcam_exc", "fcomExcFullarm")
        .set("rabbitmq_consumer_fullcam_rk", "fcomRKFullarmFullcamIntegration")

        .set("f_infra_rabbit_host", "abobora")
        .set("f_infra_exchange", "abobora")
        .set("f_infra_queue", "abobora")
        .set("f_infra_process_id", "0")
        .set("f_infra_description", "abobora")
        .set("f_infra_text", "abobora")
        .set("f_infra_tag", "0")
        .set("f_infra_warning_time", "0")
        .set("f_infra_critical_time", "0")
        .set("f_infra_run", "false")
        .set("f_infra_ka_time", "1")

        .set("redis_host", redisContainer.getContainerIpAddress())
        .set("redis_port", String.valueOf(redisContainer.getMappedPort(REDIS_PORT)))
        .set("redis_password", REDIS_PASSWORD);

private MessageConsumerService instance;
private ApplicationContext context;

@Before
public void setUp() {
    context = new AnnotationConfigApplicationContext(SpringInfraConfig.class);
    instance = context.getBean(MessageConsumerService.class);
}

@Test
public void deveProcessarRequisicao() throws IOException, TimeoutException {
    String message = "{ \"tipoPacote\" : 3, \"descricao_painel\" : \"Casa Mauro Naves\", \"setor_disparado\" : \"Porta da Frente\", \"data_disparo\" : 1587151300000, \"cameras\" : [90851, 90853, 90854] }";

    ConnectionFactory factory = new ConnectionFactory();
    RabbitMQSettings settings = context.getBean(RabbitMQSettings.class);
    factory.setHost(settings.getConnectionInfo().getHost());
    factory.setPort(settings.getConnectionInfo().getPort());
    factory.setVirtualHost(settings.getConnectionInfo().getVirtualHost());
    factory.setAutomaticRecoveryEnabled(true);
    factory.setUsername(settings.getConnectionInfo().getUsername());
    factory.setPassword(settings.getConnectionInfo().getPassword());
    factory.setRequestedHeartbeat(50);
    Connection connection = factory.newConnection();

    RabbitQueueInfo commandQueInfo = settings.getRabbitMQFullArmConsumerQueue();
    Channel channel = connection.createChannel();
    channel.exchangeDeclare(commandQueInfo.getExchange(), "direct", true);
    channel.queueDeclare(commandQueInfo.getQueue(), true, false, false, null);
    channel.queueBind(commandQueInfo.getQueue(), commandQueInfo.getExchange(), commandQueInfo.getRoutingKey());
    channel.basicPublish(commandQueInfo.getExchange(), commandQueInfo.getRoutingKey(), MessageProperties.PERSISTENT_BASIC, message.getBytes());
    channel.close();
    connection.close();

    instance.startListening();

}

GRADLE依赖性

core-build.gradle

dependencies {

   compile group: 'javax.inject', name: 'javax.inject', version: '1'
   compile group: 'org.springframework', name: 'spring-context', version: '5.2.5.RELEASE'

   compile 'com.fasterxml.jackson.core:jackson-core:2.7.1'
   compile 'com.fasterxml.jackson.core:jackson-databind:2.7.1-1'

   compile group: 'br.com.fulltime.fullarm', name: 'cross-cutting-commons', version: '1.13.0'
   compile group: 'br.com.fulltime.fullarm', name: 'constants', version: '1.110.0'
}

infra-build.gradle

dependencies {

   testCompile group: 'junit', name: 'junit', version: '4.12'
   testCompile "org.testcontainers:testcontainers:1.14.1"
   testCompile "org.testcontainers:rabbitmq:1.14.1"
   testCompile group: 'com.github.stefanbirkner', name: 'system-rules', version: '1.19.0'


   compile project(':core')

   compile group: 'br.com.fulltime.fullarm', name: 'infrastructure-commons', version: '1.6.0'
   compile group: 'br.com.fulltime.fullarm', name: 'FInfraJavaLibrary', version: '2.3.0'
   compile group: 'br.com.fulltime.fullarm', name: 'pushover-lib', version: '1.0.0'

   compile group: 'redis.clients', name: 'jedis', version: '3.3.0'
}

测试输出

Testing started at 08:38 ...
Starting Gradle Daemon...
Gradle Daemon started in 815 ms
> Task :core:compileJava UP-TO-DATE
> Task :core:processResources NO-SOURCE
> Task :core:classes UP-TO-DATE
> Task :core:jar UP-TO-DATE
> Task :infra:compileJava UP-TO-DATE
> Task :infra:processResources NO-SOURCE
> Task :infra:classes UP-TO-DATE
> Task :infra:compileTestJava
> Task :infra:processTestResources NO-SOURCE
> Task :infra:testClasses
> Task :infra:test
WARNING: An illegal reflective access operation has occurred
WARNING: Illegal reflective access by org.junit.contrib.java.lang.system.EnvironmentVariables (file:/home/*omited*/.gradle/caches/modules-2/files-2.1/com.github.stefanbirkner/system-rules/1.19.0/d541c9a1cff0dda32e2436c74562e2e4aa6c88cd/system-rules-1.19.0.jar) to field java.util.Collections$UnmodifiableMap.m
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
2020-05-14 08:38:35 INFO - [MESSAGE-CONSUMER] - Consumindo da fila: fcomQueFullcamIntegration
WARNING: Please consider reporting this to the maintainers of org.junit.contrib.java.lang.system.EnvironmentVariables
WARNING: Use --illegal-access=warn to enable warnings of further illegal reflective access     operations
WARNING: All illegal access operations will be denied in a future release
Deprecated Gradle features were used in this build, making it incompatible with Gradle 7.0.
Use '--warning-mode all' to show the individual deprecation warnings.
See https://docs.gradle.org/6.1/userguide/command_line_interface.html#sec:command_line_warnings
BUILD SUCCESSFUL in 22s
5 actionable tasks: 2 executed, 3 up-to-date
08:38:36: Task execution finished ':infra:test --tests     "br.com.fulltime.fullarm.fullcam.integration.infra.consumer.MessageConsumerServiceImplIntegrationTest.deveProcessarRequisicao"'.

我对这个问题没有更多的想法。任何帮助是欢迎的。

更新

我又写了我的测试,让它更简单。我写了一个有Spring-context和env的代码,另一个没有Spring-context和env的代码。两者都没有工作。

所以,为了测试porpuse,我编写了一个简单的 Thread#sleep() 你猜怎么着,两个测试都成功了

我认为原因是RabbitMQ DefaultConsumer实例化了一个新的线程用于消耗消息,从而释放了主测试线程,并且它被停止了。由于主线程被停止,所有其他线程也被停止。

所以我认为我们这里有一个同步测试的问题。

如果测试代码检查本应插入到exection中的数据库值,但在检查时间内尚未处理,则有可能出现测试失败的情况。

rabbitmq testcontainers spring-context
1个回答
0
投票

首先--你没有启用任何日志记录,所以很难说到底发生了什么。

你可以选择Spring Boot吗?它有内置的日志支持。或者你只是故意使用*context库?

© www.soinside.com 2019 - 2024. All rights reserved.