The following code, which uses Spring Cloud Stream Kafka, works fine, but everything has to live in this one method.
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, "--spring.cloud.stream.bindings.outbound-out-0.destination=out-topic");
    }

    @Bean
    Supplier<String> outbound() {
        return () -> {
            return LocalTime.now().toString();
        };
    }
}
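How can I write an IntegrationFlow that does the same thing, uses the outbound binder, and lets me add transformers and so on? The code below throws this error: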
MessageDispatchingException: Dispatcher has no subscribers
@SpringBootApplication
public class DemoApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemoApplication.class, "--spring.cloud.stream.bindings.outbound-out-0.destination=out-topic");
    }

    @Bean
    IntegrationFlow myFlow() {
        return IntegrationFlow.fromSupplier(this::myPoller, p -> p.poller(Pollers.fixedDelay(5000)))
                .transform(m -> {
                    System.out.println("my transformer");
                    return m;
                })
                .channel("outbound")
                .get();
    }

    String myPoller() {
        return LocalTime.now() + " value";
    }
}
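To build an IntegrationFlow that sends messages to a Kafka topic through Spring Cloud Stream, the flow has to end in an output channel that the binder is actually bound to.
The problem in your code is that the channel named outbound is not bound to the Kafka binder automatically. The call .channel("outbound") refers to a plain Spring Integration channel that is created on demand and has nothing subscribed to it, which is why you get "Dispatcher has no subscribers". Instead, you should send to a MessageChannel that Spring Cloud Stream itself provides.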
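To illustrate the failure mode described above, here is a small plain-Java sketch. It is a toy model, not Spring Integration's own classes: ToyDirectChannel, sendFails, and the in-memory list standing in for out-topic are all hypothetical names made up for this illustration. It only shows that a direct channel rejects a message while nothing is subscribed, and accepts it once a handler is attached.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

public class DirectChannelSketch {

    /** Toy stand-in for a direct channel (hypothetical, not a Spring class). */
    static class ToyDirectChannel {
        private final String name;
        private final List<Consumer<String>> subscribers = new ArrayList<>();

        ToyDirectChannel(String name) {
            this.name = name;
        }

        void subscribe(Consumer<String> handler) {
            subscribers.add(handler);
        }

        void send(String payload) {
            if (subscribers.isEmpty()) {
                // Mirrors the situation behind "Dispatcher has no subscribers".
                throw new IllegalStateException(
                        "Dispatcher has no subscribers for channel '" + name + "'");
            }
            // Simplification: this toy always hands the message to the first subscriber.
            subscribers.get(0).accept(payload);
        }
    }

    /** Returns true when sending fails because nothing is subscribed. */
    static boolean sendFails(ToyDirectChannel channel, String payload) {
        try {
            channel.send(payload);
            return false;
        } catch (IllegalStateException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        ToyDirectChannel outbound = new ToyDirectChannel("outbound");

        // Nothing is subscribed yet, so the send is rejected.
        System.out.println("fails before subscribe: " + sendFails(outbound, "12:00 value"));

        // An in-memory list stands in for the Kafka topic (assumption for the demo).
        List<String> topic = new ArrayList<>();
        outbound.subscribe(topic::add);

        System.out.println("fails after subscribe: " + sendFails(outbound, "12:00 value"));
        System.out.println("delivered: " + topic);
    }
}
```

In the question's flow, .channel("outbound") plays the role of the unsubscribed channel. With the functional binding model, one possible subscriber (a suggestion that is not part of the original answer) would be a final .handle(...) step that forwards each message with StreamBridge.send("outbound-out-0", message).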
Here is the corrected code:
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.context.annotation.Bean;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.dsl.Pollers;

import java.time.LocalTime;

// Note: @EnableBinding and Source belong to the annotation-based binding model,
// which is available in Spring Cloud Stream 3.x and earlier and was removed in 4.0.
@SpringBootApplication
@EnableBinding(Source.class) // registers the bound output channel named Source.OUTPUT ("output")
public class DemoApplication {

    public static void main(String[] args) {
        // The binding is now called "output", so the destination property uses that name.
        SpringApplication.run(DemoApplication.class, "--spring.cloud.stream.bindings.output.destination=out-topic");
    }

    @Bean
    public IntegrationFlow myFlow() {
        // Poll myPoller() every 5 seconds and push each value through the flow.
        return IntegrationFlows.fromSupplier(this::myPoller,
                        e -> e.poller(Pollers.fixedDelay(5000)))
                .transform(m -> {
                    System.out.println("my transformer: " + m);
                    return m;
                })
                // Send to the channel that the Kafka binder is bound to.
                .channel(Source.OUTPUT)
                .get();
    }

    String myPoller() {
        return LocalTime.now() + " value";
    }
}
I hope this resolves your question.