如果一个Actor在一段时间内没有收到消息,如何关闭它?

问题描述 投票:0回答:2

目前的代码如下。

case object LatestMessageSignal

class MessageCheckpoint(implicit ec: ExecutionContext) extends Actor with ActorLogging with Timers {

  override def receive: Receive = {
    case LatestMessageSignal => awaitLatestMessageSignal()
  }

  private def awaitLatestMessageSignal(): Unit = {
    import scala.concurrent.duration._
    context.system.scheduler.scheduleOnce(30.seconds) {
      context.stop(self)
    }
  }

}

当actor收到一个... LatestMessageSignal 信息,它将调用 awaitLatestMessageSignal() 方法,将等待30秒,然后停止该演员。

scala akka actor
2个回答
4
投票

似乎你想在30秒不活动后停止actor?如果是这样,你可以使用 ActorContext#setReceiveTimeout(Duration)

例如:

case object LatestMessageSignal

class MessageCheckpoint(implicit ec: ExecutionContext) extends Actor with ActorLogging with Timers {

  context.setReceiveTimeout(30.seconds)

  override def receive: Receive = {
    case ReceivedTimeout => context.stop(self)
  }

}

2
投票

据我所知,你想保留... MessageCheckpoint 活着,并在30秒内没有新的消息传来时停止它。

这个actor会一直活着,直到你向它发送消息,并在30秒不活动后停止。


case object LatestMessageSignal

class MessageCheckpoint extends Actor with ActorLogging {

  override def postStop(): Unit = {
    super.postStop()
    log.info("Stopping")
  }

  override def receive: Receive = receiveWithTimer(None)

  private def receiveWithTimer(timer: Option[Cancellable]): Receive = {
    case LatestMessageSignal =>
      timer.foreach(_.cancel())
      context.become(receiveWithTimer(Option(initiateTimer())))
  }

  private def initiateTimer(): Cancellable = {
    import context.dispatcher
    log.info("Initiating new poison pill timer")
    context.system.scheduler.scheduleOnce(30.seconds, self, PoisonPill)
  }
}

我希望当另一个新的消息到来时,这个actor可以丢弃当前处理的消息,并处理最新的消息。

这是不可能的。我想你是假设这种方法 awaitLatestMessageSignal 是阻塞的。这个方法是非阻塞性的,它将创建一个定时器并立即返回。消息将被快速处理,并为下一条消息做好准备。Actor一次只处理一条消息,没有办法取消消息处理。

© www.soinside.com 2019 - 2024. All rights reserved.