为什么 Akka http 不使用我在这段代码中定义的自定义执行上下文,并且总是回退到默认执行和默认线程池?
package com.vaz.www
import akka.actor.typed.ActorSystem
import akka.actor.typed.scaladsl.Behaviors
import akka.http.scaladsl.Http
import akka.http.scaladsl.model._
import akka.http.scaladsl.server.Directives._
import akka.http.scaladsl.server.Route.seal
import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import java.util.concurrent.Executors
import scala.io.StdIn
import akka.http.scaladsl.marshallers.sprayjson.SprayJsonSupport._
import spray.json._
import akka.actor.typed.DispatcherSelector
import scala.concurrent.ExecutionContext
import scala.concurrent.ExecutionContextExecutor
import org.slf4j.LoggerFactory
object MyServer {
// implicit val ec: scala.concurrent.ExecutionContext =
// scala.concurrent.ExecutionContext.global // fromExecutorService(Executors.newFixedThreadPool(5))
def main(args: Array[String]): Unit = {
implicit val system = ActorSystem(Behaviors.empty, "Main")
implicit val myExCon: ExecutionContextExecutor = system.dispatchers.lookup(
DispatcherSelector.fromConfig("akka.actor.my-ex-con-dispatcher")
)
val logger = LoggerFactory.getLogger(getClass)
val route = {
withExecutionContext(myExCon) {
path("") {
get {
extractExecutionContext { implicit exec =>
println(
s"Execution and the thread:${Thread.currentThread().getName()}"
)
logger.info(
s"Execution and the thread:${Thread.currentThread().getName()}"
)
complete(
HttpEntity(
ContentTypes.`text/html(UTF-8)`,
"<h1>Hi</h1>"
)
)
}
}
}
}
}
val bindingFuture = Http().newServerAt("localhost", 9000).bind(route)
println(
s"Server now online. Please navigate to http://localhost:9000/hello\nPress RETURN to stop..."
)
StdIn.readLine()
bindingFuture.flatMap(_.unbind()).onComplete(_ => system.terminate())
}
}
我的应用程序配置
akka{
actor{
my-ex-con-dispatcher {
type = Dispatcher
executor = "fork-join-executor"
fork-join-executor {
parallelism-min = 2
parallelism-factor = 2.0
parallelism-max = 10
}
throughput = 100
}
}
}
build.sbt
import Dependencies._
ThisBuild / scalaVersion := "3.3.4"
ThisBuild / version := "0.1.0-SNAPSHOT"
ThisBuild / organization := "com.example"
ThisBuild / organizationName := "example"
lazy val root = (project in file("."))
.settings(
name := "akka-server-example",
libraryDependencies += munit % Test
)
val AkkaVersion = "2.9.3"
val AkkaHttpVersion = "10.6.3"
resolvers += "Akka library repository".at("https://repo.akka.io/maven")
libraryDependencies += "com.typesafe.akka" %% "akka-http-spray-json" % AkkaHttpVersion
libraryDependencies ++= Seq(
"com.typesafe.akka" %% "akka-actor-typed" % AkkaVersion,
"com.typesafe.akka" %% "akka-stream" % AkkaVersion,
"com.typesafe.akka" %% "akka-http" % AkkaHttpVersion
)
当我卷曲 localhost:9000 时,得到的响应为
<h1>Hi</h1>
,
在控制台中我得到了这个:
Server now online. Please navigate to http://localhost:9000/hello
Press RETURN to stop...
Execution and the thread:Main-akka.actor.default-dispatcher-8
我尝试阅读akka [dispatcher](https://doc.akka.io/libraries/akka/current/typed/dispatchers.html),但它没有按预期工作,
您可以将操作移至 Future 内,以确保它在自定义调度程序上运行,如下所示:
val route: Route = {
path("hello") {
withExecutionContext(myExCon) {
extractRequest { request =>
// Move the operation inside a Future to ensure it runs on the custom dispatcher
val result = Future {
val msg = s"Execution context: $myExCon with thread: ${Thread.currentThread.getName}"
println.info(msg)
HttpResponse(StatusCodes.OK, entity = s"$msg")
}(myExCon)
complete(result)
}
}
}
}
这是用pekko完成的,没有
extractExecutionContext
。