我正在使用服务激活器的 Spring 集成链来处理来自队列的传入消息。保留传入消息的服务激活器之一
messagePersister
。如果此服务激活器失败,则会出现重试建议,再尝试该操作 3 次。这部分工作正常,但如果所有重试都失败,我们有一个恢复方法,可以以备用形式保留消息(还会触发一些通知等)。此恢复方法和原始持久化方法返回同一类的对象,然后需要由预处理器(链中的下一个服务激活器)处理。
然而,看起来,使用恢复选项会导致消息离开链,并且恢复服务激活器的返回对象不会沿着链走下去。同样,如果恢复方法抛出异常,它不会转到 redboxExceptionChannel
,这是正在侦听传入队列的适配器的异常。
<int-jms:message-driven-channel-adapter
id="inputChannelAdapter"
connection-factory="jmsConnectionFactory"
destination-name="REDBOX_IN"
channel="redboxIncomingChannel"
max-concurrent-consumers="1"
auto-startup="true"
acknowledge="transacted"
receive-timeout="20000"
error-channel="redboxExceptionChannel"/>
<int:chain id="redboxIncomingChannelProcessingChain"
input-channel="redboxIncomingChannel"
output-channel="redboxOutgoingMessageChannel">
<int:service-activator ref="messagePersister"
method="persistAndAddClientMessageIdToHeader">
<int:request-handler-advice-chain>
<int:retry-advice max-attempts="4" recovery-channel="persistenceRetriesExhaustedChannel" >
<int:exponential-back-off initial="800"
multiplier="3"
maximum="25000"/>
</int:retry-advice>
</int:request-handler-advice-chain>
</int:service-activator>
<int:service-activator ref="redboxPreProcessor" method="validate"/>
<int:service-activator ref="redboxProcessor" method="evaluateRules"/>
</int:chain>
<int:service-activator ref="messagePersister"
method="retriesExhausted" input-channel="persistenceRetriesExhaustedChannel" />
我期望恢复方法成为触发重试的链的一部分。
行为是正确的。
ErrorMessageSendingRecoverer
的逻辑是这样的:
@Override
public Object recover(RetryContext context) {
publish(context.getLastThrowable(), context);
return null;
}
所以,它就不会返回。此时不知道您的服务激活器正在生成回复。
您可以这样解决问题:
在此时添加一个
<gateway>
,并通过重试将您的服务激活器提取到一个独立组件中,该组件具有输入通道作为上述 request-channel
中的 gateway
。
然后,您的
messagePersister.retriesExhausted
必须在从此方法返回之前查看 MessagingException.failedMessage
来复制其标头。这样,replyChannel
就会出现,端点就会知道将方法的结果发送到哪里。这个replyChannel
就是那个gateway
等待回复的地方。这样,您就得到了原始服务激活者的正常回复和persistenceRetriesExhaustedChannel
订阅者的补偿。
更新
关于恢复器子流程中的错误。 根据我的测试,它按预期工作:
@SpringBootApplication
public class So78089892Application {
public static void main(String[] args) {
SpringApplication.run(So78089892Application.class, args);
}
@Bean
ApplicationRunner sendToJms(JmsTemplate jmsTemplate) {
return args -> jmsTemplate.convertAndSend("REDBOX_IN", "test data");
}
@Bean
JmsMessageDrivenChannelAdapterSpec<?> inputChannelAdapter(ConnectionFactory jmsConnectionFactory) {
return Jms.messageDrivenChannelAdapter(jmsConnectionFactory)
.destination("REDBOX_IN")
.outputChannel("redboxIncomingChannel")
.errorChannel("redboxExceptionChannel");
}
@ServiceActivator(inputChannel = "redboxExceptionChannel")
void handleErrors(Exception exception) {
System.out.println("Error Received: \n" + exception);
}
@Bean
IntegrationFlow redboxIncomingChannelProcessingChain(RequestHandlerRetryAdvice retryAdvice) {
return IntegrationFlow
.from("redboxIncomingChannel")
.gateway((subFlow) -> subFlow
.handle((p, h) -> {
throw new RuntimeException("Persistence failed");
}), e -> e.advice(retryAdvice))
.get();
}
@Bean
RequestHandlerRetryAdvice retryAdvice(MessageChannel persistenceRetriesExhaustedChannel) {
RequestHandlerRetryAdvice requestHandlerRetryAdvice = new RequestHandlerRetryAdvice();
requestHandlerRetryAdvice.setRecoveryCallback(
new ErrorMessageSendingRecoverer(persistenceRetriesExhaustedChannel));
return requestHandlerRetryAdvice;
}
@Bean
DirectChannel persistenceRetriesExhaustedChannel() {
return new DirectChannel();
}
@ServiceActivator(inputChannel = "persistenceRetriesExhaustedChannel")
void retriesExhausted(Exception exception) {
throw new RuntimeException("Cannot recover", exception);
}
}
正如您在最后一个
retriesExhausted()
方法中看到的,我故意根据来自刚刚失败的处理程序的异常抛出一些异常,并提供重试建议。
最后我从该
handleErrors()
方法中得到了这样的日志:
Error Received:
org.springframework.messaging.MessageHandlingException: error occurred during processing message in 'MethodInvokingMessageProcessor' [org.springframework.integration.handler.MethodInvokingMessageProcessor@53b41cc8], failedMessage=ErrorMessage [payload=org.springframework.messaging.MessagingException: Failed to handle, failedMessage=GenericMessage [payload=test data, headers={jms_redelivered=false, JMSXDeliveryCount=1, jms_destination=ActiveMQQueue[REDBOX_IN], id=5ff5cbc6-f585-c113-a9c3-40a741d0cc7f, priority=4, jms_timestamp=1709326985025, jms_messageId=ID:18fa9c02-d80f-11ee-8409-00155d933a76, timestamp=1709326985042}], headers={id=9eb9a300-d058-da1c-8315-432c2ae0cb34, timestamp=1709326985055}]
(对 Java DSL 变体感到抱歉:我有一段时间没有使用 XML 配置了)。
我们的配置可能会有所不同。例如,您的
persistenceRetriesExhaustedChannel
不是 DirectChannel
...