收听带有弹簧集成的TCP提要

问题描述 投票:0回答:1

我想连接到两个不同的TLS加密的TCP提要,并读取和发送json数据。当然,将以某种方式处理传入的消息。它并不一定需要在spring-integration中进行处理,而是可以放入队列中或其他任何方式。

我不想对主机和端口进行硬编码,因为我在运行时从要调用的REST API中获得了它们。

我发送的内容不一定会产生答复,我也不一定希望收到我发送的数据的答复。

我在通过弹簧集成实现这一点方面有些费劲。通过这样做,我设法获得了某种但不是很有效的东西:

    public static IntegrationFlow RegisterFeedFlow(final IntegrationFlowContext flowContext,
            final String id, final String host, final int port) {
        IntegrationFlow feedFlow = f -> f
                .handle(Tcp
                        .outboundGateway(Tcp.netClient(host, port).serializer(TcpCodecs.crlf())
                                .deserializer(TcpCodecs.lengthHeader1()))
                        .remoteTimeout(m -> 5000))
                .transform(Transformers.objectToString()).handle(System.out::println);


        flowContext.registration(feedFlow).id(id).register();

        return feedFlow;
    }

这里缺少的是:

  1. TLS。
  2. 因为我使用的是Tcp.outboundGateway(),所以它总是希望收到我发送的数据的答复。我不要这个。

我假设我可以通过在.serializer()和.deserializer()中放入内容来自动编码和解码json。这个假设正确吗?

我将如何正确实施?

spring-integration
1个回答
0
投票
@SpringBootApplication
public class Gitter66Application {

    public static void main(String[] args) {
        SpringApplication.run(Gitter66Application.class, args);
    }

    @Bean
    public TcpConnectionFactoryFactoryBean privateFeedClient() {
        TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("client");
        fb.setHost("localhost");
        fb.setPort(1234);
        fb.setSerializer(TcpCodecs.crlf());
        fb.setDeserializer(TcpCodecs.lengthHeader1());
        fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword"));
        return fb;
    }

    @Bean
    public TcpConnectionFactoryFactoryBean privateFeedServer() {
        TcpConnectionFactoryFactoryBean fb = new TcpConnectionFactoryFactoryBean("server");
        fb.setPort(1234);
        fb.setSerializer(TcpCodecs.lengthHeader1());
        fb.setDeserializer(TcpCodecs.crlf());
        fb.setSslContextSupport(new DefaultTcpSSLContextSupport("keystore.ks",
                "trustStore.ks", "keyStorePassword", "trustStorePassword"));
        return fb;
    }

    @Bean
    public IntegrationFlow flowOut(AbstractClientConnectionFactory cf) {
        return f -> f.handle(Tcp.outboundAdapter(cf));
    }

    @Bean
    public IntegrationFlow flowIn(AbstractServerConnectionFactory cf) {
        return IntegrationFlows.from(Tcp.inboundAdapter(cf))
                .transform(Transformers.objectToString())
                .handle(System.out::println)
                .get();
    }

    @Bean
    public ApplicationRunner runner(@Qualifier("flowOut.input") MessageChannel channel) {
        return args -> {
            channel.send(new GenericMessage<>("foo"));
        };
    }

}

结果:

GenericMessage [payload=foo, headers={ip_tcp_remotePort=62471, ...
© www.soinside.com 2019 - 2024. All rights reserved.