我已经设置了一个我认为相对简单的 Spring 状态机。有时(也许每次)某些事情会触发外部事件,我会收到大量此类错误。知道我做错了什么吗?
当发生这种情况时,我连续大约 10 次收到以下错误。
[parallel-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 10/10
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 10/10
at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)
at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:374)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:295)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:884)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
at java.base/java.lang.Thread.run(Unknown Source)
Caused by: reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:56)
at org.springframework.statemachine.support.ReactiveStateMachineExecutor$1.lambda$null$0(ReactiveStateMachineExecutor.java:461)
at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:56)
at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:215)
at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:268)
at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)
我的状态机看起来像这样:
public abstract class DBStateMachine extends StateMachineConfigurerAdapter<DBStates, DBStateChanges> {
public abstract Guard<DBStates, DBStateChanges> isDown();
public abstract Guard<DBStates, DBStateChanges> isReady();
@Override
public void configure(StateMachineStateConfigurer<DBStates, DBStateChanges> states)
throws Exception {
states
.withStates()
.initial(DBStates.INITIALIZING)
.junction(DBStates.TESTING)
// .end(DBStates.READY)
.states(EnumSet.allOf(DBStates.class));
}
@Override
public void configure(
StateMachineTransitionConfigurer<DBStates, DBStateChanges> transitions)
throws Exception {
//@formatter:off
transitions.withInternal()
.source(DBStates.INITIALIZING)
.action(timerAction())
.timer(100).and()
.withJunction()
.source(DBStates.TESTING)
.first(DBStates.DOWN, isDown())
.then(DBStates.READY, isReady())
.last(DBStates.BAD_PASSWORD).and()
.withExternal()
.source(DBStates.BAD_PASSWORD)
.target(DBStates.TESTING)
.event(DBStateChanges.PASSWORD_RECEIVED).and()
.withInternal()
.source(DBStates.DOWN)
.action(timerAction())
.timer(10000).and()
.withExternal()
.source(DBStates.DOWN)
.target(DBStates.TESTING)
.event(DBStateChanges.RETRY).and()
.withExternal()
.source(DBStates.INITIALIZING)
.target(DBStates.TESTING)
.event(DBStateChanges.RETRY).and()
.withExternal()
.source(DBStates.READY)
.target(DBStates.TESTING)
.event(DBStateChanges.ERROR);
//@formatter:on
}
@Bean
public TimerAction timerAction() {
return new TimerAction();
}
public class TimerAction implements Action<DBStates, DBStateChanges> {
@Override
public void execute(StateContext<DBStates, DBStateChanges> context) {
// do something in every 1 sec
// context.getStateMachine().
// TODO make this reactive?
context.getStateMachine().sendEvent(DBStateChanges.RETRY);
}
}
}
我认为触发错误的代码是这样的:
@Autowired
private StateMachine<DBStates, DBStateChanges> statemachine;
public void triggerStateChange() {
statemachine.sendEvent(DBStateChanges.ERROR);
}
编辑
我还将triggerStateChange方法更改为如下所示,但仍然收到错误。
public void triggerStateChange() {
Message<DBStateChanges> m = new GenericMessage<>(DBStateChanges.ERROR);
statemachine.sendEvent(Mono.just(m)).subscribe();
// statemachine.sendEvent(DBStateChanges.ERROR);
}
我应该提到,我使用的是 Java 16.0.1、Spring Boot 2.5、Spring Framework 5.3.7 和 Spring State Machine 3.0.1。
Sinks.many() 不是线程安全的,必须串行向订阅者发出信号。类似的错误发生如下如何从多个线程调用 Sinks.Many
我遇到了同样的问题,发现是
timer()
。如果该值太小(在您的情况下,只有 100 毫秒),则 ReactiveStateMachineExecutor.registerTriggerListener
会导致此错误。另请参阅此 github 票证
就我而言,可以将计时器的值提高到几秒,错误就消失了