我想编写一个单元测试,以确保某些重试机制到位,因此当
Publisher
第一次订阅时发出错误时,被测试的代码可以处理 a 。
假设我有一个工厂风格的方法,它创建包裹在
MyObject
中的 Mono
实例:
public Mono<Void> createMyObject() {
return myObjectFactory.createMyObject()
.doOnNext(myObject -> myObject.performActionOnObject())
.doOnError(throwable -> LOG.error("Error while creating MyObject!", throwable))
.retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(10))
.doBeforeRetry(retrySignal -> LOGGER.error("Retrying to create new MyObject due to {}, attempt nr. {}...", retrySignal.failure().getMessage(),
retrySignal.totalRetriesInARow() + 1)))
.then();
}
createMyObject()
可能会返回 Mono.error()
,因此 retryWhen
用于确保处理此类错误并在几秒钟内重试创建。
我试图使用
reactor.test.publisher.TestPublisher
和 StepVerifier
来测试此行为,这将模拟一个场景,其中第一个订阅将返回错误,但第二个订阅将返回结果。
例如:
@Test
void testRetries() {
TestPublisher<MyObject> myObjectTestPublisher = TestPublisher.create();
Mockito.when(myObjectFactory.createMyObject()).thenReturn(myObjectTestPublisher.mono());
MyObject myObjectMock = Mockito.mock(MyObject.class);
StepVerifier.withVirtualTime(() -> createMyObject())
.expectSubscription()
.then(() -> myObjectTestPublisher.assertWasSubscribed())
.then(() -> myObjectTestPublisher.error(new RuntimeException("Dummy error")))
.then(() -> myObjectTestPublisher.emit(myObjectMock))
.verifyTimeout(Duration.ofMinutes(10));
Mockito.verify(myObjectMock, Mockito.times(1)).performActionOnObject();
}
然而,每当我运行此测试时,代码都会陷入无限次重试,其中
TestPublisher
始终返回带有 Mono.error()
的 RuntimeException("Dummy error")
。
因此,当重试机制起作用时,无论执行多少次重试,
TestPublisher
都不会发出myObjectMock
。
我还尝试重写单元测试以使用
Sinks.one()
而不是 TestPublisher
,但我最终得到了相同的行为。
如何告诉
TestPublisher
在第一次订阅时发出 Mono.error()
,并在第二次订阅(重试)时发出模拟对象?
啊,所以我找到了窍门。
问题是,TestPublisher 在发出错误(终端信号)时将删除其所有订阅者。
因此,您必须创建一个具有
CLEANUP_ON_TERMINATE
违规的 TestPublisher:
TestPublisher.createColdNonCompliant(false, TestPublisher.Violation.CLEANUP_ON_TERMINATE);
此外,如果您使用
StepVerifier
,则需要将发布者设置为cold,否则重试机制触发的订阅可能会错过值的发射。