Spring Cloud Stream--在运行时向多个动态目的地提供路由。

问题描述 投票:0回答:2

我有一个用例,在这个用例中,我需要向多个在运行时确定的Kafka topicdestinations生产。我试着将 具有多个输入和输出参数的函数 用回车 Flux<Message<T>> 从一个功能豆类型的 Function 与设置页眉 spring.cloud.stream.sendto.destination 对于每个 Message 如上所述 此处. 我想出了下面的实现。

@Bean
public Function<Person, Flux<Message<Person>>> route() {
    return person -> Flux.fromIterable(Stream.of(person.getEvents())
            .map(e -> MessageBuilder.withPayload(person)
                    .setHeader("spring.cloud.stream.sendto.destination", e).build())
            .collect(Collectors.toList()));
}

在我的配置中也有这个

spring.cloud.stream.dynamic-destinations=

这是我的 Person:

@AllArgsConstructor
@NoArgsConstructor
@Data
public class Person {
    private String[] events;
    private String name;
}

events 包含Kafka主题名称的列表。

然而,它没有工作。我漏掉了什么?

spring spring-boot spring-cloud-stream
2个回答
0
投票

一个工作的解决方案是使用 BinderAwareChannelResolver. 然而,如果提供了这样的功能,它就被废弃了。spring.cloud.stream.sendto.destination 属性在3.0中。

@Autowired
private BinderAwareChannelResolver binderAwareChannelResolver;

@Bean
public Consumer<Person> route() {
return person ->
        Stream.of(person.getEvents())
                .forEach(e -> binderAwareChannelResolver.resolveDestination(e)
                        .send(MessageBuilder.withPayload(person).build()));
}

我不喜欢这个解决方案,因为它将基于函数的编程模型和 "遗产式 "编程模型结合在一起。如果有人有更好的解决方案,请随时评论回答。


0
投票

spring.cloud.stream.sendto.destination 使用 BinderAwareChannelResolver 在内部,它已被弃用,改用 StreamBridge. 我想你可以把你的代码重写成下面的样子。我还没有测试过,但这是模板。

@Autowired StreamBridge streamBridge;

@Bean
public Consumer<Person> route() {
    return person -> streamBridge.send(person.getName(), person);
}

在幕后,Spring Cloud Stream将为Spring Cloud Stream创建一个绑定。Person 动态的。

如果你在部署时提前知道目的地,也可以通过配置来设置。例如 spring.cloud.stream.sourcefoo;bar..;.... 然后,该框架以 foo-out-0, bar-out-0 等。然后你需要设置目的地 - spring.cloud.stream.bindings.foo-out-0.destination=foo. 但由于你的用例是严格的动态目的地,你不能用这种方法,而是尝试使用我上面建议的方法。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.