我有一个数据源服务,它需要一个观察者作为参数。
void subscribe(Consumer onEventConsumer);
我想使用flux作为RSocket的响应流,我该怎么做呢? 在我看来,它应该是这样的
Flux<T> controllerMethod(RequestMessage mgs) {
var flux = Flux.empty();
dataSource.subscribe(event -> flux.push(event));
return flux;
}
但是我很怀疑这是不是一个合适的解决方案,我是新手,不知道这里应该用什么方法?
我有一个类似的情况,我需要消耗和管道上的消息流。所以这里是它的简化版本,代码中的注释。
public class Main {
// create queue for storing the messages
private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(50);
private static final Consumer<String> consumer = s -> {
// block another spawned thread if no more space is present
try { queue.put(s); } catch (InterruptedException e) {}
};
public static void main(String[] args) throws Exception {
// size shouldn't be more than our queue size
IntStream.range(0, 50).forEach(Main::consume);
fluxGenerator().subscribe(); // subscribe to flux
// write to the queue after subscribing
// here the size can be anything as long as subscriber can handle it
IntStream.range(50, 100000).forEach(Main::consume);
}
static Flux<String> fluxGenerator() {
return Flux.<String>generate(sink -> {
// block another spawned thread if no more elements are present
try {sink.next(queue.take()); } catch (InterruptedException e) {}
})
// we need to subscribe on another thread
.subscribeOn(Schedulers.newSingle("async"))
.log();
}
static void consume(String str) {
consumer.accept(str); // consume the messages
}
static void consume(Number i) {
consume("" + i);
}
}
这是一个典型的Flux.create用例,你在create lambda里面注册一个obsereer,它将把接收到的数据传递给提供的FluxSink。