同一个容器多次调用ContainerTestUtils.waitForAssignment时如何使用?

问题描述 投票:0回答:1

我正在尝试编写 Kafka 监听器的可靠测试。值得一提的是,我使用外部 kafka 容器而不是

@EmbeddedKafka

我遇到的最大问题是如何确保在测试开始之前将 Kafka 侦听器分配给分区并准备好消费消息。我找到了

ContainerTestUtils.waitForAssignment
方法,但它没有像我预期的那样工作。

假设有 Kafka 监听器组件:

@Component
public class SampleKafkaConsumer {

    @KafkaListener(
        topics = "${kafka.listener.some-event.topic}",
        groupId = "${kafka.listener.some-event.group-id}",
    )
    void consume(final SomeEvent event) {
        // do sth
    }
}

还有一个用于集成测试的基类:

@ActiveProfiles("test")
@SpringBootTest
abstract class BaseKafkaIntegrationSpec extends Specification {

    @Autowired(required = true)
    private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry

    void setup() {
        kafkaListenerEndpointRegistry.getAllListenerContainers()
            .stream()
            .forEach { ContainerTestUtils.waitForAssignment(it, 1) }
    }
}

对于侦听器,分配了固定的组 ID:

kafka.listener.some-event.group-id=test.some-event

假设有一些集成测试类扩展

BaseKafkaIntegrationSpec
,第一个测试类可以工作,但第二次尝试执行
waitForAssignment()
会以错误结束:

java.lang.IllegalStateException: Expected 1 but got 0 partitions.

我试图用多种不同的方式来保护

waitForAssignment
电话,但没有运气。这应该是什么样子才能起作用?没有
@DirtiesContext
,所以 Spring 上下文被缓存,但我觉得运行新的测试类时容器发生了一些问题。

另一件事是,当我设置随机组ID名称时:

kafka.listener.some-event.group-id=test.some-event-${random.uuid}

出现另一个问题。尽管 Spring 缓存了它的上下文,但仍会为每个下一个测试类创建

SampleKafkaConsumer
的新实例(因为随机组 ID)。在查看日志时,我可以看到例如测试运行中的三个测试类,最后三个不同组 ID 的三个消费者正在运行并消费事件。

如果采用随机组 ID 的方法,是否可以以某种方式配置 Spring 来覆盖特定主题的现有 bean/容器,而不是创建新的?

spring-kafka spring-kafka-test
1个回答
0
投票

我认为

@DirtiesContext
是你的朋友。要点是,当 Spring 应用程序上下文被缓存时,它的所有 bean 都是活动的。因此,当您运行新测试时,您的分区很可能被其他缓存上下文中的容器窃取。只是因为它们都连接到同一个 Kafka 代理。

© www.soinside.com 2019 - 2024. All rights reserved.