终止后重新订阅 Sink

问题描述 投票:0回答:1

我有一个水槽,在更远的地方有一个订户。在压力测试期间,我们意识到队列已满,接收器发出了

FAIL_OVERFLOW
信号。继续测试后,所有
emitNext
调用开始抛出信号
FAIL_TERMINATED

我的第一个想法是放一个

retry()
onErrorContinue()
或类似的东西,但问题不是订阅被取消,而是接收器 KO。 有没有办法在发出之前了解接收器的状态和/或重新建立/重新订阅已终止接收器的优雅方式?

这是我的代码示例

class class EventEmitter {

   public void emitNext(event){
       log(event);
       deniedTimeoutEvents.emitNext(event, emitNextFailureHandler);
   }

   public Flux<Event> getEvents() {
      return unhandledEvents.asFlux();
   }
 
   private final Sinks.Many<PriceEvent> unhandledEvents = Sinks.many().multicast().onBackpressureBuffer();
}

public class otherClass {

   public void function(){
      eventEmitter.getEvents()
            .filter(this::shouldKeepGoing)
            .doOnNext(repository::remove)
            .subscribe();
  }
}

使用反应堆3.4

spring-webflux project-reactor
1个回答
0
投票

EventEmitter
的实施是否在您的控制之下?或者它只是外部服务的模拟实现?如果它们在你的控制之下,我认为你可以让
emitNextFailureHandler
在遇到
true
FAIL_OVERFLOW
时返回
FAIL_TERMINATED
,以便
EventEmitter
可以重试发射。


另外,我想从另一个方面来讨论这个问题。

我认为

repository::remove
是一种同步方法,它会阻塞您的线程,因此这就是队列承受如此巨大压力的原因。这可能不是反应式编程的最佳实践。更好的方法可能是让
repository.remove()
返回
Mono
Flux
,然后使用
.flatMap(repository::remove)
而不是
.doOnNext()

© www.soinside.com 2019 - 2024. All rights reserved.