如何将传入的kafka消息轮询到Flux中

问题描述 投票:0回答:1

目前正在开发一个使用 Spring Boot 3.2.7 和 Web Spring Cloud 流绑定器的项目。我有一个基于提供程序实现的数据层,它将一些数据流式传输到我的服务中。大致如下:

// Loader definition used in various services
public interface ILoaderSPI {

    Flux<MyData> load(DataRequest request);
}

// Example usage Rest Controller

@Autowired
private ILoaderSPI someLoaderImpl;

@PostMapping(consumes = APPLICATION_JSON_VALUE, produces = APPLICATION_JSON_VALUE)
@ResponseStatus(OK)
public Mono<ComputeResult> compute(@RequestBody @Valid ComputeSpec spec) {
    return Mono.just(spec)
               .flatMap(this::someServiceCall)
                // used as middleware to load some data externally
               .flatMapMany(data -> someLoaderImpl.load(data))
               .collectList()
               .flatMap(this::compute);
}

目前我有一个实现,它基本上执行外部客户端请求。

public RestLoaderImpl implements ILoaderSPI {
    @Autowired private Webclient client;
    @Override
    public Flux<MyData> load(DataRequest request) {
        return client
                .post()
                .uri(...)
                .contentType(APPLICATION_JSON)
                .accept(APPLICATION_JSON)
                .bodyValue(request)
                .retrieve()           
                .bodyToFlux(MyData.class);
    }
}

现在我必须添加一个新的数据提供程序,该数据提供程序将使用 Kafka 加载(在 kafka 上发送数据请求/在 kafka 上接收所述数据并进行转换)

到目前为止,我已经使用了不同的 kafka 消费者,像这样使用流云绑定(但我只是根据传入消息触发一些事件处理)

public class SomeHandler implements Consumer<Flux<List<IncomingMessage>>> {

    @Override
    public void accept(Flux<List<IncomingMessage>> messages) {
        messages.flatMap(this::handleMessage)
            .subscribe();
    }

}

此订阅完成一次,消息不断流动。

消费者的模拟想法(函数应该自动订阅) - 将输入/输出声明为 Mono,希望每次消息到来时它都会重新订阅,因为我只对将单个事件流式传输到我的工作流程感兴趣

public class KafkaLoader implements Function<Mono<DataRequest>, Mono<List<MyData>> {

    @Override
    public Mono<List<MyData>> apply(Mono<DataRequest> request) {
        request.flatMapMany(this::convertData).collectList();
    }

}

public KafkaLoaderImpl implements ILoaderSPI {
    @Autowired private final StreamBridge streamBridge;

    @Override
    public Flux<MyData> load(DataRequest request) {
        Mono.just(request)
            // request through kafka
            .doOnNext(request -> streamBridge.send("request-topic", request))
            // only care to receive the "next" incoming message and connect it here
            .then(somehow stream the data from the kafka consumer whenever it receives it);

    }
}

有办法实现这个吗?

spring-webflux project-reactor spring-cloud-stream spring-cloud-stream-binder-kafka
1个回答
0
投票

我认为反应函数的正确签名是:

Function<Flux<DataRequest>, Flux<List<MyData>>

这样框架就会为该输入订阅一次

Flux

不确定这是否真的应该是

Flux<List>
,或者您可以将数据平面映射到单个
Flux<MyData>

最终

Cosnumer
可以像
Consumer<IncomingMessage>
一样简单。

我没有看到你的问题中与 Apache Kafka 相关的部分在哪里。

© www.soinside.com 2019 - 2024. All rights reserved.