Spring Integration TCP client cannot receive messages from an external TCP server

Question

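I have a use case where I need to send messages to an external TCP server that exposes 2 IP/port pairs, distributing the messages round-robin (one connection per IP/port pair, and each connection should be kept alive for message exchange).

I am using Spring Integration and have no information about the TCP server's underlying implementation, but the message structure consists of a header of 4 ASCII characters (4 bytes) that specifies the length of the message (excluding the header), followed by the message itself.

For example, if the message is 128 bytes long, the header value "0128" is prepended to the message, so the actual length of the data sent is 132 bytes.

A sample message looks like this: 0022thisisanexamplemessage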
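
The framing described above can be sketched in plain Java. This is only an illustration under stated assumptions: the class AsciiLengthFraming and its methods frame and readFrame are hypothetical names, not part of Spring Integration, and the header is assumed to be zero-padded decimal digits as in the "0128" and "0022" examples.

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration only; not a Spring Integration class.
final class AsciiLengthFraming {

    // Prepend a 4-digit, zero-padded ASCII length (payload bytes only, header excluded).
    static byte[] frame(byte[] payload) {
        if (payload.length > 9999) {
            throw new IllegalArgumentException("Payload does not fit a 4-digit length header");
        }
        byte[] header = String.format("%04d", payload.length).getBytes(StandardCharsets.US_ASCII);
        byte[] framed = new byte[header.length + payload.length];
        System.arraycopy(header, 0, framed, 0, header.length);
        System.arraycopy(payload, 0, framed, header.length, payload.length);
        return framed;
    }

    // Read one frame: 4 ASCII digits, then exactly that many payload bytes.
    static byte[] readFrame(InputStream in) throws IOException {
        byte[] header = new byte[4];
        // readNBytes (Java 9+) keeps reading until the buffer is full or the stream ends.
        if (in.readNBytes(header, 0, header.length) != header.length) {
            throw new IOException("Stream ended inside the length header");
        }
        int length = Integer.parseInt(new String(header, StandardCharsets.US_ASCII));
        if (length < 0) {
            throw new IOException("Negative length in header");
        }
        byte[] payload = new byte[length];
        if (in.readNBytes(payload, 0, length) != length) {
            throw new IOException("Stream ended inside the message body");
        }
        return payload;
    }
}
```

With this sketch, framing the 22-byte text thisisanexamplemessage yields 0022thisisanexamplemessage, and framing a 128-byte payload yields 132 bytes in total.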

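Following this post, I set up 2 TcpSendingMessageHandler instances and 2 FailoverClientConnectionFactory instances, and used the same outboundChannel as the input channel to achieve round-robin distribution, as shown below (messages are sent to the TCP server successfully):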

@Bean
public MessageChannel outboundChannel() {
    return new DirectChannel();
}

@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler tcpOutboundChannelOne() {
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(failoverClientConnectionFactoryOne());
    handler.setClientMode(true);
    return handler;
}

@Bean
@ServiceActivator(inputChannel = "outboundChannel")
public MessageHandler tcpOutboundChannelTwo() {
    TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
    handler.setConnectionFactory(failoverClientConnectionFactoryTwo());
    handler.setClientMode(true);
    return handler;
}

@Bean
public FailoverClientConnectionFactory failoverClientConnectionFactoryOne() {
    List<AbstractClientConnectionFactory> factories = new ArrayList<>();
    factories.add(tcpNetClientConnectionFactoryOne());
    factories.add(tcpNetClientConnectionFactoryTwo());
    FailoverClientConnectionFactory cf = new FailoverClientConnectionFactory(factories);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setCloseOnRefresh(true);
    cf.setRefreshSharedInterval(10000L);
    return cf;
}

@Bean
public FailoverClientConnectionFactory failoverClientConnectionFactoryTwo() {
    List<AbstractClientConnectionFactory> factories = new ArrayList<>();
    factories.add(tcpNetClientConnectionFactoryTwo());
    factories.add(tcpNetClientConnectionFactoryOne());
    FailoverClientConnectionFactory cf = new FailoverClientConnectionFactory(factories);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setCloseOnRefresh(true);
    cf.setRefreshSharedInterval(10000L);
    return cf;
}

@Bean
public TcpNetClientConnectionFactory tcpNetClientConnectionFactoryOne() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(tcpServerHost, tcpServerPort1);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setSerializer(codec());
    cf.setDeserializer(codec());
    cf.setConnectionTimeout(connectionTimeout);
    return cf;
}


@Bean
public TcpNetClientConnectionFactory tcpNetClientConnectionFactoryTwo() {
    TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(tcpServerHost, tcpServerPort2);
    cf.setLeaveOpen(true);
    cf.setSingleUse(false);
    cf.setSoKeepAlive(true);
    cf.setSerializer(codec());
    cf.setDeserializer(codec());
    cf.setConnectionTimeout(connectionTimeout);
    return cf;
}

private ByteArrayLengthHeaderSerializer codec() {
    ByteArrayLengthHeaderSerializer serializer = new ByteArrayLengthHeaderSerializer();
    serializer.setMaxMessageSize(8 * 1024);
    serializer.setInclusive(false);
    return serializer;
}
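
The round-robin behavior in the configuration above relies on DirectChannel, which by default load-balances across its subscribers in round-robin order and fails over to the next subscriber when one throws. The following is a conceptual model of that idea in plain Java, not Spring Integration's actual dispatcher; RoundRobinDispatcher is a hypothetical name introduced only for this sketch.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Conceptual model only: rotates the starting handler per message and
// falls through to the next handler if the chosen one throws.
final class RoundRobinDispatcher<T> {

    private final List<Consumer<T>> handlers;
    private final AtomicInteger next = new AtomicInteger();

    RoundRobinDispatcher(List<Consumer<T>> handlers) {
        if (handlers.isEmpty()) {
            throw new IllegalArgumentException("At least one handler is required");
        }
        this.handlers = List.copyOf(handlers);
    }

    void dispatch(T message) {
        // floorMod keeps the index valid even after the counter overflows.
        int start = Math.floorMod(next.getAndIncrement(), handlers.size());
        RuntimeException last = null;
        for (int i = 0; i < handlers.size(); i++) {
            Consumer<T> handler = handlers.get((start + i) % handlers.size());
            try {
                handler.accept(message);
                return; // delivered, stop here
            } catch (RuntimeException e) {
                last = e; // remember the failure and try the next handler
            }
        }
        throw new IllegalStateException("All handlers failed", last);
    }
}
```

In this model, two healthy handlers receive alternating messages, which mirrors what the two TcpSendingMessageHandler beans subscribed to outboundChannel are expected to do.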

For incoming messages from the TCP server, I set up 2 TcpReceivingChannelAdapter instances that use the 2 FailoverClientConnectionFactory instances described above, with a single inboundChannel as the output channel:

@Bean
public MessageChannel inboundChannel() {
    return new DirectChannel();
}

@Bean
public MessageProducer tcpInboundChannelOne() {
    TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
    inboundChannelAdapter.setConnectionFactory(failoverClientConnectionFactoryOne());
    inboundChannelAdapter.setOutputChannel(inboundChannel());
    return inboundChannelAdapter;
}

@Bean
public MessageProducer tcpInboundChannelTwo() {
    TcpReceivingChannelAdapter inboundChannelAdapter = new TcpReceivingChannelAdapter();
    inboundChannelAdapter.setConnectionFactory(failoverClientConnectionFactoryTwo());
    inboundChannelAdapter.setOutputChannel(inboundChannel());
    return inboundChannelAdapter;
}

@Bean
public IntegrationFlow tcpInboundFlow() {
    return IntegrationFlow.from(inboundChannel())
            .handle(message -> handleMessage((byte[]) message.getPayload()))
            .get();
}

private void handleMessage(byte[] payload) {
    // Process incoming message here
    log.info("Received: {}", new String(payload));
}

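The problem is that I can only send messages to the TCP server; I cannot receive any of the messages the server sends back. What am I doing wrong? Please help.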

java spring-boot spring-integration tcpclient spring-integration-dsl
1 Answer

Thanks to @ArtemBilan's suggestion, I created a custom Deserializer that extends AbstractByteArraySerializer and added it to the TcpNetClientConnectionFactory; after that, inboundChannel was able to receive incoming messages.

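import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;

import org.springframework.integration.ip.tcp.serializer.AbstractByteArraySerializer;

// "log" is assumed to be a logger field, for example one generated by Lombok's @Slf4j.
public class CustomSerializer extends AbstractByteArraySerializer {

    @Override
    public byte[] deserialize(InputStream inputStream) throws IOException {
        // Read the 4-byte ASCII length header. readNBytes (Java 9+) keeps reading until
        // the buffer is full or the stream ends; a single read() may return fewer bytes.
        byte[] messageHeaderBytes = new byte[4];
        int bytesRead = inputStream.readNBytes(messageHeaderBytes, 0, messageHeaderBytes.length);
        if (bytesRead != 4) {
            log.error("Invalid message header length");
            throw new IOException("Invalid message header length");
        }
        int messageLength = Integer.parseInt(new String(messageHeaderBytes, StandardCharsets.US_ASCII));
        // Read exactly messageLength payload bytes (the header is not counted in the length).
        byte[] messageBytes = new byte[messageLength];
        bytesRead = inputStream.readNBytes(messageBytes, 0, messageLength);
        if (bytesRead != messageLength) {
            log.error("Invalid message length");
            throw new IOException("Invalid message length");
        }
        return messageBytes;
    }

    @Override
    public void serialize(byte[] bytes, OutputStream outputStream) throws IOException {
        // Writes the payload as-is; no length header is added here.
        outputStream.write(bytes);
    }
}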
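
One plausible explanation for why the original codec() did not deliver inbound messages, offered as an assumption rather than something confirmed in this thread: ByteArrayLengthHeaderSerializer expects a binary length header in network byte order, whereas this server sends the length as ASCII digits. The plain-Java sketch below shows how differently the same 4 header bytes are interpreted; HeaderMismatchDemo and its method names are hypothetical and used only for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Illustration only: compares two interpretations of the same 4 header bytes.
final class HeaderMismatchDemo {

    // Interpret the header as a big-endian (network byte order) binary int.
    static int asBinaryLength(byte[] header) {
        return ByteBuffer.wrap(header).getInt();
    }

    // Interpret the header as 4 ASCII decimal digits.
    static int asAsciiLength(byte[] header) {
        return Integer.parseInt(new String(header, StandardCharsets.US_ASCII));
    }

    public static void main(String[] args) {
        byte[] header = "0022".getBytes(StandardCharsets.US_ASCII); // bytes 0x30 0x30 0x32 0x32
        System.out.println(asAsciiLength(header));  // prints 22
        System.out.println(asBinaryLength(header)); // prints 808464946
    }
}
```

Read as a binary int, the header "0022" becomes 808464946, far above the 8 * 1024 limit set with setMaxMessageSize, so a binary-header deserializer cannot frame these messages correctly. That would be consistent with the custom ASCII-header deserializer fixing the inbound side.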