我正在使用BackoffSupervisor策略来创建一个必须处理某些消息的子actor。我想实现一个非常简单的重启策略,在异常的情况下:
到目前为止我所拥有的是:
主管定义:
val childProps = Props(new SenderActor())
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
childProps,
childName = cmd.hashCode.toString,
minBackoff = 1.seconds,
maxBackoff = 2.seconds,
randomFactor = 0.2
)
.withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException => {
println("caught specific message!")
SupervisorStrategy.Restart
}
case _: Exception => SupervisorStrategy.Restart
case _ ⇒ SupervisorStrategy.Escalate
})
)
val sup = context.actorOf(supervisor)
sup ! cmd
应该发送电子邮件的子actor,但是失败(抛出一些Exception)并将Exception传播回supervisor:
class SenderActor() extends Actor {
def fakeSendMail():Unit = {
Thread.sleep(1000)
throw new Exception("surprising exception")
}
override def receive: Receive = {
case cmd: NewMail =>
println("new mail received routee")
try {
fakeSendMail()
} catch {
case t => throw MessageException(cmd, t)
}
}
}
在上面的代码中,我将任何异常包装到自定义类MessageException中,该类传播到SupervisorStrategy,但是如何将它进一步传播给新子进程以强制重新处理?这是正确的方法吗?
编辑。我试图在preRestart
钩子上向Actor重新发送消息,但不知何故钩子没有被触发:
class SenderActor() extends Actor {
def fakeSendMail():Unit = {
Thread.sleep(1000)
// println("mail sent!")
throw new Exception("surprising exception")
}
override def preStart(): Unit = {
println("child starting")
}
override def preRestart(reason: Throwable, message: Option[Any]): Unit = {
reason match {
case m: MessageException => {
println("aaaaa")
message.foreach(self ! _)
}
case _ => println("bbbb")
}
}
override def postStop(): Unit = {
println("child stopping")
}
override def receive: Receive = {
case cmd: NewMail =>
println("new mail received routee")
try {
fakeSendMail()
} catch {
case t => throw MessageException(cmd, t)
}
}
}
这给了我类似于以下输出的东西:
new mail received routee
caught specific message!
child stopping
[ERROR] [01/26/2018 10:15:35.690]
[example-akka.actor.default-dispatcher-2]
[akka://example/user/persistentActor-4-scala/$a/1962829645] Could not
process message sample.persistence.MessageException:
Could not process message <stacktrace>
child starting
但没有来自preRestart
挂钩的日志
未调用子项的preRestart
挂钩的原因是因为Backoff.onFailure
在封面下使用BackoffOnRestartSupervisor
,它使用与退避策略一致的停止和延迟启动行为替换默认重启行为。换句话说,当使用Backoff.onFailure
时,当孩子重新启动时,孩子的preRestart
方法不会被调用,因为基础主管实际上停止了孩子,然后再次启动它。 (使用Backoff.onStop
可以触发孩子的preRestart
钩子,但这与目前的讨论相关。)
当主管的子进程重新启动时,BackoffSupervisor
API不支持自动重新发送消息:您必须自己实现此行为。重试消息的想法是让BackoffSupervisor
的主管处理它。例如:
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
...
).withReplyWhileStopped(ChildIsStopped)
).withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException =>
println("caught specific message!")
self ! Error(msg.cmd) // replace cmd with whatever the property name is
SupervisorStrategy.Restart
case ...
})
)
val sup = context.actorOf(supervisor)
def receive = {
case cmd: NewMail =>
sup ! cmd
case Error(cmd) =>
timers.startSingleTimer(cmd.id, Replay(cmd), 10.seconds)
// We assume that NewMail has an id field. Also, adjust the time as needed.
case Replay(cmd) =>
sup ! cmd
case ChildIsStopped =>
println("child is stopped")
}
在上面的代码中,嵌入在NewMail
中的MessageException
消息被包装在一个自定义的case类中(为了很容易将它与“normal”/ new NewMail
消息区分开来)并发送到self
。在这种背景下,self
是创造BackoffSupervisor
的演员。这个封闭的演员然后使用single timer在某个时刻重播原始消息。这个时间点应该足够远,以至于BackoffSupervisor
可能会耗尽SenderActor
的重新启动尝试,以便孩子在收到重新发送的消息之前可以有充分的机会进入“良好”状态。显然,无论子重启次数如何,此示例仅涉及一次重发消息。
另一个想法是为每个BackoffSupervisor
消息创建一个SenderActor
-NewMail
对,并让SenderActor
在NewMail
钩子中将preStart
消息发送给自己。这种方法的一个问题是清理资源;即,当处理成功或孩子重新开始时,关闭BackoffSupervisors
(这将关闭他们各自的SenderActor
孩子)。 NewMail
ids到(ActorRef, Int)
元组的地图(其中ActorRef
是对BackoffSupervisor
演员的引用,而Int
是重启尝试的次数)在这种情况下会有所帮助:
class Overlord extends Actor {
var state = Map[Long, (ActorRef, Int)]() // assuming the mail id is a Long
def receive = {
case cmd: NewMail =>
val childProps = Props(new SenderActor(cmd, self))
val supervisor = BackoffSupervisor.props(
Backoff.onFailure(
...
).withSupervisorStrategy(
OneForOneStrategy(maxNrOfRetries = 3, loggingEnabled = true) {
case msg: MessageException =>
println("caught specific message!")
self ! Error(msg.cmd)
SupervisorStrategy.Restart
case ...
})
)
val sup = context.actorOf(supervisor)
state += (cmd.id -> (sup, 0))
case ProcessingDone(cmdId) =>
state.get(cmdId) match {
case Some((backoffSup, _)) =>
context.stop(backoffSup)
state -= cmdId
case None =>
println(s"${cmdId} not found")
}
case Error(cmd) =>
val cmdId = cmd.id
state.get(cmdId) match {
case Some((backoffSup, numRetries)) =>
if (numRetries == 3) {
println(s"${cmdId} has already been retried 3 times. Giving up.")
context.stop(backoffSup)
state -= cmdId
} else
state += (cmdId -> (backoffSup, numRetries + 1))
case None =>
println(s"${cmdId} not found")
}
case ...
}
}
请注意,上例中的SenderActor
将NewMail
和ActorRef
作为构造函数参数。后一个参数允许SenderActor
向封闭的actor发送自定义的ProcessingDone
消息:
class SenderActor(cmd: NewMail, target: ActorRef) extends Actor {
override def preStart(): Unit = {
println(s"child starting, sending ${cmd} to self")
self ! cmd
}
def fakeSendMail(): Unit = ...
def receive = {
case cmd: NewMail => ...
}
}
显然,SenderActor
在每次使用fakeSendMail
时都会失败。我将留下SenderActor
所需的其他更改来实现快乐路径,其中SenderActor
向ProcessingDone
发送target
消息给你。
失败的子actor可以作为主管策略中的发件人使用。引用https://doc.akka.io/docs/akka/current/fault-tolerance.html#creating-a-supervisor-strategy:
如果策略是在监督actor内声明的(而不是在伴随对象中),则其决策者可以以线程安全的方式访问actor的所有内部状态,包括获取对当前失败的子级的引用(作为发送者可用)失败的消息)。
在@chunjef提供的良好解决方案中,他警告在退避监督员启动工作人员之前重新安排工作的风险
然后,这个封闭的actor使用单个计时器在某个时刻重放原始消息。此时间点应该足够远,以便BackoffSupervisor可能会耗尽SenderActor的重启尝试,以便孩子在收到重新发送的消息之前可以有充分的机会进入“良好”状态。
如果发生这种情况,该方案将成为死信,并且不会进一步取得进展。我用this scenario简化了小提琴。
因此,计划延迟应该大于maxBackoff,这可能代表工作完成时间的影响。避免这种情况的一种可能的解决方案是让工人演员在准备工作时向父亲发送消息,例如here。
在您的情况下,使用某些第三方软件发送电子邮件是一项危险的操作。为什么不应用Circuit Breaker模式并完全跳过发送者演员?此外,你仍然可以在其中有一个演员(有一些Backoff Supervisor)和Circuit Breaker(如果这对你有意义的话)。