我正在编写一个 Spring Boot 应用程序,它充当两个外部客户端(例如 serviceA 和 serviceB)之间的中间人。通信将通过套接字连接,即 tcp/ip 进行。 因此,serviceA 总是发起通信,我的应用程序接收消息并进行一些处理,记录到 db 等,然后将消息中继到 serviceB。 ServiceB 发送一个响应,我也在处理该响应,然后将响应发送回 serviceA。
当我远程登录“localhost
但是,目标不是使用 telnet,而是通过套接字连接从外部客户端(serviceA)接收消息。当客户端应用程序尝试连接到我的应用程序时,我的应用程序上根本没有任何活动。
那么为什么没有活动?
请注意,tcpInboundFlow 接收的消息是一个字符串。这是 TcpInboundGateway 使用序列化器/解串器的情况吗?
我正在利用 spring 集成 tcp。 通过使用 Java dsl 和基于文档的基于注释的配置的混合,我已经实现了一定程度的功能。
我使用 tcpInboundGateway 利用 serverConnectionFactory 来接收消息(字符串格式)
我使用路由器根据某些业务逻辑来确定通道
我使用变压器进行进一步处理
然后我利用 tcpOutboundGateway 将消息推送到外部客户端。
我使用 serviceActivator 来处理来自外部客户端的响应。
下面是一些使用的组件的代码片段。所有通道都是 directChannel 实现(未显示)
@Bean
public IntegrationFlow tcpInboundFlow() {
return IntegrationFlow.from(tcpInboundGateway())
.handle("messageProcessingService", "processMessage")
.route(mtiRouter())
.get();
}
@Bean
@Router(inputChannel = "handlerOutputChannel")
public AbstractMessageRouter mtiRouter() {
return new AbstractMessageRouter() {
@Override
protected Collection<MessageChannel> determineTargetChannels(Message<?> message) {
ISOMsg isoMsg = (ISOMsg) message.getPayload();
try {
String mti = isoMsg.getMTI();
log.info("reached the router");
return switch (mti) {
case "0800" -> Collections.singleton(networkManagementChannel());
case "0420" -> Collections.singleton(reversalChannel());
case "0200" -> Collections.singleton(financialRequestChannel());
default -> throw new TmsIsoException("Unsupported MTI: " + mti);
};
} catch (ISOException e) {
log.info("failed to reach the router");
throw new TmsIsoException("Failed to get mti from iso msg");
}
}
};
}
@Bean
public IntegrationFlow financialRequestFlow() {
return IntegrationFlow.from(financialRequestChannel())
.transform("financialRequestProcessingService", "processFinancialRequestMessage")
.transform("transformationService", "keyMessage")
.handle(tcpOutboundGateway())
.transform("responseService", "processResponseMessageForPurchase")
.get();
} @Bean
public TcpNetServerConnectionFactory serverConnectionFactory() {
TcpNetServerConnectionFactory factory = new TcpNetServerConnectionFactory(30002);
factory.setSerializer(new ByteArrayCrLfSerializer());
factory.setDeserializer(new ByteArrayCrLfSerializer());
// factory.setSoTimeout(60000);
return factory;
}
@Bean
public TcpNetClientConnectionFactory clientConnectionFactory() {
TcpNetClientConnectionFactory factory =
new TcpNetClientConnectionFactory("196.46.20.30", 5334);
factory.setSerializer(customIsoMessageSerializer);
factory.setDeserializer(customIsoMessageDeserializer);
return factory;
}
@Bean
public TcpInboundGateway tcpInboundGateway() {
TcpInboundGateway gateway = new TcpInboundGateway();
gateway.setConnectionFactory(serverConnectionFactory());
gateway.setRequestChannel(requestChannel());
gateway.setReplyChannel(replyChannel());
// gateway.setErrorChannel(errorChannel());
gateway.setReplyTimeout(60000);
gateway.setBeanName("tcpIn");
return gateway;
}
@Bean
public TcpOutboundGateway tcpOutboundGateway() {
TcpOutboundGateway gateway = new TcpOutboundGateway();
gateway.setConnectionFactory(clientConnectionFactory());
gateway.setRemoteTimeout(60000);
gateway.setReplyChannelName("replyChannel");
return gateway;
}
上述情况是由于使用了序列化器/反序列化器,只要在客户端建立连接即可。 有关所提供的序列化器的信息,请查看此处
就我而言,我选择使用 ByteStxEtxSerializer 和 ByteStxEtxDeserializer
因此,当客户端写入套接字时,它会传入一个字节,该字节必须按照我的 serverConnectionFactory 使用的序列化程序(上面命名)的要求进行格式化。
为了格式化字节,在客户端写了下面的内容
@组件 公共类 MessageFormatter {
private static final byte STX = 0x02; // Start of Text
private static final byte ETX = 0x03; // End of Text
private static final byte ESC = 0x1B; // Escape character
/**
* Converts a raw string into a byte array formatted with STX and ETX bytes,
* escaping any existing STX, ETX, and ESC bytes in the message.
*
* @param rawMessage the raw string message to be formatted
* @return the formatted byte array
*/
public static byte[] formatMessageWithStxEtx(String rawMessage) {
// Convert the raw string to a byte array
byte[] rawBytes = rawMessage.getBytes(StandardCharsets.UTF_8);
// Use a ByteArrayOutputStream to handle dynamic byte array creation
ByteArrayOutputStream byteStream = new ByteArrayOutputStream();
// Add the STX byte at the beginning
byteStream.write(STX);
// Escape any STX, ETX, and ESC bytes in the raw message
for (byte b : rawBytes) {
if (b == STX || b == ETX || b == ESC) {
byteStream.write(ESC); // Write the escape byte
}
byteStream.write(b); // Write the original byte
}
// Add the ETX byte at the end
byteStream.write(ETX);
return byteStream.toByteArray();
}
//下面这个方法在客户端是怎么写的
public String makeSocketCall(String ipAddress, String port, String
messageToBeSentViaTcp) {
Socket socket = null;
try {
socket = new Socket(ipAddress, Integer.parseInt(port));
//formatter converts the string into a byte array that has an Stx and Etx
//demarcator at the start and end of the array respectively.
byte[] byteRequest = MessageFormatter.formatMessageWithStxEtx(messageToBeSentViaTcp);
DataOutputStream dOut = new DataOutputStream(socket.getOutputStream());
DataInputStream dIn = new DataInputStream(socket.getInputStream());
dOut.write(byteRequest);
String response = dIn.readLine();
dOut.close();
dIn.close();
socket.close();
return response;
} catch (IOException e) {
throw new RuntimeException(e);
}
}
至于服务器,与之前的代码相比没有任何变化。 最重要的一点是确定串行器和通过 tcp 传输的消息的格式