如何更改Reactor中的执行线程

问题描述 投票:0回答:2
@RequestMapping(value    = "/try",
      method   = RequestMethod.GET)
      @ResponseBody
public String demo(){
List<String>data=new ArrayList<>();
data.add("A1");
data.add("A2");
data.add("A3");
data.add("A4");

Flux.fromIterable(data).subscribe(s->printStatement(s));
return "done";
}

public  void printStatement(String s){
long i;
for(i=0;i<1000000000;i++)
{}
LOGGER.info(s+"------"+Thread.currentThread().getId());
}

在上面的例子中,我希望tread id会不同(跳跃异步执行)。从日志中我可以看到相同的tread正在执行整个过程

日志:

2018-05-02 03:24:42.387  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A1------26
2018-05-02 03:24:44.118  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A2------26
2018-05-02 03:24:44.418  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A3------26
2018-05-02 03:24:44.717  INFO 29144 --- [nio-8080-exec-1] c.n.p.s.p.reactorDemo       : A4------26

我如何确保它异步执行。

reactive-programming project-reactor
2个回答
1
投票

Reactor
的执行模型是大多数操作员不会为你改变线程(除非涉及到时间)。该库提供了两个允许切换到线程的运算符,
publishOn
(最常见)和
subscribeOn

例如,

Flux.fromIterable(data).publishOn(Schedulers.newSingle("example")).subscribe(...)
就是去这里的方法。

请注意,WebFlux 的模型是它在

Netty
线程(您看到的这些
nio
线程)中启动链的处理。因此,不要阻塞这些线程(这将完全阻止处理进一步传入的请求)是非常重要的。

Schedulers
为各种
Scheduler
风格提供工厂方法,这是一个 Reactor 抽象(或多或少位于
ExecutorService
之上)。


0
投票

请记住,Project Reactor 是

concurrency-agnostic
,也就是说,它不会强加给您任何线程模型。此外,事件的处理发生在执行订阅的线程上。您使用
WebFlux
,因此订阅发生在
nio
池中的线程之一上。在基于 Unix 的系统中,您会看到
epoll
线程。尽管如此,只涉及一个线程,因为您不使用任何
Schedulers
,也不使用
flatMap
,根据定义,它是异步的。

非常好的阅读:https://projectreactor.io/docs/core/release/reference/#schedulers

© www.soinside.com 2019 - 2024. All rights reserved.