目前的代码如下。
case object LatestMessageSignal
class MessageCheckpoint(implicit ec: ExecutionContext) extends Actor with ActorLogging with Timers {
override def receive: Receive = {
case LatestMessageSignal => awaitLatestMessageSignal()
}
private def awaitLatestMessageSignal(): Unit = {
import scala.concurrent.duration._
context.system.scheduler.scheduleOnce(30.seconds) {
context.stop(self)
}
}
}
当actor收到一个... LatestMessageSignal
信息,它将调用 awaitLatestMessageSignal()
方法,将等待30秒,然后停止该演员。
似乎你想在30秒不活动后停止actor?如果是这样,你可以使用 ActorContext#setReceiveTimeout(Duration)
例如:
case object LatestMessageSignal
class MessageCheckpoint(implicit ec: ExecutionContext) extends Actor with ActorLogging with Timers {
context.setReceiveTimeout(30.seconds)
override def receive: Receive = {
case ReceivedTimeout => context.stop(self)
}
}
据我所知,你想保留... MessageCheckpoint
活着,并在30秒内没有新的消息传来时停止它。
这个actor会一直活着,直到你向它发送消息,并在30秒不活动后停止。
case object LatestMessageSignal
class MessageCheckpoint extends Actor with ActorLogging {
override def postStop(): Unit = {
super.postStop()
log.info("Stopping")
}
override def receive: Receive = receiveWithTimer(None)
private def receiveWithTimer(timer: Option[Cancellable]): Receive = {
case LatestMessageSignal =>
timer.foreach(_.cancel())
context.become(receiveWithTimer(Option(initiateTimer())))
}
private def initiateTimer(): Cancellable = {
import context.dispatcher
log.info("Initiating new poison pill timer")
context.system.scheduler.scheduleOnce(30.seconds, self, PoisonPill)
}
}
我希望当另一个新的消息到来时,这个actor可以丢弃当前处理的消息,并处理最新的消息。
这是不可能的。我想你是假设这种方法 awaitLatestMessageSignal
是阻塞的。这个方法是非阻塞性的,它将创建一个定时器并立即返回。消息将被快速处理,并为下一条消息做好准备。Actor一次只处理一条消息,没有办法取消消息处理。