问题: Spring 应用程序堆内存不足,因为集成流对象正在填充空间而没有被垃圾收集。
我正在使用 Spring Integration 动态创建具有入站和出站适配器的 TCP 客户端。这些客户端连接到第三方应用程序,并且可能仅发送、仅接收或发送和接收,然后断开连接。应用程序运行一段时间后,我们开始看到堆内存不足错误。查看内存调试器,我可以看到调用 flowContext.remove(flowId) 后,与 IntegrationFlow 相关的对象没有被清理。
@Service
public class TcpConnectionManager {
@Autowired
IntegrationFlowContext context;
// IntegrationTcpConnection is a custom class for storing connection data
public IntegrationTcpConnection createConnection(String host, int port) {
String baseId = host + port + UUID.randomUUID();
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
MessageChannel send = new DirectChannel();
IntegrationFlow out = IntegrationFlow.from(send).handle(Tcp.outboundAdapter(cf)).get();
//I've also tried this as an alternative to the above ^
//IntegrationFlow out = f -> f.handle(Tcp.outboundAdapter(cf));
// Then when registering, return the messaging template to send data
context.registration(out).addBean(cf).id(baseId + ".out").register();
PollableChannel receive = new QueueChannel();
IntegrationFlow in = IntegrationFlow.from(Tcp.inboundAdapter(cf)).channel(receive).get();
context.registration(in).id(baseId + ".in").register();
// Store references to the input and output channels in this object to be used elsewhere
return new IntegrationTcpConnection(baseId, send, receive, cf);
}
// When business logic is done with the connection, it calls this method and the connection ends
public void removeFlow(IntegrationTcpConnection connection)
{
context.remove(connection.getBaseId() + ".in");
context.remove(connection.getBaseId() + ".out");
}
}
// Class for storing connection channels for sending and receiving
@AllArgsConstructor
public class IntegrationTcpConnection {
String baseId;
MessageChannel send;
PollableChannel receive;
TcpNetClientConnectionFactory cf;
public void send(String msg)
{
send.send(MessageBuilder.withPayload(msg).build());
}
public String receive(long timeout)
{
return (String) Objects.requireNonNull(receive.receive(timeout)).getPayload();
}
}
// A test endpoint I am hitting to test sending a message and then removing the flow
@RestController
public class TestController {
@Autowired
TcpConnectionManager manager;
@GetMapping("/test")
public String test()
{
for (int i = 0; i < 100; i++)
{
IntegrationTcpConnection con = manager.createConnection("localhost", 30000);
con.send("test message");
manager.removeFlow();
}
return "Success";
}
}
为了测试这一点,我有一个小应用程序,其中运行着一个集成 Tcp 服务器,用于记录收到的消息。客户端成功连接、发送消息、断开连接,基于for循环100次。但是,使用内存调试器,即使在调用removeFlow() 方法之后,我也可以看到 100 个与 IntegrationFlow 相关的对象(MessagingTemplate、EventDrivenConsumer、DirectChannel 等...)。
多次点击此端点将导致堆内存不足错误,因为 IntegrationFlow 未被删除。
Exception in thread "main" java.lang.OutOfMemoryError: Java heap space
我认为我注册流程的方式没有任何问题,因为我基于网上找到的示例,但我可能会弄错(Spring Integration Java DSL TCP 出站和入站连接处理程序)
编辑
我已经使用动态 TCP 示例(https://github.com/spring-projects/spring-integration-samples/tree/main/advanced/dynamic-tcp-client)进行了一些测试,并且能够重现内存不足错误。
我必须对示例进行一些修改,以便每次都会创建一个新流,并且流在注册时始终具有唯一的 ID。我仍然看到调试日志显示
TcpNetClientConnectionFactory
bean 已停止,但是在 IntelliJ 中查看堆内存时,我可以看到数百个 TcpNetClientConnectionFactories 仍在内存中。
以下是我对动态 TCP 示例所做的编辑。为了测试这一点,我使用了 for 循环来发送 100 条消息并删除流。
// Test method for removing a single flow manually
public void removeFlow() {
String key = subFlows.entrySet().iterator().next().getKey();
subFlows.remove(key);
this.flowContext.remove(key + ".flow");
}
@Override
protected synchronized Collection<MessageChannel> determineTargetChannels(Message<?> message) {
//MessageChannel channel = this.subFlows
// .get(message.getHeaders().get("host", String.class) + message.getHeaders().get("port"));
//if (channel == null) {
// channel = createNewSubflow(message);
//}
MessageChannel channel = createNewSubflow(message);
return Collections.singletonList(channel);
}
private MessageChannel createNewSubflow(Message<?> message) {
String host = (String) message.getHeaders().get("host");
Integer port = (Integer) message.getHeaders().get("port");
Assert.state(host != null && port != null, "host and/or port header missing");
//String hostPort = host + port;
// Add a UUID to the hostPort so a new flow is always created
String hostPort = host + port + UUID.randomUUID();
TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
handler.setConnectionFactory(cf);
IntegrationFlow flow = f -> f.handle(handler);
IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
this.flowContext.registration(flow)
.addBean(cf)
.id(hostPort + ".flow")
.register();
MessageChannel inputChannel = flowRegistration.getInputChannel();
this.subFlows.put(hostPort, inputChannel);
return inputChannel;
}
@RestController
public class TestController {
@Autowired
TcpRouter router;
@GetMapping("/test")
public String test()
{
for (int i = 0; i < 100; i++)
{
toTCP.send("Test Message", "localhost", 40000);
router.removeFlow();
}
// Set a breakpoint here and view the heap memory contents to see IntegrationFlows
return "Success";
}
}
编辑2
我相信我已经确定问题的根源是在注册时在流 ID 中使用了 UUID。如果没有将 UUID 添加到流 ID,我的原始代码可以在不填充堆的情况下工作。
我假设 Spring 在后台重用 IntegrationFlow 组件,因为它们在仅使用主机 + 端口时具有相同的 ID。
我添加 UUID 的最初目的是确保即使对同一服务器的两个请求同时到达,也会创建新的客户端连接 IntegrationFlow,因为第三方软件需要每个请求响应一个全新的连接。
我想我明白那是哪里了。
AbstractBeanFactory.mergedBeanDefinitions
。当我们删除 bean 时,它看起来根本没有在运行时被清除。我稍后会研究一下。目前确实有一个解决方法是尽可能地重用 bean 名称。这样,它将在 mergedBeanDefinitions
缓存中更新,直到应用程序关闭。
另一个解决方法是依赖自动生成的 bean 名称,因为它默认存在:
if (flowId == null) {
flowId = generateBeanName(integrationFlow, null);
builder.id(flowId);
}