在我的用例中,我需要使用 RSocket 协议通过网络创建反应流,并在客户端返回 pekko Source,在服务器端返回 pekko Sink。我在客户端没有任何问题,因为创建连接会返回一个 Flux,可以使用 Source.fromPublisher 将其轻松转换为 pekko Source。
在服务器端,RSocket 期望 Flux 作为输入,但我有其他代码期望得到 pekko Sink 作为结果(它不能接受 ManyWithUpstream,除非我可以将其转换为 Sink)。
这是一个最小的代码示例,它可以完美地适合我的用例,但依赖于已弃用的 EmitterProcessor。
def serverSink : Sink[Payload, NotUsed] = {
val processor : Processor[Payload, Payload] = EmitterProcessor.create(1)
RSocketServer.create(
SocketAcceptor.forRequestStream(payload =>
Flux.from(processor)
)).bindNow(TcpServerTransport.create("localhost", 3141))
Sink.fromSubscriber(processor)
}
我已经尝试了以下操作,但是当我尝试运行它时,由于我不完全理解的原因,馈送到 Sink 的任何内容实际上都不会馈送到 RSocket 中。
def serverSink : Sink[Payload, NotUsed] = {
val processor : Processor[Payload, Payload] = Flow[Payload].toProcessor.run()
RSocketServer.create(
SocketAcceptor.forRequestStream(payload =>
Flux.from(processor)
)).bindNow(TcpServerTransport.create("localhost", 3141))
Sink.fromSubscriber(processor)
}
所以,我有一个解决方案,但有点hacky。由于我不明白的原因,pekko 的发布者需要一个不同于 RSocket 的订阅者。这是可行的,但需要注意的是,如果没有连接的客户端,接收器将消耗一切可能的东西(这在我的用例中实际上是首选)
def serverSink : Sink[Payload, NotUsed] = {
val sink : Sink[Payload, Publisher[Payload]] = Sink.asPublisher(true)
sink.mapMaterializedValue { pub =>
pub.subscribe(new Subscriber[Payload] {
override def onComplete(): Unit = ()
override def onError(t: Throwable): Unit = ()
override def onNext(t: Payload): Unit = ()
override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
})
RSocketServer.create(
SocketAcceptor.forRequestStream(payload =>
Flux.from(pub)
)).bindNow(TcpServerTransport.create("localhost", 3141))
NotUsed.notUsed()
}
}