如何从其他方法动态添加元素到反应堆热通量?

问题描述 投票:0回答:1

我有一个数据源服务,它需要一个观察者作为参数。

void subscribe(Consumer onEventConsumer);

我想使用flux作为RSocket的响应流,我该怎么做呢? 在我看来,它应该是这样的

Flux<T> controllerMethod(RequestMessage mgs) {
   var flux = Flux.empty();
   dataSource.subscribe(event -> flux.push(event));
   return flux;
}

但是我很怀疑这是不是一个合适的解决方案,我是新手,不知道这里应该用什么方法?

java reactive-programming project-reactor flux rsocket
1个回答
0
投票

我有一个类似的情况,我需要消耗和管道上的消息流。所以这里是它的简化版本,代码中的注释。

public class Main {
    // create queue for storing the messages
    private static BlockingQueue<String> queue = new ArrayBlockingQueue<>(50);
    private static final Consumer<String> consumer = s -> {
        // block another spawned thread if no more space is present
        try { queue.put(s); } catch (InterruptedException e) {}
    };

    public static void main(String[] args) throws Exception {
        // size shouldn't be more than our queue size
        IntStream.range(0, 50).forEach(Main::consume);
        fluxGenerator().subscribe(); // subscribe to flux
        // write to the queue after subscribing
        // here the size can be anything as long as subscriber can handle it
        IntStream.range(50, 100000).forEach(Main::consume);
    }

    static Flux<String> fluxGenerator() {
        return Flux.<String>generate(sink -> {
            // block another spawned thread if no more elements are present
            try {sink.next(queue.take()); } catch (InterruptedException e) {}
        })
                // we need to subscribe on another thread
                .subscribeOn(Schedulers.newSingle("async"))
                .log();
    }

    static void consume(String str) {
        consumer.accept(str); // consume the messages
    }

    static void consume(Number i) {
        consume("" + i);
    }

}

0
投票

这是一个典型的Flux.create用例,你在create lambda里面注册一个obsereer,它将把接收到的数据传递给提供的FluxSink。

© www.soinside.com 2019 - 2024. All rights reserved.