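I have to change an existing flow in Spring Integration (4.3.12, Java DSL). There is an existing SOAP call, and I have to insert a new SOAP call after it (already done). If the existing SOAP call fails, the new SOAP call must be skipped (this is where my problem lies). In the flow below, acmePreCompEnricher is the existing call and ifMLCallRequiredEnricher is the new one.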
return flow -> flow.channel(ORCH_REQUEST_INPUT)
.<HomeRequest, HomeModel>transform(requestToModelTransformer)
...
//
.enrich(this::acmePreCompRequestEnricher)
.enrich(this::acmePreCompEnricher)
.handle(this.acmePreCompResponseValidator())
//
.enrich(this::ifMLCallRequiredEnricher)
//
.enrich(this::acmeRequestEnricher)
.enrich(this::acmeEnricher)
...
So in acmePreCompEnricher I set an error channel to handle errors.
ContentEnricher contentEnricher = enricherSpec
.requestPayload(Message::getPayload)
.requestChannel(acmePreCompEnrichmentInputChannel())
.replyChannel(acmePreCompEnrichmentOutputChannel())
.get();
contentEnricher.setErrorChannel(skipMLInputChannel());
@Bean(name = "skip.ml.input")
public MessageChannel skipMLInputChannel() {
return MessageChannels.direct().get();
}
If a SOAP fault occurs, the message is delivered to the flow below.
@Bean
public IntegrationFlow processSkipML() {
return flow -> flow.channel("skip.ml.input")
.transform(ErrorMessage.class, (ErrorMessage m) -> {
Message<?> originalMessage = ((MessageHandlingException) m.getPayload()).getFailedMessage();
return MessageBuilder.withPayload(originalMessage.getHeaders().get(HEADER_MODEL, HomeModel.class))
.copyHeaders(originalMessage.getHeaders())
.build();
})
.enrich(e -> e.propertyFunction("skipMLCall", m -> true))
.channel("enrich.ifMLCallNeeded.input");
}
The flow behind ifMLCallRequiredEnricher is the following.
@Bean
public IntegrationFlow processIfMLCallRequiredFlow() {
return flow -> flow.channel("enrich.ifMLCallNeeded.input")
.route(ifMLCallRequired(), routeToMLGatewayOrBypassCall())
.channel("enrich.ifMLCallNeeded.output");
}
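It works as follows: ifMLCallRequired() checks whether skipMLCall is false (on error, the flow behind the error channel sets it to true). If it is false, the new SOAP call is executed; otherwise it is skipped.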
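The routing decision described above can be sketched without the framework. This is only an illustration: the post does not show ifMLCallRequired() or routeToMLGatewayOrBypassCall(), so the Model class and both channel names below are hypothetical stand-ins, not the real code.

```java
public class SkipFlagRouterSketch {

    // Hypothetical channel names; the original post does not show the real ones.
    public static final String ML_GATEWAY_CHANNEL = "hypothetical.ml.gateway.input";
    public static final String BYPASS_CHANNEL = "hypothetical.ml.bypass.input";

    /** Hypothetical stand-in for HomeModel, reduced to the one flag that matters here. */
    public static final class Model {
        // Defaults to false: the call is made unless the error flow set the flag.
        private boolean skipMLCall;

        public boolean isSkipMLCall() {
            return this.skipMLCall;
        }

        public void setSkipMLCall(boolean skipMLCall) {
            this.skipMLCall = skipMLCall;
        }
    }

    /** The new SOAP call is required only while the skip flag is still false. */
    public static boolean ifMLCallRequired(Model model) {
        return !model.isSkipMLCall();
    }

    /** Picks the channel the router would send the message to. */
    public static String targetChannel(Model model) {
        return ifMLCallRequired(model) ? ML_GATEWAY_CHANNEL : BYPASS_CHANNEL;
    }

    public static void main(String[] args) {
        Model ok = new Model();
        Model failed = new Model();
        failed.setSkipMLCall(true); // what the error flow does via propertyFunction

        System.out.println(targetChannel(ok));
        System.out.println(targetChannel(failed));
    }
}
```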
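When there is no SOAP fault, the flow runs normally.
However, when a SOAP fault is thrown (i.e. the message goes through the error channel), I get the following exception.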
2020-05-22 10:10:48,023 ERROR com.acme.webservice.OrchestrationServiceEndpoint Thread=qtp14486859-13 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d Error
org.springframework.messaging.MessagingException: failure occurred in error-handling flow; nested exception is org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'enrich.ifMLCallNeeded.output'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=uk.co.acme.payload.request._2017._06.Message@4a5e6c, headers={replyChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@19d4520, errorChannel=org.springframework.messaging.core.GenericMessagingTemplate$TemporaryReplyChannel@19d4520, ws_soapAction=http://www.acme.co.uk/XRTEService/ProcessTran, id=902bd270-89d8-62e9-b00f-b69399241bd1, timestamp=1590138648017}], ...}]
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:489)
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceiveMessage(MessagingGatewaySupport.java:426)
at org.springframework.integration.transformer.ContentEnricher$Gateway.sendAndReceiveMessage(ContentEnricher.java:481)
at org.springframework.integration.transformer.ContentEnricher.handleRequestMessage(ContentEnricher.java:383)
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109)
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)
So when there is no SOAP fault everything works fine, and the channel enrich.ifMLCallNeeded.output has a subscriber, which is the next enricher; see the log entry below.
2020-05-24 20:37:58,819 INFO org.springframework.integration.channel.DirectChannel Thread=qtp14486859-10 MDC=16d7cc4c-c9da-449b-8bfa-504e6d81185d Channel 'enrich.ifMLCallNeeded.output' has 1 subscriber(s).
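But when a SOAP fault occurs, the channel has no subscribers (I cannot find any such log entry). I think this is because I am trying to hijack the flow with the error channel. But what can I do in this case?
I would appreciate any help, because I am stuck right now. Thank you very much.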
Regards, V.
Here is an example of how to properly handle errors on an enrich subflow.
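// Imports assume Spring Boot with Spring Integration 5.x package names.
import java.util.concurrent.atomic.AtomicBoolean;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;
import org.springframework.messaging.MessagingException;

@SpringBootApplication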
public class So61991580Application {
public static void main(String[] args) {
SpringApplication.run(So61991580Application.class, args);
}
private final AtomicBoolean which = new AtomicBoolean();
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(() -> new Foo(this.which.getAndSet(!which.get()) ? "foo" : "qux"),
e -> e.poller(Pollers.fixedDelay(5000)))
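// Failures in the soap1 subflow are sent to the soap1Error flow.
.enrich(spec -> spec.requestChannel("soap1.input")
.errorChannel("soap1Error.input"))
// Route on the "bar" property, which soap1 sets to "good" and soap1Error sets to "bad".
.route("payload.bar", r -> r
.channelMapping("good", "soap2.input")
.channelMapping("bad", "cleanUp.input"))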
.get();
}
@Bean
public IntegrationFlow soap1() {
return f -> f
.handle(Foo.class, (payload, headers) -> {
if (payload.getFoo().equals("foo")) {
throw new RuntimeException("test enrich failure");
}
payload.setBar("good");
return payload;
});
}
@Bean
public IntegrationFlow soap2() {
return f -> f
.handle(Foo.class, (payload, headers) -> {
payload.setBaz("soap2");
return payload;
})
.channel("cleanUp.input");
}
@Bean
public IntegrationFlow soap1Error() {
return f -> f.<MessagingException, Foo>transform(ex -> {
Foo foo = (Foo) ex.getFailedMessage().getPayload();
foo.setBar("bad");
return foo;
});
}
@Bean
public IntegrationFlow cleanUp() {
return f -> f.log();
}
public static class Foo {
private final String foo;
private String bar;
private String baz;
public Foo(String foo) {
this.foo = foo;
}
public String getFoo() {
return this.foo;
}
public String getBar() {
return this.bar;
}
public void setBar(String bar) {
this.bar = bar;
}
public String getBaz() {
return this.baz;
}
public void setBaz(String baz) {
this.baz = baz;
}
@Override
public String toString() {
return "Foo [foo=" + this.foo + ", bar=" + this.bar + ", baz=" + this.baz + "]";
}
}
}
GenericMessage [payload=Foo [foo=foo, bar=bad, baz=null], headers={id=e5a943c7-dcf1-47f3-436e-5d0350a1c6f5, timestamp=1590511422083}]
GenericMessage [payload=Foo [foo=qux, bar=good, baz=soap2], headers={id=a99d7ddb-2f40-f0f7-08b6-6340563e011d, timestamp=1590511427086}]
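To make the control flow of the example easier to follow, here is a framework-free sketch of the same idea: the error handling stays inside the enrichment step, marks the payload, and the main flow then decides whether to make the second call. It is a simplified model under stated assumptions, not how Spring Integration is implemented; the method names only mirror the flow names above, and the try/catch stands in for the enricher's error channel.

```java
public class EnrichWithFallbackSketch {

    /** Stand-in for the Foo payload of the example above. */
    public static final class Foo {
        public final String foo;
        public String bar;
        public String baz;

        public Foo(String foo) {
            this.foo = foo;
        }

        @Override
        public String toString() {
            return "Foo [foo=" + this.foo + ", bar=" + this.bar + ", baz=" + this.baz + "]";
        }
    }

    /** Models the soap1 subflow: fails for "foo", otherwise marks the payload as good. */
    public static Foo soap1(Foo payload) {
        if (payload.foo.equals("foo")) {
            throw new RuntimeException("test enrich failure");
        }
        payload.bar = "good";
        return payload;
    }

    /** Models the soap1Error flow: marks the failed payload as bad and returns it. */
    public static Foo soap1Error(Foo failedPayload) {
        failedPayload.bar = "bad";
        return failedPayload;
    }

    /** Models the soap2 subflow. */
    public static Foo soap2(Foo payload) {
        payload.baz = "soap2";
        return payload;
    }

    /** Models the enrich step: a failure is handled here and never leaves the step. */
    public static Foo enrich(Foo payload) {
        try {
            return soap1(payload);
        }
        catch (RuntimeException ex) {
            return soap1Error(payload);
        }
    }

    /** Models the main flow: enrich, then route on "bar" to soap2 or straight to clean-up. */
    public static Foo process(Foo payload) {
        Foo enriched = enrich(payload);
        return "good".equals(enriched.bar) ? soap2(enriched) : enriched;
    }

    public static void main(String[] args) {
        System.out.println(process(new Foo("foo"))); // Foo [foo=foo, bar=bad, baz=null]
        System.out.println(process(new Foo("qux"))); // Foo [foo=qux, bar=good, baz=soap2]
    }
}
```

The point of the design is that the error flow does not jump into another part of the main flow; it hands a marked payload back to the enrichment step, and the skip decision is made afterwards by the router.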