目前正在开发一个使用 Spring Boot 3.2.7 和 Web Spring Cloud 流绑定器的项目。我有一个基于提供程序实现的数据层,它将一些数据流式传输到我的服务中。大致如下:
// Loader definition used in various services
public interface ILoaderSPI {
Flux<MyData> load(DataRequest request);
}
// Example usage Rest Controller
@Autowired
private ILoaderSPI someLoaderImpl;
@PostMapping(consumes = APPLICATION_JSON_VALUE, produces = APPLICATION_JSON_VALUE)
@ResponseStatus(OK)
public Mono<ComputeResult> compute(@RequestBody @Valid ComputeSpec spec) {
return Mono.just(spec)
.flatMap(this::someServiceCall)
// used as middleware to load some data externally
.flatMapMany(data -> someLoaderImpl.load(data))
.collectList()
.flatMap(this::compute);
}
目前我有一个实现,它基本上执行外部客户端请求。
public RestLoaderImpl implements ILoaderSPI {
@Autowired private Webclient client;
@Override
public Flux<MyData> load(DataRequest request) {
return client
.post()
.uri(...)
.contentType(APPLICATION_JSON)
.accept(APPLICATION_JSON)
.bodyValue(request)
.retrieve()
.bodyToFlux(MyData.class);
}
}
现在我必须添加一个新的数据提供程序,该数据提供程序将使用 Kafka 加载(在 kafka 上发送数据请求/在 kafka 上接收所述数据并进行转换)
到目前为止,我已经使用了不同的 kafka 消费者,像这样使用流云绑定(但我只是根据传入消息触发一些事件处理)
public class SomeHandler implements Consumer<Flux<List<IncomingMessage>>> {
@Override
public void accept(Flux<List<IncomingMessage>> messages) {
messages.flatMap(this::handleMessage)
.subscribe();
}
}
此订阅完成一次,消息不断流动。
消费者的模拟想法(函数应该自动订阅) - 将输入/输出声明为 Mono,希望每次消息到来时它都会重新订阅,因为我只对将单个事件流式传输到我的工作流程感兴趣
public class KafkaLoader implements Function<Mono<DataRequest>, Mono<List<MyData>> {
@Override
public Mono<List<MyData>> apply(Mono<DataRequest> request) {
request.flatMapMany(this::convertData).collectList();
}
}
public KafkaLoaderImpl implements ILoaderSPI {
@Autowired private final StreamBridge streamBridge;
@Override
public Flux<MyData> load(DataRequest request) {
Mono.just(request)
// request through kafka
.doOnNext(request -> streamBridge.send("request-topic", request))
// only care to receive the "next" incoming message and connect it here
.then(somehow stream the data from the kafka consumer whenever it receives it);
}
}
有办法实现这个吗?
我认为反应函数的正确签名是:
Function<Flux<DataRequest>, Flux<List<MyData>>
这样框架就会为该输入订阅一次
Flux
。
不确定这是否真的应该是
Flux<List>
,或者您可以将数据平面映射到单个 Flux<MyData>
。
最终
Cosnumer
可以像Consumer<IncomingMessage>
一样简单。
我没有看到你的问题中与 Apache Kafka 相关的部分在哪里。