当它们失败超过阈值时停止所有异步任务吗?

问题描述 投票:3回答:1

我正在使用Monix Task进行异步控制。

场景

  1. 任务并行执行
  2. 如果失败发生超过X次
  3. 停止所有尚未完成的任务(尽快完成)

我的解决方案

我提出了在1.结果和2.错误计数器之间竞争的想法,并取消失败者。如果错误计数器先达到阈值,则通过Task.race,然后将通过Task.race取消任务。

实验

Ammonite REPL

{
  import $ivy.`io.monix::monix:3.1.0`
  import monix.eval.Task
  import monix.execution.atomic.Atomic
  import scala.concurrent.duration._
  import monix.execution.Scheduler
  //import monix.execution.Scheduler.Implicits.global
  implicit val s = Scheduler.fixedPool("race", 2) // pool size

  val taskSize = 100
  val errCounter = Atomic(0)
  val threshold = 3

  val tasks = (1 to taskSize).map(_ => Task.sleep(100.millis).map(_ => errCounter.increment()))
  val guard = Task(f"stop because too many error: ${errCounter.get()}")
    .restartUntil(_ => errCounter.get() >= threshold)

  val race = Task
    .race(guard, Task.gather(tasks))
    .runToFuture
    .onComplete { case x => println(x); println(f"completed task: ${errCounter.get()}") }
}

issue

结果取决于线程池的大小!?

对于池大小1结果几乎总是任务成功,即永无止境。

Success(Right(.........))
completed task: 100 // all task success !

对于池大小2成功与失败之间的关系非常不确定,取消操作也不准确。例如:

Success(Left(stop because too many error: 1))
completed task: 98

取消操作最迟已完成98个任务。错误计数小到阈值。

默认的全局调度程序具有相同的结果行为。

对于游泳池大小200]它更具确定性,并且停止更早,因此在完成较少任务的意义上更加准确。

Success(Left(stop because too many error: 2))
completed task: 8

池大小越大越好。


如果我将Task.gather更改为Task.sequence执行,所有问题都消失了!


这种依赖池大小的原因是什么?一旦出现太多错误,如何改进它或有更好的替代方法来停止任务?

我正在使用Monix Task进行异步控制。如果X次以上发生故障,将并行执行场景任务,请停止所有尚未处于完整状态的任务(尽快)。解决方案I ...

multithreading scala concurrency scala-cats monix
1个回答
1
投票

您所看到的可能是monix调度程序的效果以及它如何实现公平性。这是一个相当复杂的主题,但是文档和scaladocs非常出色(请参阅:https://monix.io/docs/3x/execution/scheduler.html#execution-model

© www.soinside.com 2019 - 2024. All rights reserved.