如何使 Reactor TestPublisher 在第一个订阅时返回错误,但为下一个订阅发出值以测试重试?

问题描述 投票:0回答:1

我想编写一个单元测试,以确保某些重试机制到位,因此当

Publisher
第一次订阅时发出错误时,被测试的代码可以处理 a 。

假设我有一个工厂风格的方法,它创建包裹在

MyObject
中的
Mono
实例:

public Mono<Void> createMyObject() {
    return myObjectFactory.createMyObject()
            .doOnNext(myObject -> myObject.performActionOnObject())
            .doOnError(throwable -> LOG.error("Error while creating MyObject!", throwable))
            .retryWhen(Retry.fixedDelay(Long.MAX_VALUE, Duration.ofSeconds(10))
                    .doBeforeRetry(retrySignal -> LOGGER.error("Retrying to create new MyObject due to {}, attempt nr. {}...", retrySignal.failure().getMessage(),
                            retrySignal.totalRetriesInARow() + 1)))
            .then();
}

createMyObject()
可能会返回
Mono.error()
,因此
retryWhen
用于确保处理此类错误并在几秒钟内重试创建。

我试图使用

reactor.test.publisher.TestPublisher
StepVerifier
来测试此行为,这将模拟一个场景,其中第一个订阅将返回错误,但第二个订阅将返回结果。

例如:

@Test
void testRetries() {
    TestPublisher<MyObject> myObjectTestPublisher = TestPublisher.create();
    Mockito.when(myObjectFactory.createMyObject()).thenReturn(myObjectTestPublisher.mono());

    MyObject myObjectMock = Mockito.mock(MyObject.class);

    StepVerifier.withVirtualTime(() -> createMyObject())
            .expectSubscription()
            .then(() -> myObjectTestPublisher.assertWasSubscribed())
            .then(() -> myObjectTestPublisher.error(new RuntimeException("Dummy error")))
            .then(() -> myObjectTestPublisher.emit(myObjectMock))
            .verifyTimeout(Duration.ofMinutes(10));

    Mockito.verify(myObjectMock, Mockito.times(1)).performActionOnObject();
}

然而,每当我运行此测试时,代码都会陷入无限次重试,其中

TestPublisher
始终返回带有
Mono.error()
RuntimeException("Dummy error")

因此,当重试机制起作用时,无论执行多少次重试,

TestPublisher
都不会发出
myObjectMock

我还尝试重写单元测试以使用

Sinks.one()
而不是
TestPublisher
,但我最终得到了相同的行为。

TLDR

如何告诉

TestPublisher
在第一次订阅时发出
Mono.error()
,并在第二次订阅(重试)时发出模拟对象?

java project-reactor reactive-streams
1个回答
0
投票

啊,所以我找到了窍门。

问题是,TestPublisher 在发出错误(终端信号)时将删除其所有订阅者。

因此,您必须创建一个具有

CLEANUP_ON_TERMINATE
违规的 TestPublisher:

TestPublisher.createColdNonCompliant(false, TestPublisher.Violation.CLEANUP_ON_TERMINATE);

此外,如果您使用

StepVerifier
,则需要将发布者设置为cold,否则重试机制触发的订阅可能会错过值的发射。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.