我有一个用例,在这个用例中,我需要向多个在运行时确定的Kafka topicdestinations生产。我试着将 具有多个输入和输出参数的函数 用回车 Flux<Message<T>>
从一个功能豆类型的 Function
与设置页眉 spring.cloud.stream.sendto.destination
对于每个 Message
如上所述 此处. 我想出了下面的实现。
@Bean
public Function<Person, Flux<Message<Person>>> route() {
return person -> Flux.fromIterable(Stream.of(person.getEvents())
.map(e -> MessageBuilder.withPayload(person)
.setHeader("spring.cloud.stream.sendto.destination", e).build())
.collect(Collectors.toList()));
}
在我的配置中也有这个
spring.cloud.stream.dynamic-destinations=
这是我的 Person
:
@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person {
private String[] events;
private String name;
}
events
包含Kafka主题名称的列表。
然而,它没有工作。我漏掉了什么?
一个工作的解决方案是使用 BinderAwareChannelResolver
. 然而,如果提供了这样的功能,它就被废弃了。spring.cloud.stream.sendto.destination
属性在3.0中。
@Autowired
private BinderAwareChannelResolver binderAwareChannelResolver;
@Bean
public Consumer<Person> route() {
return person ->
Stream.of(person.getEvents())
.forEach(e -> binderAwareChannelResolver.resolveDestination(e)
.send(MessageBuilder.withPayload(person).build()));
}
我不喜欢这个解决方案,因为它将基于函数的编程模型和 "遗产式 "编程模型结合在一起。如果有人有更好的解决方案,请随时评论回答。
spring.cloud.stream.sendto.destination
使用 BinderAwareChannelResolver
在内部,它已被弃用,改用 StreamBridge
. 我想你可以把你的代码重写成下面的样子。我还没有测试过,但这是模板。
@Autowired StreamBridge streamBridge;
@Bean
public Consumer<Person> route() {
return person -> streamBridge.send(person.getName(), person);
}
在幕后,Spring Cloud Stream将为Spring Cloud Stream创建一个绑定。Person
动态的。
如果你在部署时提前知道目的地,也可以通过配置来设置。例如 spring.cloud.stream.source
如 foo;bar..;...
. 然后,该框架以 foo-out-0
, bar-out-0
等。然后你需要设置目的地 - spring.cloud.stream.bindings.foo-out-0.destination=foo
. 但由于你的用例是严格的动态目的地,你不能用这种方法,而是尝试使用我上面建议的方法。