使用 Akka 的 Actor 进行异步消息处理

问题描述 投票:0回答:4

在我的项目中,我使用 Akka 的 Actors。根据定义,Actor 是线程安全的,这意味着在 Actor 的 receive 方法中

def receive = {
    case msg =>
        // some logic here
}

一次只有一个线程处理被注释的代码段。然而,当这段代码是异步的时,事情开始变得更加复杂:

def receive = {
    case msg =>
        Future {
            // some logic here
        }
}

如果我理解正确的话,在这种情况下,可以说只有 Future 构造会被同步,而不是 Future 内部的逻辑。

当然我可能会阻止未来:

def receive = {
    case msg =>
        val future = Future {
            // some logic here
        }
        Await.result(future, 10.seconds)
}

这解决了问题,但我认为我们都应该同意这不是一个可接受的解决方案。

所以这就是我的问题:在异步计算的情况下,如何在不阻塞 Scala 的 Future 的情况下保留 Actor 的线程安全特性?

scala akka actor
4个回答
6
投票
如何在以下情况下保留参与者的线程安全性质 无块 Scala 的异步计算
Future



仅当您修改
Future

内 Actor 的内部状态时,此假设才成立,这首先似乎是一种设计味道。仅通过创建数据的

copy
并使用 pipeTo 将计算结果通过管道传输给 Actor 来使用 future 进行计算。一旦参与者收到计算结果,您就可以安全地对其进行操作:

import akka.pattern.pipe case class ComputationResult(s: String) def receive = { case ComputationResult(s) => // modify internal state here case msg => Future { // Compute here, don't modify state ComputationResult("finished computing") }.pipeTo(self) }



0
投票
Future

。如果数据库查询返回一个

Future[A]
,那么您可以使用
flatMap
来操作
A
并返回一个新的
Future
。类似于
的东西

def receive = { case msg => val futureResult: Future[Result] = ... futureResult.flatMap { result: Result => // .... // return a new Future } }



0
投票

为 mongoDB 请求调度 future。
  • 使用您自己的演员的参考来与您的演员进行交流
  • 告诉来自未来的消息。
  • 根据具体情况,您可能需要做更多的事情才能获得正确的响应。

但这有一个优点,您可以使用参与者状态处理消息,并且可以在拥有线程时随意改变参与者状态。


0
投票

class MyActor extends Actor { var previousMessage: Future[String] = Future.successful("inital result") override def receive: Receive = { case message => { previousMessage = previousMessage .map(Some(_)) .recover{case _ => Option.empty} .flatMap(previousMessageResult /* Option[String] */ => { // process message Future.successful("Result") }) } } }

我不知道这是否有任何缺点。

© www.soinside.com 2019 - 2024. All rights reserved.