在为依赖 Kafka 作为消息源的 Spring 集成应用程序编写集成测试时,我经常遇到在两个测试用例之间创建干净的测试状态的问题,因为 IntegrationFlow 配置为从我需要在测试中使用的主题进行消费。我使用 TestContainers 还是 EmbeddedKafka 都没关系,最佳实践都是为单个测试使用唯一的主题。
无论如何,我不知道如何在两个测试之间使用新主题重新初始化 IntegrationFlow。
想象一下
@Bean
public StandardIntegrationFlow startKafkaInbound() {
return IntegrationFlow.from(Kafka
.messageDrivenChannelAdapter(
kafkaConsumerFactory,
ListenerMode.record,
serviceProperties.getImportTopic().getName())
.messageConverter(messagingMessageConverter)
.errorChannel(Objects.requireNonNull(errorHandler.getInputChannel()))
.autoStartup(false)
)
.channel(Objects.requireNonNull(routeKafkaMessage().getInputChannel()))
.get();
}
现在编写 IntegrationTests 时,如何将此流程重新订阅到不同的主题?
致以诚挚的问候 丹尼尔
我们不知道你的
serviceProperties.getImportTopic()
是什么,但感觉这是你可以在测试之间以及流程停止/启动之后操纵主题名称的地方。
为了更好地了解问题到底是什么,我们可能需要看看您在测试中做了什么。