我有以下代码(显然是作为示例):
class RandomClass(
val producer: Producer<String, String>
) {
fun randomFunction(): Boolean {
// Using .get() because I want to make sure it got sent before continuing
producer.send( (...) ).get()
return true
}
}
我通过创建一个 MockProducer 并将其而不是真正的 Producer 注入到类中来构建测试。然后我执行我的函数并验证结果。
"Some test of random function" {
val mockProducer = MockProducer(
false,
StringSerializer(),
StringSerializer()
)
// Build class with mockproducers
val randomClass = RandomClass(
producer = mockProducer
)
// Execute function
randomClass.randomFunction()
// Verify executions
mockProducer.history().size shouldBe 1
}
上面的代码现在将挂起,因为函数调用
.get()
正在阻塞线程。这是可以预料的,因为我们还没有告诉 MockProducer 如何处理 .send()
函数调用。
根据我所了解到的情况,似乎我必须决定在调用.send()
之后如何处理函数调用。对我来说,这在测试范围中完全没有意义,因为我不知道何时实际调用发送。根据我的测试,我仅控制何时执行对
randomClass.randomFunction()
的调用,之后程序将在可用时执行,并且由于该函数正在阻塞,意味着测试将不会继续,直到 .send()
完成。 因此,由于
.get()
,我无法告诉 MockProducer 在发生后如何处理 .send()
调用。
我引用了 Baeldung 的指南:https://www.baeldung.com/kafka-mock Producer(...) 其次,我们将调用mockProducer.errorNext(e),以便MockProducer为最后一次send()调用返回一个异常。:我引用了javadoc中的
MockProducer.completeNext()
成功完成最早未完成的通话。返回: true 如果有未完成的调用来完成
我想提前告诉MockProducer下一个调用或n个调用要做什么。这与使用 Mockito/Mockk 进行模拟的工作方式类似。下面的示例包含我想做的事情:
"Some test of random function" {
val mockProducer = MockProducer(
false,
StringSerializer(),
StringSerializer()
)
// Build class with mockproducers
val randomClass = RandomClass(
producer = mockProducer
)
// I wish to tell the MockProducer ahead of time that
// the next send should be completed successfully
mockProducer.completeNext() // Or .errorNext() if I wish to test an error
// Execute function
randomClass.randomFunction()
// Verify executions
mockProducer.history().size shouldBe 1
}
我真的没有办法提前告诉MockProducer如何完成对
.send()
的调用吗?
我今天遇到了类似的问题。我使用带有回调的发送而不是返回 future 的发送,并且想要提前定义回调结果(是否例外)。
为了实现你想要的,你可以扩展MockProducer并重写completeSend()以根据你在调用send()之前定义的行为,并且如果你想立即检查Future,还可以重写send()来进行刷新以避免阻塞线程(尽管我会避免在每次发送消息后检查 Future 结果,或者如果您确实需要它,请在手动发送后调用lush(),然后才检查结果)。
实现示例:
public class CompletionMockProducer extends MockProducer<String, String> {
private final Queue<RuntimeException> completionResults = new LinkedList<>();
public CompletionMockProducer() {
super(false, new StringSerializer(), new StringSerializer());
}
public synchronized void addCompletionResults(RuntimeException ... exceptions) {
completionResults.addAll(Arrays.stream(exceptions).toList());
}
@Override
public synchronized void clear() {
super.clear();
completionResults.clear();
}
@Override
public synchronized Future<RecordMetadata> send(ProducerRecord<String, String> record) {
var result = super.send(record);
flush();
return result;
}
@Override
public synchronized boolean completeNext() {
Validate.isTrue(!completionResults.isEmpty(),
"Completion result not prepared using #addCompletionResults(...)");
return errorNext(completionResults.poll());
}
}