我正在使用Monix Task
进行异步控制。
我提出了在1.结果和2.错误计数器之间竞争的想法,并取消失败者。如果错误计数器先达到阈值,则通过Task.race
,然后将通过Task.race
取消任务。
在Ammonite REPL
{
import $ivy.`io.monix::monix:3.1.0`
import monix.eval.Task
import monix.execution.atomic.Atomic
import scala.concurrent.duration._
import monix.execution.Scheduler
//import monix.execution.Scheduler.Implicits.global
implicit val s = Scheduler.fixedPool("race", 2) // pool size
val taskSize = 100
val errCounter = Atomic(0)
val threshold = 3
val tasks = (1 to taskSize).map(_ => Task.sleep(100.millis).map(_ => errCounter.increment()))
val guard = Task(f"stop because too many error: ${errCounter.get()}")
.restartUntil(_ => errCounter.get() >= threshold)
val race = Task
.race(guard, Task.gather(tasks))
.runToFuture
.onComplete { case x => println(x); println(f"completed task: ${errCounter.get()}") }
}
结果取决于线程池的大小!?
对于池大小1结果几乎总是任务成功,即永无止境。
Success(Right(.........))
completed task: 100 // all task success !
对于池大小2成功与失败之间的关系非常不确定,取消操作也不准确。例如:
Success(Left(stop because too many error: 1))
completed task: 98
取消操作最迟已完成98个任务。错误计数小到阈值。
默认的全局调度程序具有相同的结果行为。
对于游泳池大小200]它更具确定性,并且停止更早,因此在完成较少任务的意义上更加准确。
Success(Left(stop because too many error: 2)) completed task: 8
池大小越大越好。
如果我将Task.gather
更改为Task.sequence
执行,所有问题都消失了!
这种依赖池大小的原因是什么?一旦出现太多错误,如何改进它或有更好的替代方法来停止任务?
我正在使用Monix Task进行异步控制。如果X次以上发生故障,将并行执行场景任务,请停止所有尚未处于完整状态的任务(尽快)。解决方案I ...
您所看到的可能是monix调度程序的效果以及它如何实现公平性。这是一个相当复杂的主题,但是文档和scaladocs非常出色(请参阅:https://monix.io/docs/3x/execution/scheduler.html#execution-model)