我正在使用Spring集成为TCP遗留服务编写RabbitMQ接口。我能够连接到旧服务并发送XML字符串消息。
旧版服务器使用XML消息回复。收到消息后,自定义反序列化器会读取大部分消息,直到响应中的最后一行,然后失败并返回MessageTimeoutException。我设置了断点来验证这种情况正在发生。有什么明显的我做错了吗?
JUnit测试
@RunWith(SpringRunner.class)
@SpringBootTest
public class MaasIntegrationHelloWorld {
@Autowired
private TCPGateway gateway;
@Test
public void getAnalysisEngineInfo() throws JAXBException, ConnectException {
//Actual XML message creation and serialization removed for brevity
String message = "Hello World";
String result = gateway.send(message);
assertEquals(" ", result);
}
}
TCPCONFIG
@EnableIntegration
@IntegrationComponentScan
@ComponentScan
@Configuration
public class TCPConfig {
@Value("${legacy.hostname}")
private String hostname;
@Value("${legacy.port}")
private int port;
@Bean(name="replyChannel")
public MessageChannel replyChannel() {
return new DirectChannel();
}
@Bean(name="sendChannel")
public MessageChannel sendChannel() {
return new DirectChannel();
}
@Bean
public TcpNetClientConnectionFactory connectionFactory() {
TcpNetClientConnectionFactory connectionFactory = new TcpNetClientConnectionFactory(hostname, port);
connectionFactory.setDeserializer(new CommandResultDeserializer());
connectionFactory.setSingleUse(false);
return connectionFactory;
}
@Bean
@ServiceActivator(inputChannel = "sendChannel")
public TcpOutboundGateway tcpOutboundGateway() {
TcpOutboundGateway tcpOutboundGateway = new TcpOutboundGateway();
tcpOutboundGateway.setConnectionFactory(connectionFactory());
tcpOutboundGateway.setReplyChannel(this.replyChannel());
tcpOutboundGateway.setRequiresReply(true);
return tcpOutboundGateway;
}
}
网关接口
@MessagingGateway(defaultRequestChannel = "sendChannel")
public interface TCPGateway {
String send(String message) throws ConnectException;
}
网关监听器
@Configuration
@EnableIntegration
@IntegrationComponentScan
@MessageEndpoint
public class TCPGatewayListener {
@ServiceActivator(inputChannel = "replyChannel")
public String replyHandler(byte[] b) {
return new String(b);
}
}
解串器
public String deserialize(InputStream inputStream) throws IOException {
BufferedReader br = null;
StringBuilder sb = new StringBuilder();
String line;
br = new BufferedReader(new InputStreamReader(inputStream));
while ((line = br.readLine()) != null) {
sb.append(line);
}
return sb.toString();
}
堆栈跟踪
org.springframework.integration.MessageTimeoutException: Timed out waiting for response
at org.springframework.integration.ip.tcp.TcpOutboundGateway.handleRequestMessage(TcpOutboundGateway.java:146) ~[spring-integration-ip-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:127) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:148) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:121) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:150) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSendAndReceive(GenericMessagingTemplate.java:45) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.sendAndReceive(AbstractMessagingTemplate.java:42) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:97) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.core.MessagingTemplate.sendAndReceive(MessagingTemplate.java:38) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:79) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.messaging.core.AbstractMessagingTemplate.convertSendAndReceive(AbstractMessagingTemplate.java:70) [spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.doSendAndReceive(MessagingGatewaySupport.java:449) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.gateway.MessagingGatewaySupport.sendAndReceive(MessagingGatewaySupport.java:422) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invokeGatewayMethod(GatewayProxyFactoryBean.java:478) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.doInvoke(GatewayProxyFactoryBean.java:433) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.gateway.GatewayProxyFactoryBean.invoke(GatewayProxyFactoryBean.java:424) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.integration.gateway.GatewayCompletableFutureProxyFactoryBean.invoke(GatewayCompletableFutureProxyFactoryBean.java:65) [spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE]
at org.springframework.aop.framework.ReflectiveMethodInvocation.proceed(ReflectiveMethodInvocation.java:179) [spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.aop.framework.JdkDynamicAopProxy.invoke(JdkDynamicAopProxy.java:213) [spring-aop-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at com.sun.proxy.$Proxy107.send(Unknown Source) [na:na]
at orgl.shelly.maas.MaasIntegrationHelloWorld.getAnalysisEngineInfo(MaasIntegrationHelloWorld.java:49) [test-classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_151]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_151]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_151]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_151]
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) [junit-4.12.jar:4.12]
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) [junit-4.12.jar:4.12]
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) [junit-4.12.jar:4.12]
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.statements.RunBeforeTestMethodCallbacks.evaluate(RunBeforeTestMethodCallbacks.java:75) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.test.context.junit4.statements.RunAfterTestMethodCallbacks.evaluate(RunAfterTestMethodCallbacks.java:86) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.test.context.junit4.statements.SpringRepeat.evaluate(SpringRepeat.java:84) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:252) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.runChild(SpringJUnit4ClassRunner.java:94) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58) [junit-4.12.jar:4.12]
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.statements.RunBeforeTestClassCallbacks.evaluate(RunBeforeTestClassCallbacks.java:61) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.springframework.test.context.junit4.statements.RunAfterTestClassCallbacks.evaluate(RunAfterTestClassCallbacks.java:70) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.junit.runners.ParentRunner.run(ParentRunner.java:363) [junit-4.12.jar:4.12]
at org.springframework.test.context.junit4.SpringJUnit4ClassRunner.run(SpringJUnit4ClassRunner.java:191) [spring-test-4.3.13.RELEASE.jar:4.3.13.RELEASE]
at org.junit.runner.JUnitCore.run(JUnitCore.java:137) [junit-4.12.jar:4.12]
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68) [junit-rt.jar:na]
at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47) [junit-rt.jar:na]
at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242) [junit-rt.jar:na]
at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70) [junit-rt.jar:na]
while ((line = br.readLine()) != null) {
sb.append(line);
}
这是一个奇怪的解串器 - 你怎么知道响应消息何时完成?
如果服务器回复多行响应,您需要知道响应何时以某种方式完成;也许最后一行是零长度,例如。
要么
if (line.equals("</endofdoc>") {
break;
}