RxJava调度程序的用例

问题描述 投票:234回答:3

在RxJava中有5 different schedulers可供选择:

  1. immediate():创建并返回一个在当前线程上立即执行工作的Scheduler。
  2. trampoline():创建并返回一个调度程序,该调度程序对当前工作完成后要执行的当前线程进行排队。
  3. newThread():创建并返回一个Scheduler,为每个工作单元创建一个新的Thread。
  4. computation():创建并返回用于计算工作的Scheduler。这可以用于事件循环,处理回调和其他计算工作。不要在此调度程序上执行IO绑定的工作。请改用Schedulers.io()。
  5. io():创建并返回一个用于IO绑定工作的Scheduler。该实现由Executor线程池支持,该线程池将根据需要增长。这可用于异步执行阻塞IO。不要在此调度程序上执行计算工作。请改用Schedulers.computation()。

问题:

前3个调度程序非常自我解释;但是,我对计算和io有点困惑。

  1. 究竟什么是“IO限制工作”?它用于处理流(java.io)和文件(java.nio.files)?它用于数据库查询吗?它是用于下载文件还是访问REST API?
  2. calculate()与newThread()的不同之处是什么?是每次所有的calculate()调用都在单个(后台)线程而不是新的(后台)线程上吗?
  3. 为什么在进行IO工作时调用calculate()会很糟糕?
  4. 为什么在进行计算工作时调用io()会很糟糕?
java multithreading thread-safety rx-java rx-android
3个回答
317
投票

很棒的问题,我认为文档可以提供更多细节。

  1. io()由一个无限制的线程池支持,这是你用于非计算密集型任务的东西,这些东西不会对CPU造成太大负担。因此,与文件系统的交互,与不同主机上的数据库或服务的交互就是很好的例子。
  2. computation()由有界线程池支持,其大小等于可用处理器的数量。如果你试图在多个可用处理器之间并行安排CPU密集型工作(比如使用newThread()),那么当线程争夺处理器时,你就会面临线程创建开销和上下文切换开销,并且它可能会带来巨大的性能损失。
  3. 最好将computation()留给CPU密集型工作,否则你将无法获得良好的CPU利用率。
  4. 由于2中讨论的原因,将io()称为计算工作是不好的.io()是无界的,如果你在io()上并行安排了一千个计算任务,那么这千个任务中的每一个都将拥有自己的线程并竞争CPU引发的上下文切换成本。

2
投票

最重要的一点是,Schedulers.io和Schedulers.computation都支持无限制的线程池,而不是问题中提到的其他线程池。在使用newCachedThreadPool(无限制的自动回收线程池)创建Executor的情况下,Scheduler.from(Executor)仅共享此特性。

正如之前的回复和Web上的多篇文章所充分说明的那样,Schedulers.io和Schedulers.computation应该被仔细使用,因为它们针对其名称中的工作类型进行了优化。但是,就我而言,他们最重要的角色是为反应流提供真正的并发性。

与新手的看法相反,反应流本质上不是并发的,而是固有的异步和顺序。出于这个原因,只有在I / O操作阻塞时才使用Schedulers.io(例如:使用诸如Apache IOUtils FileUtils.readFileAsString(...)之类的阻塞命令)因此会冻结调用线程,直到操作为止。完成。

使用Java AsynchronousFileChannel(...)等异步方法不会在操作期间阻塞调用线程,因此使用单独的线程没有意义。事实上,Schedulers.io线程并不适合异步操作,因为它们不运行事件循环,并且回调永远不会被调用。

相同的逻辑适用于数据库访问或远程API调用。如果您可以使用异步或被动API来进行呼叫,请不要使用Schedulers.io。

回到并发。您可能无法访问异步或反应API以异步或并发地执行I / O操作,因此您唯一的选择是在单独的线程上调度多个调用。唉,Reactive流的顺序是顺序的,但好消息是flatMap()运算符可以在其核心引入并发性。

必须在流构造中构建并发,通常使用flatMap()运算符。这个功能强大的运算符可以配置为在内部为flatMap()嵌入的Function <T,R>提供多线程上下文。该上下文由多线程调度程序(如Scheduler.io或Scheduler.computation)提供。

有关RxJava2 SchedulersConcurrency的文章中的更多详细信息,您可以在其中找到代码示例和有关如何顺序和同时使用调度程序的详细说明。

希望这可以帮助,

Softjake


1
投票

This blog post provides an excellent answer

来自博文:

Schedulers.io()由无限制的线程池支持。它用于非CPU密集型I / O类型工作,包括与文件系统的交互,执行网络调用,数据库交互等。此线程池旨在用于异步执行阻塞IO。

Schedulers.computation()由有界线程池支持,其大小最多为可用处理器的数量。它用于计算或CPU密集型工作,例如调整图像大小,处理大型数据集等。注意:当您分配比可用内核更多的计算线程时,性能将因为上下文切换和线程创建开销而降低,因为线程争用处理器的时间。

Schedulers.newThread()为计划的每个工作单元创建一个新线程。这个调度程序很昂贵,因为每次都会生成新线程,并且不会重复使用。

Schedulers.from(Executor executor)创建并返回由指定执行程序支持的自定义调度程序。要限制线程池中同时线程的数量,请使用Scheduler.from(Executors.newFixedThreadPool(n))。这保证了如果在所有线程都被占用时调度任务,它将排队。池中的线程将一直存在,直到它被明确关闭。

主线程或AndroidSchedulers.mainThread()由RxAndroid扩展库提供给RxJava。主线程(也称为UI线程)是用户交互发生的地方。应该注意不要重载此线程以防止janky无响应的UI,或者更糟糕的是,应用程序无响应“(ANR)对话框。

Schedulers.single()是RxJava 2中的新增功能。此调度程序由单个线程支持,该线程按请求的顺序依次执行任务。

Schedulers.trampoline()由一个参与的工作线程以FIFO(先进先出)方式执行任务。它通常在实现递归时使用,以避免增加调用堆栈。

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.