我有一个水槽,在更远的地方有一个订户。在压力测试期间,我们意识到队列已满,接收器发出了
FAIL_OVERFLOW
信号。继续测试后,所有 emitNext
调用开始抛出信号 FAIL_TERMINATED
。
我的第一个想法是放一个
retry()
或 onErrorContinue()
或类似的东西,但问题不是订阅被取消,而是接收器 KO。
有没有办法在发出之前了解接收器的状态和/或重新建立/重新订阅已终止接收器的优雅方式?
这是我的代码示例
class class EventEmitter {
public void emitNext(event){
log(event);
deniedTimeoutEvents.emitNext(event, emitNextFailureHandler);
}
public Flux<Event> getEvents() {
return unhandledEvents.asFlux();
}
private final Sinks.Many<PriceEvent> unhandledEvents = Sinks.many().multicast().onBackpressureBuffer();
}
public class otherClass {
public void function(){
eventEmitter.getEvents()
.filter(this::shouldKeepGoing)
.doOnNext(repository::remove)
.subscribe();
}
}
使用反应堆3.4
EventEmitter
的实施是否在您的控制之下?或者它只是外部服务的模拟实现?如果它们在你的控制之下,我认为你可以让 emitNextFailureHandler
在遇到 true
或 FAIL_OVERFLOW
时返回 FAIL_TERMINATED
,以便 EventEmitter
可以重试发射。
另外,我想从另一个方面来讨论这个问题。
我认为
repository::remove
是一种同步方法,它会阻塞您的线程,因此这就是队列承受如此巨大压力的原因。这可能不是反应式编程的最佳实践。更好的方法可能是让 repository.remove()
返回 Mono
或 Flux
,然后使用 .flatMap(repository::remove)
而不是 .doOnNext()