我用 Citrus Framework 编写了一个 Cucumber 测试用例,我想在其中测试一个在内部向 Kafka 发布消息的 HTTP 调用。我编写了一个测试用例来使用“Given、When、Then”语法来验证此行为。测试用例按预期调用 HTTP 端点,并在第一次尝试时验证 Kafka 消息的接收,但当我多次执行相同的测试用例时,它总是失败并且仅在第一次尝试时通过。这就是我的测试用例的编写方式:
@When("^I make a http call$")
public void when_scenario() {
runner.when(http()
.client("myHTTPClient")
.send()
.post("/api/endpoint")
.message()
.contentType(MediaType.APPLICATION_XML_VALUE)
.header("X-B3-TraceId", "randomstring")
.body("some XML payload"));
}
@Then("^a kafka message should be sent$")
public void then_scenario() {
runner.then(receive()
.endpoint("myKafkaEndpoint")
.message()
.timeout(5000)
.header("ce_source", "mysourcev1"));
}
这就是客户端端点的定义方式
@Bean
public HttpClient myHTTPClient() {
return CitrusEndpoints
.http()
.client()
.requestUrl("http://localhost:10000")
.build();
}
@Bean
public KafkaEndpoint myKafkaEndpoint() {
return CitrusEndpoints
.kafka()
.asynchronous()
.server("localhost:9092")
.topic("my-topic")
.offsetReset("earliest")
.build();
}
这是我收到的错误,即使我可以看到目标主题正在接收 kafka 消息:
org.citrusframework.exceptions.TestCaseFailedException: Action timeout after 5000 milliseconds. Failed to receive message on endpoint: 'my-topic'
我在这里缺少什么?
请考虑在 Http 客户端发送操作上使用 fork=true 选项。这是因为 Http 是同步协议,Citrus 将阻塞,直到收到响应消息。
分叉选项使 Citrus 立即继续其余测试,这将立即设置 Kafka 消费者。根据 Kafka 消费者和连接设置,当 Kafka 消费者开始订阅并且 Kafka 消息已发送时,可能会出现竞争条件。
此外,当多次运行相同的测试时,请确保在同一套件中运行的所有测试中重复使用相同的 KafkaEndpoint 实例。当端点多次重新初始化时,根据消费者组设置和其他消费者选项,它可能无法接收消息。
将共享 Kafka 端点的消费者组设置为固定值对于您的用例来说也是一件好事。