我正在尝试编写 Kafka 监听器的可靠测试。值得一提的是,我使用外部 kafka 容器而不是
@EmbeddedKafka
。
我遇到的最大问题是如何确保在测试开始之前将 Kafka 侦听器分配给分区并准备好消费消息。我找到了
ContainerTestUtils.waitForAssignment
方法,但它没有像我预期的那样工作。
假设有 Kafka 监听器组件:
@Component
public class SampleKafkaConsumer {
@KafkaListener(
topics = "${kafka.listener.some-event.topic}",
groupId = "${kafka.listener.some-event.group-id}",
)
void consume(final SomeEvent event) {
// do sth
}
}
还有一个用于集成测试的基类:
@ActiveProfiles("test")
@SpringBootTest
abstract class BaseKafkaIntegrationSpec extends Specification {
@Autowired(required = true)
private KafkaListenerEndpointRegistry kafkaListenerEndpointRegistry
void setup() {
kafkaListenerEndpointRegistry.getAllListenerContainers()
.stream()
.forEach { ContainerTestUtils.waitForAssignment(it, 1) }
}
}
对于侦听器,分配了固定的组 ID:
kafka.listener.some-event.group-id=test.some-event
假设有一些集成测试类扩展
BaseKafkaIntegrationSpec
,第一个测试类可以工作,但第二次尝试执行waitForAssignment()
会以错误结束:
java.lang.IllegalStateException: Expected 1 but got 0 partitions.
我试图用多种不同的方式来保护
waitForAssignment
电话,但没有运气。这应该是什么样子才能起作用?没有 @DirtiesContext
,所以 Spring 上下文被缓存,但我觉得运行新的测试类时容器发生了一些问题。
另一件事是,当我设置随机组ID名称时:
kafka.listener.some-event.group-id=test.some-event-${random.uuid}
出现另一个问题。尽管 Spring 缓存了它的上下文,但仍会为每个下一个测试类创建
SampleKafkaConsumer
的新实例(因为随机组 ID)。在查看日志时,我可以看到例如测试运行中的三个测试类,最后三个不同组 ID 的三个消费者正在运行并消费事件。
如果采用随机组 ID 的方法,是否可以以某种方式配置 Spring 来覆盖特定主题的现有 bean/容器,而不是创建新的?
我认为
@DirtiesContext
是你的朋友。要点是,当 Spring 应用程序上下文被缓存时,它的所有 bean 都是活动的。因此,当您运行新测试时,您的分区很可能被其他缓存上下文中的容器窃取。只是因为它们都连接到同一个 Kafka 代理。