我想轮询API端点,直到达到某种条件。我希望它能在几秒到一分钟内达到这个状态。我有一个方法来调用返回Future
的端点。有没有什么方法可以将Future
s连接起来每隔n
毫秒轮询这个端点并在t
尝试后放弃?
假设我有一个带有以下签名的函数:
def isComplete(): Future[Boolean] = ???
在我看来,最简单的方法是阻止所有事情:
def untilComplete(): Unit = {
for { _ <- 0 to 10 } {
val status = Await.result(isComplete(), 1.seconds)
if (status) return Unit
Thread.sleep(100)
}
throw new Error("Max attempts")
}
但这可能会占用所有线程而且不是异步的。我还考虑过递归:
def untilComplete(
f: Future[Boolean] = Future.successful(false),
attempts: Int = 10
): Future[Unit] = f flatMap { status =>
if (status) Future.successful(Unit)
else if (attempts == 0) throw new Error("Max attempts")
else {
Thread.sleep(100)
untilComplete(isComplete(), attempts - 1)
}
}
但是,我担心最大化调用堆栈,因为这不是尾递归。
有没有更好的方法呢?
编辑:我正在使用akka
你可以使用Akka Streams。例如,要每隔500毫秒调用isComplete
,直到Future
的结果为真,最多五次:
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{ Sink, Source }
import scala.concurrent.Future
import scala.concurrent.duration._
def isComplete(): Future[Boolean] = ???
implicit val system = ActorSystem("MyExample")
implicit val materializer = ActorMaterializer()
implicit val ec = system.dispatcher
val stream: Future[Option[Boolean]] =
Source(1 to 5)
.throttle(1, 500 millis)
.mapAsync(parallelism = 1)(_ => isComplete())
.takeWhile(_ == false, true)
.runWith(Sink.lastOption)
stream onComplete { result =>
println(s"Stream completed with result: $result")
system.terminate()
}
它实际上根本不是递归的,所以堆栈会很好。
我能想到的一种改进方法是使用某种调度程序而不是Thread.sleep
,这样就不会占用线程。
这个例子使用标准java的TimerTask
,但如果你使用某种框架,比如akka,play或者其他什么,它可能有自己的调度程序,这将是一个更好的选择。
object Scheduler {
val timer = new Timer(true)
def after[T](d: Duration)(f :=> Future[T]): Future[T] = {
val promise = Promise[T]()
timer.schedule(TimerTask { def run() = promise.completeWith(f) }, d.toMillis)
promise.future
}
}
def untilComplete(attempts: Int = 10) = isComplete().flatMap {
case true => Future.successful(())
case false if attempts > 1 => Scheduler.after(100 millis)(untilComplete(attempts-1))
case _ => throw new Exception("Attempts exhausted.")
}
我给自己做了一个图书馆。我有
trait Poller extends AutoCloseable {
def addTask[T]( task : Poller.Task[T] ) : Future[T]
def close() : Unit
}
Poller.Task
的样子
class Task[T]( val label : String, val period : Duration, val pollFor : () => Option[T], val timeout : Duration = Duration.Inf )
Poller
民意调查每个period
直到pollFor
方法成功(产生Some[T]
)或超过timeout
。
为方便起见,当我开始轮询时,我将其包装成Poller.Task.withDeadline
:
final case class withDeadline[T] ( task : Task[T], deadline : Long ) {
def timedOut = deadline >= 0 && System.currentTimeMillis > deadline
}
它将任务的(不可变的,可重复使用的)timeout
持续时间转换为每次轮询尝试的最后期限。
为了有效地进行轮询,我使用Java的ScheduledExecutorService
:
def addTask[T]( task : Poller.Task[T] ) : Future[T] = {
val promise = Promise[T]()
scheduleTask( Poller.Task.withDeadline( task ), promise )
promise.future
}
private def scheduleTask[T]( twd : Poller.Task.withDeadline[T], promise : Promise[T] ) : Unit = {
if ( isClosed ) {
promise.failure( new Poller.ClosedException( this ) )
} else {
val task = twd.task
val deadline = twd.deadline
val runnable = new Runnable {
def run() : Unit = {
try {
if ( ! twd.timedOut ) {
task.pollFor() match {
case Some( value ) => promise.success( value )
case None => Abstract.this.scheduleTask( twd, promise )
}
} else {
promise.failure( new Poller.TimeoutException( task.label, deadline ) )
}
}
catch {
case NonFatal( unexpected ) => promise.failure( unexpected )
}
}
}
val millis = task.period.toMillis
ses.schedule( runnable, millis, TimeUnit.MILLISECONDS )
}
}
它似乎运作良好,不需要睡觉或阻止个别Threads
。
(看看图书馆,可以做很多事情来使它更清晰,更容易阅读,并且通过制作该类Poller.Task.withDeadline
的原始构造函数来阐明private
的作用。截止日期应始终从任务timeout
计算,不应该是任意的自由变量。)
此代码来自here (framework and trait)和here (implementation)。 (如果你想使用它,直接的maven坐标是here。)