有没有办法创建一个 RSocket“forRequestStream”并将其作为 Pekko/Akka Sink 返回而不使用已弃用的 FluxProcessor?

问题描述 投票:0回答:1

在我的用例中,我需要使用 RSocket 协议通过网络创建反应流,并在客户端返回 pekko Source,在服务器端返回 pekko Sink。我在客户端没有任何问题,因为创建连接会返回一个 Flux,可以使用 Source.fromPublisher 将其轻松转换为 pekko Source。

在服务器端,RSocket 期望 Flux 作为输入,但我有其他代码期望得到 pekko Sink 作为结果(它不能接受 ManyWithUpstream,除非我可以将其转换为 Sink)。

这是一个最小的代码示例,它可以完美地适合我的用例,但依赖于已弃用的 EmitterProcessor。

def serverSink : Sink[Payload, NotUsed] = {
    val processor : Processor[Payload, Payload] = EmitterProcessor.create(1)
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(processor)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    Sink.fromSubscriber(processor)
  }

我已经尝试了以下操作,但是当我尝试运行它时,由于我不完全理解的原因,馈送到 Sink 的任何内容实际上都不会馈送到 RSocket 中。

def serverSink : Sink[Payload, NotUsed] = {
    val processor : Processor[Payload, Payload] = Flow[Payload].toProcessor.run()
    RSocketServer.create(
      SocketAcceptor.forRequestStream(payload =>
        Flux.from(processor)
    )).bindNow(TcpServerTransport.create("localhost", 3141))
    Sink.fromSubscriber(processor)
  }
scala akka project-reactor rsocket pekko
1个回答
0
投票

所以,我有一个解决方案,但有点hacky。由于我不明白的原因,pekko 的发布者需要一个不同于 RSocket 的订阅者。这是可行的,但需要注意的是,如果没有连接的客户端,接收器将消耗一切可能的东西(这在我的用例中实际上是首选)

def serverSink : Sink[Payload, NotUsed] = {
    val sink : Sink[Payload, Publisher[Payload]] = Sink.asPublisher(true)
    sink.mapMaterializedValue { pub =>
      pub.subscribe(new Subscriber[Payload] {
        override def onComplete(): Unit = ()
        override def onError(t: Throwable): Unit = ()
        override def onNext(t: Payload): Unit = ()
        override def onSubscribe(s: Subscription): Unit = s.request(Long.MaxValue)
      })
      RSocketServer.create(
        SocketAcceptor.forRequestStream(payload =>
          Flux.from(pub)
      )).bindNow(TcpServerTransport.create("localhost", 3141))
      NotUsed.notUsed()
    }
  }
© www.soinside.com 2019 - 2024. All rights reserved.