在我的项目中,我使用 Akka 的 Actors。根据定义,Actor 是线程安全的,这意味着在 Actor 的 receive 方法中
def receive = {
case msg =>
// some logic here
}
一次只有一个线程处理被注释的代码段。然而,当这段代码是异步的时,事情开始变得更加复杂:
def receive = {
case msg =>
Future {
// some logic here
}
}
如果我理解正确的话,在这种情况下,可以说只有 Future 构造会被同步,而不是 Future 内部的逻辑。
当然我可能会阻止未来:
def receive = {
case msg =>
val future = Future {
// some logic here
}
Await.result(future, 10.seconds)
}
这解决了问题,但我认为我们都应该同意这不是一个可接受的解决方案。
所以这就是我的问题:在异步计算的情况下,如何在不阻塞 Scala 的 Future 的情况下保留 Actor 的线程安全特性?
Future
?
仅当您修改
Future
内 Actor 的内部状态时,此假设才成立,这首先似乎是一种设计味道。仅通过创建数据的
copy并使用
pipeTo
将计算结果通过管道传输给 Actor 来使用 future 进行计算。一旦参与者收到计算结果,您就可以安全地对其进行操作:import akka.pattern.pipe
case class ComputationResult(s: String)
def receive = {
case ComputationResult(s) => // modify internal state here
case msg =>
Future {
// Compute here, don't modify state
ComputationResult("finished computing")
}.pipeTo(self)
}
Future
。如果数据库查询返回一个
Future[A]
,那么您可以使用 flatMap
来操作 A
并返回一个新的 Future
。类似于的东西
def receive = {
case msg =>
val futureResult: Future[Result] = ...
futureResult.flatMap { result: Result =>
// ....
// return a new Future
}
}
为 mongoDB 请求调度 future。
但这有一个优点,您可以使用参与者状态处理消息,并且可以在拥有线程时随意改变参与者状态。
class MyActor extends Actor {
var previousMessage: Future[String] = Future.successful("inital result")
override def receive: Receive = {
case message => {
previousMessage = previousMessage
.map(Some(_))
.recover{case _ => Option.empty}
.flatMap(previousMessageResult /* Option[String] */ => {
// process message
Future.successful("Result")
})
}
}
}
我不知道这是否有任何缺点。