我正在学习 Akka 远程处理和集群。当我尝试向远程 Actor 发送消息时,我没有得到预期的行为。
远程Actor部署在本地主机:2552
object SimpleActor {
def apply(): Behavior[String] = Behaviors.receive { (context, message) =>
context.log.info(s"Simple Actor System got a message: $message")
Behaviors.same
}
def registerActor(): Behavior[Unit] = Behaviors.setup { context =>
val simpleActor = context.spawnAnonymous(SimpleActor())
context.system.receptionist ! Receptionist.Register(MyServiceKey, simpleActor)
context.log.info(s"Registered SimpleActor to ${MyServiceKey.toString}")
Behaviors.empty
}
}
object SimpleActorSystem {
def main(args: Array[String]): Unit = {
ActorSystem(SimpleActor.registerActor(), "SimpleActorSystemRemote",
ConfigFactory.load("part2_remoting/remoteActors.conf").getConfig("remoteSystem"))
}
}
当上面的代码运行时,我看到 SimpleActor 已成功注册到 MyServiecKey。
主要参与者尝试向远程参与者发送消息:
object Pinger {
def apply(pingService: ActorRef[String], message: String): Behavior[Unit] = Behaviors.setup { _ =>
pingService ! message
Behaviors.empty
}
}
object Guardian {
def apply(): Behavior[Receptionist.Listing] = Behaviors.setup[Receptionist.Listing] { context =>
context.system.receptionist ! Receptionist.Subscribe(MyServiceKey, context.self)
Behaviors.receiveMessagePartial[Receptionist.Listing] {
case MyServiceKey.Listing(listings) =>
println(s"Size of listings: ${listings.size}")
listings.foreach(ref => context.spawnAnonymous(Pinger(ref, "Hello from Remote Actor!")))
Behaviors.same
}
Behaviors.same
}
}
def main(args: Array[String]): Unit = {
ActorSystem(Guardian(), "Guardian")
}
ActorSystem 的配置详细信息:
akka {
actor {
provider = remote
}
remote {
artery {
enabled = on
transport = aeron-udp
canonical.hostname = "localhost"
canonical.port = 2551
}
}
}
remoteSystem {
akka {
actor {
provider = remote
}
remote {
artery {
enabled = on
transport = aeron-udp
canonical.hostname = "localhost"
canonical.port = 2552
}
}
}
}
预期行为:
[SimpleActorSystem-akka.actor.default-dispatcher-9] INFO part2_remoting.SimpleActor$ -- Simple Actor System got a message: Hello from Remote Actor!
向 Actor 发送消息是否需要集群?
感谢任何帮助!
接待员参与者发现机制仅在使用集群(特别是类型化集群)时传播注册。
但是,向远程发送消息时并不要求使用集群
ActorSystem
:仅需要远程处理。
Remoting 不为 Actor 提供高级发现 API:相反,以经典方式,您为要向其发送消息的 Actor 构造一个 URL,并将其转换为
ActorRef
(这是 Akka Classic,所以在打字中,它实际上是一个 ActorRef[Any]
)。 如果您想在没有集群的情况下使用远程处理,那么对于您希望能够直接联系的任何参与者(无需实施和使用某些发现方案),强烈建议避开 spawnAnonymous
。
例如,要向
localhost:2552/user/toplevelChild
的 Actor 发送消息,您可以执行以下操作:
import akka.actor.{ ActorRef => ClassicActorRef }
import akka.actor.RootActorPath
import akka.actor.typed.scaladsl.adapter.{ ClassicActorRefOps, TypedActorContextOps, TypedActorRefOps }
import akka.util.Timeout
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
sealed trait Command
case class ResolvedRemoteActor(ref: ClassicActorRef) extends Command
case class FailedToResolve(cause: Throwable) extends Command
Behaviors.setup[Command] { context =>
val remotePath = RootActorPath("localhost:2552") / "user" / "topLevelChild"
val selection = context.toClassic.actorSelection(remotePath)
implicit val resolveTimeout = 2.seconds
context.pipeToSelf(selection.resolveOne()) {
case Success(ref) => ResolvedRemoteActor(ref)
case Failure(ex) => FailedToResolve(ex)
}
Behaviors.receiveMessage { msg =>
msg match {
case ResolvedRemoteActor(ref) =>
context.log.info("Resolved [{}]", ref)
// Assuming that the target actor can do something with a String
ref.toTyped[String] ! "hey there!"
Behaviors.same
case FailedToResolve(cause) =>
context.log.warn("Failed to resolve", cause)
Behaviors.same
}
}
}
不用说,特别是使用 Typed 时,通常更容易使用集群(集群本身在其核心上做了一些非常相似的事情:例如参见here和here)