轮询器将消息发送到 kafka 绑定

问题描述 投票:0回答:1

下面使用 Spring Cloud Stream Kafka 的代码工作正常,但一切都必须在这一个方法中。

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, "--spring.cloud.stream.bindings.outbound-out-0.destination=out-topic");
    }
    @Bean
    Supplier<String> outbound() {
        return () -> {
            return LocalTime.now().toString();
        };
    }
}

如何编写 IntegrationFlow 来执行相同的操作并使用出站绑定器并允许我添加变压器等?下面的代码抛出错误:

MessageDispatchingException: Dispatcher has no subscribers

@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, "--spring.cloud.stream.bindings.outbound-out-0.destination=out-topic");
    }
    @Bean
    IntegrationFlow myFlow() {
        return IntegrationFlow.fromSupplier(this::myPoller, p -> p.poller(Pollers.fixedDelay(5000)))
                .transform(m -> {
                    System.out.println("my transformer");
                    return m;
                })
                .channel("outbound")
                .get();
    }

    String myPoller() {
            return LocalTime.now() + " value";
    }
}
java spring-cloud-stream
1个回答
0
投票

为了构建使用 Spring Cloud Stream 将消息发送到 Kafka 主题的 IntegrationFlow,程序员需要正确配置流以连接输出通道并使用适当的绑定器。

我发现代码中的问题是通道(outbound)不会自动与Kafka绑定器绑定。因此,您应该使用Spring Cloud Stream提供的MessageChannel

下面给出的代码是更正后的代码:-


    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.cloud.stream.annotation.EnableBinding;
    import org.springframework.cloud.stream.messaging.Source;
    import org.springframework.context.annotation.Bean;
    import org.springframework.integration.dsl.IntegrationFlow;
    import org.springframework.integration.dsl.IntegrationFlows;
    import org.springframework.integration.dsl.Pollers;
    
    import java.time.LocalTime;
    
    @SpringBootApplication
    public class DemoApplication {
    
        public static void main(String[] args) {
            SpringApplication.run(DemoApplication.class, "--spring.cloud.stream.bindings.outbound-out-0.destination=out-topic");
        }
    
        @Bean
        public IntegrationFlow myFlow() {
            return IntegrationFlows.fromSupplier(this::myPoller,
                            e -> e.poller(Pollers.fixedDelay(5000)))
                    .transform(m -> {
                        System.out.println("my transformer: " + m);
                        return m;
                    })
                    .channel(Source.OUTPUT)
                    .get();
        }
    
        String myPoller() {
            return LocalTime.now() + " value";
        }
    }

我希望此代码已解决您的疑问。

© www.soinside.com 2019 - 2024. All rights reserved.