我一直在努力为Spring-Integration轮询器流程实现重试模式(重试整个流程)。请在下面找到我的(错误的)源代码(无效)。
我在做什么错?(如果我在断点上放置一个断点并引发异常,则只会被击中一次)
非常感谢您的时间和专业知识。
最好的问候
nkjp
PS:也许尝试使用RetryTemplate扩展AbstractHandleMessageAdvice?
return IntegrationFLows.from(SOME_QUEUE_CHANNEL)
.transform(p -> p, e -> e.poller(Pollers.fixedDelay(5000)
.advice(RetryInterceptorBuilder.stateless().maxAttempts(5).backOffOptions(1,2,10).build())))
.transform(p -> {
if (true) {
throw new RuntimeException("KABOOM");
}
return p;
})
.channel(new NullChannel())
.get();
如果添加poller.advice()
,则从Advice
方法开始,将poll()
应用于整个流程。由于您已经从该队列中轮询了一条消息,因此在下次尝试时没有要轮询的消息。对非事务性队列使用重试是一种反模式:您不回滚事务,因此您的数据也不会返回存储以供下一个poll()
使用。
目前尚无法从某个点重试整个子流,但是您绝对可以在特定的错误端点上使用RequestHandlerRetryAdvice
,就像您的transform()
和KABOOM
例外:
.transform(p -> { if (true) { throw new RuntimeException("KABOOM"); } return p; }, e -> e.advice(new RequestHandlerRetryAdvice()))
请参阅其
setRetryTemplate(RetryTemplate retryTemplate)
了解更多重试选项,而不是默认情况下只有3次尝试。
要生成子流,我们需要考虑实现HandleMessageAdvice
。像这样的东西:
.transform(p -> p, e -> e.poller(Pollers.fixedDelay(500000)) .advice(new HandleMessageAdvice() { RetryOperationsInterceptor delegate = RetryInterceptorBuilder.stateless() .maxAttempts(5) .backOffOptions(1, 2, 10) .build(); @Override public Object invoke(MethodInvocation invocation) throws Throwable { return delegate.invoke(invocation); } }))
但是再次:它不是
poller
建议。它是MessageHandler.handleMessage()
上的一个端点。