Spring 状态机有时会出现垃圾邮件重试耗尽错误

问题描述 投票:0回答:2

我已经设置了一个我认为相对简单的 Spring 状态机。有时(也许每次)某些事情会触发外部事件,我会收到大量此类错误。知道我做错了什么吗?

当发生这种情况时,我连续大约 10 次收到以下错误。

[parallel-2] ERROR reactor.core.publisher.Operators - Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 10/10
Caused by: reactor.core.Exceptions$RetryExhaustedException: Retries exhausted: 10/10
    at reactor.core.Exceptions.retryExhausted(Exceptions.java:290)
    at reactor.util.retry.RetryBackoffSpec.lambda$static$0(RetryBackoffSpec.java:67)
    at reactor.util.retry.RetryBackoffSpec.lambda$generateCompanion$4(RetryBackoffSpec.java:557)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.drain(FluxConcatMap.java:374)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerComplete(FluxConcatMap.java:295)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onComplete(FluxConcatMap.java:884)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1816)
    at reactor.core.publisher.MonoFlatMap$FlatMapInner.onNext(MonoFlatMap.java:249)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.subscribeNext(MonoIgnoreThen.java:232)
    at reactor.core.publisher.MonoIgnoreThen.subscribe(MonoIgnoreThen.java:51)
    at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:284)
    at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.propagateDelay(MonoDelay.java:270)
    at reactor.core.publisher.MonoDelay$MonoDelayRunnable.run(MonoDelay.java:285)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:68)
    at reactor.core.scheduler.SchedulerTask.call(SchedulerTask.java:28)
    at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.base/java.lang.Thread.run(Unknown Source)
Caused by: reactor.core.publisher.Sinks$EmissionException: Spec. Rule 1.3 - onSubscribe, onNext, onError and onComplete signaled to a Subscriber MUST be signaled serially.
    at reactor.core.publisher.InternalManySink.emitNext(InternalManySink.java:56)
    at org.springframework.statemachine.support.ReactiveStateMachineExecutor$1.lambda$null$0(ReactiveStateMachineExecutor.java:461)
    at reactor.core.publisher.MonoCallable.subscribe(MonoCallable.java:56)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.resubscribe(FluxRetryWhen.java:215)
    at reactor.core.publisher.FluxRetryWhen$RetryWhenOtherSubscriber.onNext(FluxRetryWhen.java:268)
    at reactor.core.publisher.FluxConcatMap$ConcatMapImmediate.innerNext(FluxConcatMap.java:281)
    at reactor.core.publisher.FluxConcatMap$ConcatMapInner.onNext(FluxConcatMap.java:860)
    at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1815)

我的状态机看起来像这样:


public abstract class DBStateMachine extends StateMachineConfigurerAdapter<DBStates, DBStateChanges> {

    public abstract Guard<DBStates, DBStateChanges> isDown();

    public abstract Guard<DBStates, DBStateChanges> isReady();

    @Override
    public void configure(StateMachineStateConfigurer<DBStates, DBStateChanges> states)
            throws Exception {

        states
                .withStates()
                .initial(DBStates.INITIALIZING)
                .junction(DBStates.TESTING)
                // .end(DBStates.READY)
                .states(EnumSet.allOf(DBStates.class));

    }

    @Override
    public void configure(
            StateMachineTransitionConfigurer<DBStates, DBStateChanges> transitions)
            throws Exception {
        //@formatter:off
        transitions.withInternal()
                        .source(DBStates.INITIALIZING)
                        .action(timerAction())
                        .timer(100).and()
                    .withJunction()
                        .source(DBStates.TESTING)
                        .first(DBStates.DOWN, isDown())
                        .then(DBStates.READY, isReady())
                        .last(DBStates.BAD_PASSWORD).and()
                    .withExternal()
                        .source(DBStates.BAD_PASSWORD)
                        .target(DBStates.TESTING)
                        .event(DBStateChanges.PASSWORD_RECEIVED).and()
                    .withInternal()
                        .source(DBStates.DOWN)
                        .action(timerAction())
                        .timer(10000).and()
                    .withExternal()
                        .source(DBStates.DOWN)
                        .target(DBStates.TESTING)
                        .event(DBStateChanges.RETRY).and()
                        .withExternal()
                        .source(DBStates.INITIALIZING)
                        .target(DBStates.TESTING)
                        .event(DBStateChanges.RETRY).and()
                    .withExternal()
                        .source(DBStates.READY)
                        .target(DBStates.TESTING)
                        .event(DBStateChanges.ERROR);
            //@formatter:on
    }

    @Bean
    public TimerAction timerAction() {
        return new TimerAction();
    }

    public class TimerAction implements Action<DBStates, DBStateChanges> {

        @Override
        public void execute(StateContext<DBStates, DBStateChanges> context) {
            // do something in every 1 sec
            // context.getStateMachine().
            // TODO make this reactive?
            context.getStateMachine().sendEvent(DBStateChanges.RETRY);
        }
    }
    
    
}

我认为触发错误的代码是这样的:

@Autowired
    private StateMachine<DBStates, DBStateChanges> statemachine;
public void triggerStateChange() {
        statemachine.sendEvent(DBStateChanges.ERROR);
    }

编辑

我还将triggerStateChange方法更改为如下所示,但仍然收到错误。

public void triggerStateChange() {
    
        Message<DBStateChanges> m = new GenericMessage<>(DBStateChanges.ERROR); 
        statemachine.sendEvent(Mono.just(m)).subscribe();
//      statemachine.sendEvent(DBStateChanges.ERROR);
    }

我应该提到,我使用的是 Java 16.0.1、Spring Boot 2.5、Spring Framework 5.3.7 和 Spring State Machine 3.0.1。

spring project-reactor spring-statemachine
2个回答
1
投票

Sinks.many() 不是线程安全的,必须串行向订阅者发出信号。类似的错误发生如下如何从多个线程调用 Sinks.Many.tryEmitNext?


0
投票

我遇到了同样的问题,发现是

timer()
。如果该值太小(在您的情况下,只有 100 毫秒),则
ReactiveStateMachineExecutor.registerTriggerListener
会导致此错误。另请参阅此 github 票证

就我而言,可以将计时器的值提高到几秒,错误就消失了

© www.soinside.com 2019 - 2024. All rights reserved.