为什么 Spring 不清理动态 TCP 客户端的 IntegrationFlows

问题描述 投票:0回答:1

问题: Spring 应用程序堆内存不足,因为集成流对象正在填充空间而没有被垃圾收集。

我正在使用 Spring Integration 动态创建具有入站和出站适配器的 TCP 客户端。这些客户端连接到第三方应用程序,并且可能仅发送、仅接收或发送和接收,然后断开连接。应用程序运行一段时间后,我们开始看到堆内存不足错误。查看内存调试器,我可以看到调用 flowContext.remove(flowId) 后,与 IntegrationFlow 相关的对象没有被清理。

@Service
public class TcpConnectionManager {
    @Autowired
    IntegrationFlowContext context;

    // IntegrationTcpConnection is a custom class for storing connection data
    public IntegrationTcpConnection createConnection(String host, int port) {
        String baseId = host + port + UUID.randomUUID();
        TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
        
        MessageChannel send = new DirectChannel();
        IntegrationFlow out = IntegrationFlow.from(send).handle(Tcp.outboundAdapter(cf)).get();
        
        //I've also tried this as an alternative to the above ^
        //IntegrationFlow out = f -> f.handle(Tcp.outboundAdapter(cf));
        // Then when registering, return the messaging template to send data

        context.registration(out).addBean(cf).id(baseId + ".out").register();
        
        PollableChannel receive = new QueueChannel();
        IntegrationFlow in = IntegrationFlow.from(Tcp.inboundAdapter(cf)).channel(receive).get();
        context.registration(in).id(baseId + ".in").register();

        // Store references to the input and output channels in this object to be used elsewhere
        return new IntegrationTcpConnection(baseId, send, receive, cf);
    }

    // When business logic is done with the connection, it calls this method and the connection ends
    public void removeFlow(IntegrationTcpConnection connection)
    {
        context.remove(connection.getBaseId() + ".in");
        context.remove(connection.getBaseId() + ".out");
    }

}

// Class for storing connection channels for sending and receiving
@AllArgsConstructor
public class IntegrationTcpConnection {

    String baseId;
    MessageChannel send;
    PollableChannel receive;
    TcpNetClientConnectionFactory cf;

    public void send(String msg)
    {
        send.send(MessageBuilder.withPayload(msg).build());
    }

    public String receive(long timeout)
    {
        return (String) Objects.requireNonNull(receive.receive(timeout)).getPayload();
    }
}

// A test endpoint I am hitting to test sending a message and then removing the flow
@RestController
public class TestController {
    @Autowired
    TcpConnectionManager manager;

    @GetMapping("/test")
    public String test()
    {
        for (int i = 0; i < 100; i++)
        {
            IntegrationTcpConnection con = manager.createConnection("localhost", 30000);
            con.send("test message");
            manager.removeFlow();
        }
        
        return "Success";
    }

}

为了测试这一点,我有一个小应用程序,其中运行着一个集成 Tcp 服务器,用于记录收到的消息。客户端成功连接、发送消息、断开连接,基于for循环100次。但是,使用内存调试器,即使在调用removeFlow() 方法之后,我也可以看到 100 个与 IntegrationFlow 相关的对象(MessagingTemplate、EventDrivenConsumer、DirectChannel 等...)。

多次点击此端点将导致堆内存不足错误,因为 IntegrationFlow 未被删除。

Exception in thread "main" java.lang.OutOfMemoryError: Java heap space

我认为我注册流程的方式没有任何问题,因为我基于网上找到的示例,但我可能会弄错(Spring Integration Java DSL TCP 出站和入站连接处理程序

编辑

我已经使用动态 TCP 示例(https://github.com/spring-projects/spring-integration-samples/tree/main/advanced/dynamic-tcp-client)进行了一些测试,并且能够重现内存不足错误。

我必须对示例进行一些修改,以便每次都会创建一个新流,并且流在注册时始终具有唯一的 ID。我仍然看到调试日志显示

TcpNetClientConnectionFactory
bean 已停止,但是在 IntelliJ 中查看堆内存时,我可以看到数百个 TcpNetClientConnectionFactories 仍在内存中。

以下是我对动态 TCP 示例所做的编辑。为了测试这一点,我使用了 for 循环来发送 100 条消息并删除流。

        // Test method for removing a single flow manually
        public void removeFlow() {
            String key = subFlows.entrySet().iterator().next().getKey();
            subFlows.remove(key);
            this.flowContext.remove(key + ".flow");
        }


        @Override
        protected synchronized Collection<MessageChannel> determineTargetChannels(Message<?> message) {
            //MessageChannel channel = this.subFlows
            //      .get(message.getHeaders().get("host", String.class) + message.getHeaders().get("port"));
            //if (channel == null) {
            //  channel = createNewSubflow(message);
            //}
            MessageChannel channel = createNewSubflow(message);
            return Collections.singletonList(channel);
        }

        private MessageChannel createNewSubflow(Message<?> message) {
            String host = (String) message.getHeaders().get("host");
            Integer port = (Integer) message.getHeaders().get("port");
            Assert.state(host != null && port != null, "host and/or port header missing");

            //String hostPort = host + port;
            // Add a UUID to the hostPort so a new flow is always created
            String hostPort = host + port + UUID.randomUUID();

            TcpNetClientConnectionFactory cf = new TcpNetClientConnectionFactory(host, port);
            TcpSendingMessageHandler handler = new TcpSendingMessageHandler();
            handler.setConnectionFactory(cf);
            IntegrationFlow flow = f -> f.handle(handler);
            IntegrationFlowContext.IntegrationFlowRegistration flowRegistration =
                    this.flowContext.registration(flow)
                            .addBean(cf)
                            .id(hostPort + ".flow")
                            .register();
            MessageChannel inputChannel = flowRegistration.getInputChannel();
            this.subFlows.put(hostPort, inputChannel);
            return inputChannel;
        }

@RestController
public class TestController {
    @Autowired
    TcpRouter router;

    @GetMapping("/test")
    public String test()
    {
        for (int i = 0; i < 100; i++)
        {
            toTCP.send("Test Message", "localhost", 40000);
            router.removeFlow();
        }

        // Set a breakpoint here and view the heap memory contents to see IntegrationFlows   
        return "Success";
    }

}

编辑2

我相信我已经确定问题的根源是在注册时在流 ID 中使用了 UUID。如果没有将 UUID 添加到流 ID,我的原始代码可以在不填充堆的情况下工作。

我假设 Spring 在后台重用 IntegrationFlow 组件,因为它们在仅使用主机 + 端口时具有相同的 ID。

我添加 UUID 的最初目的是确保即使对同一服务器的两个请求同时到达,也会创建新的客户端连接 IntegrationFlow,因为第三方软件需要每个请求响应一个全新的连接。

spring-integration spring-integration-dsl
1个回答
0
投票

我想我明白那是哪里了。

AbstractBeanFactory.mergedBeanDefinitions
。当我们删除 bean 时,它看起来根本没有在运行时被清除。我稍后会研究一下。目前确实有一个解决方法是尽可能地重用 bean 名称。这样,它将在
mergedBeanDefinitions
缓存中更新,直到应用程序关闭。

另一个解决方法是依赖自动生成的 bean 名称,因为它默认存在:

        if (flowId == null) {
            flowId = generateBeanName(integrationFlow, null);
            builder.id(flowId);
        }
© www.soinside.com 2019 - 2024. All rights reserved.