在Play应用中,我创建了一个套接字服务器:
package controllers
import play.api.mvc._
import play.api.libs.streams.ActorFlow
import javax.inject.Inject
import akka.actor.ActorSystem
import akka.stream.Materializer
/** Play controller that exposes a WebSocket endpoint exchanging String messages.
  *
  * @param cc     controller components passed on to AbstractController
  * @param system implicit actor system, required by ActorFlow.actorRef
  * @param mat    implicit materializer, required by ActorFlow.actorRef
  */
class Application @Inject() (cc: ControllerComponents)(implicit system: ActorSystem, mat: Materializer)
    extends AbstractController(cc) {

  /** WebSocket handler: every accepted connection is backed by the actor
    * described by MyWebSocketActor.props, which is given the outbound ActorRef.
    */
  def socket: WebSocket =
    WebSocket.accept[String, String] { _ =>
      ActorFlow.actorRef(clientRef => MyWebSocketActor.props(clientRef))
    }
}
import akka.actor._
/** Companion holding the Props factory for MyWebSocketActor. */
object MyWebSocketActor {

  /** Builds the Props used to create a MyWebSocketActor.
    *
    * @param out actor reference that the new actor sends its replies to
    * @return Props wrapping a fresh MyWebSocketActor bound to `out`
    */
  def props(out: ActorRef): Props =
    Props(new MyWebSocketActor(out))
}
/** Actor created through MyWebSocketActor.props from the `socket` endpoint's
  * ActorFlow.actorRef callback.
  *
  * @param out actor reference that replies are sent to
  */
class MyWebSocketActor(out: ActorRef) extends Actor {
// Answers every String message by sending an acknowledgement String to `out`.
// No other message type is matched by this partial function.
def receive = {
case msg: String =>
out ! ("I received your message: " + msg)
}
}
更新路由文件:
GET /ws controllers.Application.socket
参考 https://doc.akka.io/docs/akka-http/current/client-side/websocket-support.html ,
我定义了 WebSocketClientFlow:
import akka.actor.ActorSystem
import akka.Done
import akka.http.scaladsl.Http
import akka.stream.ActorMaterializer
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
/** Command-line WebSocket client: connects to ws://localhost:9000/ws, sends a
  * single text message and prints the text of each strict text message received.
  */
object WebSocketClientFlow {
// Entry point: builds the stream, runs it, then prints the handshake result
// and a "closed" marker once the incoming sink completes.
def main(args: Array[String]) = {
implicit val system = ActorSystem()
implicit val materializer = ActorMaterializer()
import system.dispatcher
// Future[Done] is the materialized value of Sink.foreach,
// emitted when the stream completes
// Only TextMessage.Strict is matched below; no other Message case is handled.
val incoming: Sink[Message, Future[Done]] =
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
}
// send this as a message over the WebSocket
// NOTE(review): Source.single emits its one element and then completes. The
// Akka HTTP client-side WebSocket documentation (half-closed WebSockets) states
// that the connection is closed once the outgoing stream completes, which would
// match the "closed" output and the missing reply - confirm. Keeping the source
// open (e.g. Source.maybe or Source.actorRef) is the documented alternative.
val outgoing = Source.single(TextMessage("hello world!"))
// flow to use (note: not re-usable!)
val webSocketFlow = Http().webSocketClientFlow(WebSocketRequest("ws://localhost:9000/ws"))
// the materialized value is a tuple with
// upgradeResponse is a Future[WebSocketUpgradeResponse] that
// completes or fails when the connection succeeds or fails
// and closed is a Future[Done] with the stream completion from the incoming sink
val (upgradeResponse, closed) =
outgoing
.viaMat(webSocketFlow)(Keep.right) // keep the materialized Future[WebSocketUpgradeResponse]
.toMat(incoming)(Keep.both) // also keep the Future[Done]
.run()
// just like a regular http request we can access response status which is available via upgrade.response.status
// status code 101 (Switching Protocols) indicates that server support WebSockets
// Any other status makes `connected` a failed Future carrying a RuntimeException.
val connected = upgradeResponse.flatMap { upgrade =>
if (upgrade.response.status == StatusCodes.SwitchingProtocols) {
Future.successful(Done)
} else {
throw new RuntimeException(s"Connection failed: ${upgrade.response.status}")
}
}
// in a real application you would not side effect here
// Prints the Try result of the handshake (Success(Done) or Failure(...)).
connected.onComplete(println)
// Prints "closed" only when the incoming sink completes successfully.
closed.foreach(_ => println("closed"))
}
}
运行文件WebSocketClientFlow
返回以下内容:
Success(Done)
closed
对应于正在执行的这些行:
connected.onComplete(println)
closed.foreach(_ => println("closed"))
因此似乎套接字已被成功创建和访问。
但是,MyWebSocketActor
本应把收到的消息回复给客户端,而这条回复并没有被输出:
def receive = {
case msg: String =>
out ! ("I received your message: " + msg)
}
"hello world!" 消息是在 WebSocketClientFlow
中通过下面这一行发送的:
val outgoing = Source.single(TextMessage("hello world!"))
我怎样才能在套接字服务器端输出收到的消息,并在客户端输出服务器的响应?
发送到客户端的输出在MyWebSocketActor
中定义:
out ! ("I received your message: " + msg)
更新:
receive
函数:
def receive = {
case msg: String =>
out ! ("I received your message: " + msg)
}
确实被调用了,但是 out 消息("I received your message: " + msg)似乎没有被发送出去,或者没有被套接字客户端接收到。
我尝试把发送给客户端的响应从
out ! ("I received your message: " + msg)
改为
out ! TextMessage("hello world!")
这样应该会触发客户端的:
Sink.foreach[Message] {
case message: TextMessage.Strict =>
println(message.text)
}
但是 println(message.text) 并没有被调用,
这说明客户端没有收到该消息。
发布这个答案,希望对其他开发人员有用。如果有任何可以改进的地方,请发表评论。
暴露一个套接字:
package controllers
import play.api.mvc._
import play.api.libs.streams.ActorFlow
import javax.inject.Inject
import akka.actor.ActorSystem
import akka.http.scaladsl.model.ws.TextMessage
import akka.stream.Materializer
import akka.actor._
/** Play controller that exposes a WebSocket endpoint exchanging String messages.
  *
  * @param cc     controller components passed on to AbstractController
  * @param system implicit actor system, required by ActorFlow.actorRef
  * @param mat    implicit materializer, required by ActorFlow.actorRef
  */
class Application @Inject()(cc: ControllerComponents)(implicit system: ActorSystem, mat: Materializer)
    extends AbstractController(cc) {

  /** WebSocket handler: every accepted connection is backed by the actor
    * described by MyWebSocketActor.props, which is given the outbound ActorRef.
    */
  def socket: WebSocket =
    WebSocket.accept[String, String] { _ =>
      ActorFlow.actorRef(clientRef => MyWebSocketActor.props(clientRef))
    }
}
/** Companion holding the Props factory and a message counter for MyWebSocketActor. */
object MyWebSocketActor {

  // Number of String messages handled so far; incremented from MyWebSocketActor.receive.
  // NOTE(review): this is plain mutable state shared by every actor instance -
  // confirm that unsynchronized access is acceptable for this demo.
  var counter: Int = 0

  /** Builds the Props used to create a MyWebSocketActor.
    *
    * @param out actor reference that the new actor sends its replies to
    * @return Props wrapping a fresh MyWebSocketActor bound to `out`
    */
  def props(out: ActorRef): Props =
    Props(new MyWebSocketActor(out))
}
/** Actor that counts incoming String messages, logs them and replies to `out`.
  *
  * @param out actor reference that replies are sent to
  */
class MyWebSocketActor(out: ActorRef) extends Actor {

  /** Handles String messages only: bumps the shared counter, prints the message
    * together with the new count (no trailing newline) and sends a fixed greeting
    * to `out`.
    */
  override def receive: Receive = {
    case text: String =>
      MyWebSocketActor.counter += 1
      print(s"received message: $text,${MyWebSocketActor.counter}")
      out ! "hello world! "
  }
}
测试套接字:
import akka.actor.{ActorRef, ActorSystem}
import akka.{Done, NotUsed}
import akka.http.scaladsl.Http
import akka.stream.{ActorMaterializer, OverflowStrategy}
import akka.stream.scaladsl._
import akka.http.scaladsl.model._
import akka.http.scaladsl.model.ws._
import scala.concurrent.Future
/** Stand-alone test client for the WebSocket endpoint at ws://localhost:9000/ws.
  *
  * Sends three strict text messages through an actor-backed source and prints
  * every message that arrives from the server.
  */
object WebSocketClientFlow {

  /** Opens the WebSocket connection, reports a failed handshake on stderr and
    * sends the test messages.
    *
    * NOTE(review): the ActorSystem is never terminated here, so the process
    * presumably keeps running until it is killed - confirm this is intended.
    *
    * @param args command-line arguments (unused)
    */
  def main(args: Array[String]): Unit = {
    implicit val system = ActorSystem()
    implicit val materializer = ActorMaterializer()
    import system.dispatcher

    val req = WebSocketRequest(uri = "ws://localhost:9000/ws")
    val webSocketFlow = Http().webSocketClientFlow(req)

    // Materializes as an ActorRef: messages sent to that ref are emitted into
    // the stream (buffer of 10 elements, OverflowStrategy.fail on overflow).
    val messageSource: Source[Message, ActorRef] =
      Source.actorRef[TextMessage.Strict](bufferSize = 10, OverflowStrategy.fail)

    // Prints each incoming message, then discards it.
    val messageSink: Sink[Message, NotUsed] =
      Flow[Message]
        .map(message => println(s"Received text message: [$message]"))
        .to(Sink.ignore)

    // The sink materializes to NotUsed, so that slot is deliberately not bound.
    val ((ws, upgradeResponse), _) =
      messageSource
        .viaMat(webSocketFlow)(Keep.both)
        .toMat(messageSink)(Keep.both)
        .run()

    // Status 101 (Switching Protocols) means the server accepted the upgrade;
    // anything else becomes a failed Future instead of a thrown exception.
    val connected: Future[Done] = upgradeResponse.flatMap { upgrade =>
      if (upgrade.response.status == StatusCodes.SwitchingProtocols)
        Future.successful(Done)
      else
        Future.failed(new RuntimeException(s"Connection failed: ${upgrade.response.status}"))
    }

    // Without this the handshake result is discarded and a refused or failed
    // connection goes unnoticed. Nothing is printed on success.
    connected.failed.foreach { cause =>
      Console.err.println(s"WebSocket connection failed: $cause")
    }

    ws ! TextMessage.Strict("Hello World")
    ws ! TextMessage.Strict("Hi")
    ws ! TextMessage.Strict("Yay!")
  }
}
在Play2Run控制台上,打印为:
[info] play.api.Play - Application started (Dev) (no global state)
received message: Hello World,1received message: Hi,2received message: Yay!,3
在WebSocketClientFlow控制台上,打印为:
Received text message: [TextMessage.Strict(hello world! )]
Received text message: [TextMessage.Strict(hello world! )]
Received text message: [TextMessage.Strict(hello world! )]