我正在尝试将传入的
Source[ByteString, Any]
放入 2 sinks
并尝试复制 akka streaming graphs
中的传入流。我想要 'is'
作为输入流,但 bs
不是 Source[ByteString, Any]
类型。我越来越boxed error
。
private def duplicateStream(content: Source[ByteString, Any]): Future[Either[X, Y]] = {
val sink = StreamConverters.asInputStream()
val (is, bs): (InputStream, Future[ByteString]) = content
.alsoToMat(sink)(Keep.right)
.toMat(Sink.last)(Keep.both)
.run()
//is is input stream which is desired
//bs should be Source[ByteString, Any]
}
如何从该图中得到
bs
asSource[ByteString, Any]
?
您正在使用
Sink.last
,根据定义,它将仅保留源的最后一项,并具体化为该项目的 Future
。
因此,您观察到,您拥有两个接收器的物化值:输入流和最后一个值。