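I have a use case where I need to send messages to an external TCP server that exposes two IP/port pairs, distributing them round-robin (one connection per IP/port pair, each kept alive for message exchange).
I am using Spring Integration and have no information about the TCP server's underlying implementation. The message structure consists of a 4-byte header of ASCII characters specifying the length of the message (excluding the header), followed by the message itself.
For example, if the message is 128 bytes long, the header value "0128" is prepended to it, so the actual length of the data sent is 132 bytes.
A sample message looks like this: 0022thisisanexamplemessage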
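To make the framing concrete, here is a minimal plain-Java sketch of how such a message could be built. The class and method names (`AsciiLengthFraming`, `frame`) are hypothetical, and it assumes an ASCII payload and a zero-padded decimal header, as in the "0128" example above.

```java
import java.nio.charset.StandardCharsets;

public class AsciiLengthFraming {

    // Prepends a 4-character, zero-padded ASCII length header to the payload.
    // Assumption: the payload is ASCII and at most 9999 bytes (the largest value 4 digits can hold).
    public static byte[] frame(String body) {
        byte[] payload = body.getBytes(StandardCharsets.US_ASCII);
        if (payload.length > 9999) {
            throw new IllegalArgumentException("Payload too long for a 4-digit header: " + payload.length);
        }
        byte[] header = String.format("%04d", payload.length).getBytes(StandardCharsets.US_ASCII);
        byte[] framed = new byte[header.length + payload.length];
        System.arraycopy(header, 0, framed, 0, header.length);
        System.arraycopy(payload, 0, framed, header.length, payload.length);
        return framed;
    }

    public static void main(String[] args) {
        byte[] framed = frame("thisisanexamplemessage");
        // prints "0022thisisanexamplemessage"
        System.out.println(new String(framed, StandardCharsets.US_ASCII));
    }
}
```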
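Following this post, I set up two TcpSendingMessageHandler instances and two FailoverClientConnectionFactory instances, and used the same outboundChannel as the input channel to achieve round-robin distribution, as shown below (messages are sent to the TCP server successfully):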
@Bean
public MessageChannel outboundChannel() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler tcpOutboundChannelOne() {
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(failoverClientConnectionFactoryOne());
    handler.setClientMode(true);
    return handler;
}

@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler tcpOutboundChannelTwo() {
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(failoverClientConnectionFactoryTwo());
    handler.setClientMode(true);
    return handler;
}

@Bean
public FailoverClientConnectionFactory failoverClientConnectionFactoryOne() {
    // Prefers server port 1 and falls back to port 2
    List<AbstractClientConnectionFactory> factories = new ArrayList<>();
    factories.add(tcpNetClientConnectionFactoryOne());
    factories.add(tcpNetClientConnectionFactoryTwo());
    FailoverClientConnectionFactory cf = new FailoverClientConnectionFactory(factories);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setCloseOnRefresh(true);
    cf.setRefreshSharedInterval(10000L);
    return cf;
}

@Bean
public FailoverClientConnectionFactory failoverClientConnectionFactoryTwo() {
    // Prefers server port 2 and falls back to port 1
    List<AbstractClientConnectionFactory> factories = new ArrayList<>();
    factories.add(tcpNetClientConnectionFactoryTwo());
    factories.add(tcpNetClientConnectionFactoryOne());
    FailoverClientConnectionFactory cf = new FailoverClientConnectionFactory(factories);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setCloseOnRefresh(true);
    cf.setRefreshSharedInterval(10000L);
    return cf;
}

@Bean
public TcpNetClientConnectionFactory tcpNetClientConnectionFactoryOne() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(tcpServerHost, tcpServerPort1);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setSerializer(codec());
    cf.setDeserializer(codec());
    cf.setConnectionTimeout(connectionTimeout);
    return cf;
}

@Bean
public TcpNetClientConnectionFactory tcpNetClientConnectionFactoryTwo() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(tcpServerHost, tcpServerPort2);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setSerializer(codec());
    cf.setDeserializer(codec());
    cf.setConnectionTimeout(connectionTimeout);
    return cf;
}

private ByteArrayLengthHeaderSerializer codec() {
    ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer();
    serializer.setMaxMessageSize(8 * 1024);
    serializer.setInclusive(false);
    return serializer;
}
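
The routing this configuration aims for can be pictured with a small plain-Java sketch that does not use Spring at all. It assumes the channel alternates between the two handlers (round-robin is the default load-balancing strategy of DirectChannel) and that each failover factory tries its delegates in list order. The class name, method name, and the "port1"/"port2" labels are hypothetical.

```java
import java.util.List;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;

public class RoundRobinFailoverSketch {

    // One entry per handler: the preferred endpoint first, the fallback second.
    private static final List<List<String>> HANDLERS = List.of(
            List.of("port1", "port2"),
            List.of("port2", "port1"));

    private final AtomicInteger next = new AtomicInteger();

    // Picks the next handler in rotation, then the first reachable endpoint in that handler's order.
    public String route(Set<String> reachable) {
        int index = Math.floorMod(next.getAndIncrement(), HANDLERS.size());
        for (String endpoint : HANDLERS.get(index)) {
            if (reachable.contains(endpoint)) {
                return endpoint;
            }
        }
        throw new IllegalStateException("No endpoint reachable");
    }

    public static void main(String[] args) {
        RoundRobinFailoverSketch sketch = new RoundRobinFailoverSketch();
        Set<String> both = Set.of("port1", "port2");
        System.out.println(sketch.route(both)); // prints "port1"
        System.out.println(sketch.route(both)); // prints "port2"
        // With port1 down, the first handler falls back to port2.
        System.out.println(sketch.route(Set.of("port2"))); // prints "port2"
    }
}
```

When both endpoints are reachable the messages alternate between them; when one is down, every message ends up on the remaining endpoint.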
For incoming messages from the TCP server, I set up two TcpReceivingChannelAdapter instances that use the two FailoverClientConnectionFactory instances above, with a single inboundChannel as the output channel:
@Bean
public MessageChannel inboundChannel() {
    return new DirectChannel();
}

@Bean
public MessageProducer tcpInboundChannelOne() {
    TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
    inboundChannelAdapter.setConnectionFactory(failoverClientConnectionFactoryOne());
    inboundChannelAdapter.setOutputChannel(inboundChannel());
    return inboundChannelAdapter;
}

@Bean
public MessageProducer tcpInboundChannelTwo() {
    TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
    inboundChannelAdapter.setConnectionFactory(failoverClientConnectionFactoryTwo());
    inboundChannelAdapter.setOutputChannel(inboundChannel());
    return inboundChannelAdapter;
}

@Bean
public IntegrationFlow tcpInboundFlow() {
    return IntegrationFlow.from(inboundChannel())
            // The deserializer produces a byte[] payload, so cast before handing it over
            .handle(message -> handleMessage((byte[]) message.getPayload()))
            .get();
}

private void handleMessage(byte[] payload) {
    // Process incoming message here
    log.info("Received: {}", new String(payload));
}
The problem is that I can only send messages to the TCP server; I cannot receive any messages sent by the server. What am I doing wrong? Please help.
Thanks to @ArtemBilan's suggestion, I created a custom Deserializer that extends AbstractByteArraySerializer and added it to the TcpNetClientConnectionFactory; after that, inboundChannel was able to receive incoming messages.
public class CustomSerializer extends AbstractByteArraySerializer {

    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        // readNBytes (Java 9+) blocks until the requested number of bytes has arrived or the
        // stream ends; a single read() may return fewer bytes than requested on a TCP stream.
        byte[] messageHeaderBytes = new byte[4];
        int bytesRead = inputStream.readNBytes(messageHeaderBytes, 0, messageHeaderBytes.length);
        if (bytesRead != 4) {
            log.error("Invalid message header length");
            throw new IOException("Invalid message header length");
        }
        // The header holds the body length as 4 ASCII digits, e.g. "0022"
        int messageLength = Integer.parseInt(new String(messageHeaderBytes, StandardCharsets.US_ASCII));
        byte[] messageBytes = new byte[messageLength];
        bytesRead = inputStream.readNBytes(messageBytes, 0, messageLength);
        if (bytesRead != messageLength) {
            log.error("Invalid message length");
            throw new IOException("Invalid message length");
        }
        return messageBytes;
    }

    @Override
    public void serialize(byte[] bytes, OutputStream outputStream) throws IOException {
        // Writes the bytes as-is (assumption: the caller has already prepended the ASCII length header)
        outputStream.write(bytes);
    }
}
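
A likely reason the original codec() could not read the server's messages is that ByteArrayLengthHeaderSerializer interprets the length header as a binary integer in network byte order, not as ASCII digits. The plain-Java sketch below (hypothetical class and method names, no Spring involved) shows how the same four header bytes decode under each interpretation. It is an illustration of the mismatch, not a trace of what the library did in this setup.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class HeaderInterpretation {

    // Reads the 4 header bytes as a big-endian binary int (ByteBuffer's default byte order).
    public static int asBinaryInt(byte[] header) {
        return ByteBuffer.wrap(header).getInt();
    }

    // Reads the 4 header bytes as ASCII decimal digits, which is what this server sends.
    public static int asAsciiDigits(byte[] header) {
        return Integer.parseInt(new String(header, StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        byte[] header = "0022".getBytes(StandardCharsets.US_ASCII);
        System.out.println(asAsciiDigits(header)); // prints 22
        System.out.println(asBinaryInt(header));   // prints 808464946
    }
}
```

Read as a binary integer, "0022" becomes 808464946, far above the 8 * 1024 limit set in codec(), so a binary length-header deserializer would be expected to reject such a message rather than deliver it.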