@RequestMapping(value = "/try",
method = RequestMethod.GET)
@ResponseBody
public String demo(){
List<String>data=new ArrayList<>();
data.add("A1");
data.add("A2");
data.add("A3");
data.add("A4");
Flux.fromIterable(data).subscribe(s->printStatement(s));
return "done";
}
public void printStatement(String s){
long i;
for(i=0;i<1000000000;i++)
{}
LOGGER.info(s+"------"+Thread.currentThread().getId());
}
在上面的例子中,我希望tread id会不同(跳跃异步执行)。从日志中我可以看到相同的tread正在执行整个过程
日志:
2018-05-02 03:24:42.387 INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo : A1------26
2018-05-02 03:24:44.118 INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo : A2------26
2018-05-02 03:24:44.418 INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo : A3------26
2018-05-02 03:24:44.717 INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo : A4------26
我如何确保它异步执行。
Reactor
的执行模型是大多数操作员不会为你改变线程(除非涉及到时间)。该库提供了两个允许切换到线程的运算符,publishOn
(最常见)和subscribeOn
。
例如,
Flux.fromIterable(data).publishOn(Schedulers.newSingle("example")).subscribe(...)
就是去这里的方法。
请注意,WebFlux 的模型是它在
Netty
线程(您看到的这些 nio
线程)中启动链的处理。因此,不要阻塞这些线程(这将完全阻止处理进一步传入的请求)是非常重要的。
Schedulers
为各种 Scheduler
风格提供工厂方法,这是一个 Reactor 抽象(或多或少位于 ExecutorService
之上)。
请记住,Project Reactor 是
concurrency-agnostic
,也就是说,它不会强加给您任何线程模型。此外,事件的处理发生在执行订阅的线程上。您使用 WebFlux
,因此订阅发生在 nio
池中的线程之一上。在基于 Unix 的系统中,您会看到 epoll
线程。尽管如此,只涉及一个线程,因为您不使用任何 Schedulers
,也不使用 flatMap
,根据定义,它是异步的。
非常好的阅读:https://projectreactor.io/docs/core/release/reference/#schedulers