如何创建具体化为 ActorRef 的 akka 源,其中传入消息知道发送者

问题描述 投票:0回答:1
  val ref = Source.actorRef[String](
    completionMatcher =  PartialFunction.empty,
    failureMatcher = PartialFunction.empty,
    100000,
    OverflowStrategy.dropNew
  ).to(Sink.foreachAsync(1){ elem =>
    // how to reply to sender
    Future.successful()
  })

上面的示例几乎满足了我的需要,但底层消息不知道发送者。所以无法回复。有没有一种方法或模式可以让我回复发件人,以便它可以与询问模式一起使用,例如:

  import akka.pattern.ask
  (ref ? "request").onComplete {
    case Failure(exception) => logger.error(s"Couldn't receive response", exception)
    case Success(value) => logger.info(s"Received response ${value}")
  }
scala akka-stream
1个回答
0
投票

这对于经典的基于 actor 的 API 和

Source.actorRef
来说是不可能的,正如您所注意到的,它会丢弃发送者。

但是,如果使用 Akka 2.6+,可以将类型化 ask 模式

akka.actor.typed.scaladsl.adapter.ClassicActorRefOps
结合使用,以将发送 actor(在本例中为询问的合成 actor)包含在发送到
Source.actorRef
的消息中。

对于您的示例,您可以将流重写为:

import akka.actor.typed.{ ActorRef => TypedActorRef }

val ref = Source.actorRef[(String, TypedActorRef[Any])](
  completionMatcher =  PartialFunction.empty,
  failureMatcher = PartialFunction.empty,
  100000,
  OverflowStrategy.dropNew
).to(Sink.foreach { // Sink.foreachAsync(1) isn't doing anything here...
  case (elem, sender) =>
    // do stuff with elem?
    sender ! "response"
}

然后询问

import akka.actor.typed.scaladsl.AskPattern._
import akka.actor.typed.scaladsl.Scheduler
import akka.actor.typed.scaladsl.adapter.{ ClassicActorRefOps, ClassicActorSystemOps }

// actorSystem is the classic ActorSystem in use
implicit val scheduler: Scheduler = actorSystem.toTyped.scheduler

ref.ask[Any]("request" -> _).onComplete {
  case Failure(exception) => logger.error(s"Couldn't receive response", exception)
  case Success(value) => logger.info(s"Received response ${value}")
}
© www.soinside.com 2019 - 2024. All rights reserved.