这是我的用例,用户使用websocket(带有订阅的GraphQl)订阅我的流,我需要通过用户ID返回org.reactivestreams.Publisher
的实例(应该是我的kafka主题订阅)过滤消息。
为了说明,像这样:
/ **
* I don´t know how to get a instance of Publisher<Balance>
* It should be a consumer from a kafka topic
*/
fun balance(myStream: Publisher<Balance>, userId: String): Publisher<Balance> {
return myStream.filter { it.userId == userId }
}
public Consumer<Flux<Balance>> myStream() {
//filter here and then publish to websocket.
}
Here是一个WebSocket接收器实现的示例,您可以将其用作指导,但这不是被动的。