通过InputStream管道创建流[ByteString, ByteString, NotUsed]

问题描述 投票:0回答:1

我需要使用akka不支持的压缩来解压缩数据,但其他提供InputStream接口的库支持。 为了使其与 akka 流一起工作,我需要实现函数:

def pipeThroughInputStream(pipeThrough: InputStream => InputStream): Flow[ByteString, ByteString, NotUsed]

但我不知道该怎么做。 我知道像

StreamConverters.asInputStream
StreamConverters.fromInputStream
这样的转换函数,但我不知道如何在这里应用它们。到目前为止我想出的只是

  def pipeThroughInputStream(pipeThrough: InputStream => InputStream): Flow[ByteString, ByteString, NotUsed] = {
    val sink: Sink[ByteString, Source[ByteString, Future[IOResult]]] = StreamConverters.asInputStream().mapMaterializedValue { materializedInputStream =>
      val inputStream = pipeThrough(materializedInputStream)
      StreamConverters.fromInputStream(() => inputStream)
    }
    ???
  }

但我不知道现在如何将这个具体化为 Source 的 Sink 转换回 Flow。

scala akka-stream
1个回答
0
投票

你想要的是类似这样的东西:

def pipeThroughInputStream(pipeThrough: InputStream => InputStream)(implicit mat: Materializer): Flow[ByteString, ByteString, NotUsed] = {
  val sink = StreamConverters.asInputStream().mapMaterializedValue { materializedInputStream =>
    val inputStream = pipeThrough(materializedInputStream)
    StreamConverters.fromInputStream(() => inputStream)
  }
  val source = sink.preMaterialize()
  Flow.fromSinkAndSourceCoupled(sink, source)
}

预物化可以让我们找到正在物化的源头。 然后,我们使用

fromSinkAndSourceCoupled
创建一个流,该流使用接收器作为其接收器侧,使用源作为其源侧。

请注意,预物化将在流的其余部分实现之前(即当您为流构建“蓝图”时)完成所有

InputStream
接线。 如果这是不可取的,您可以通过将最后一对行替换为以下内容来变得更加晦涩:

Flow.lazyFlow { () =>
  Flow.fromSinkAndSourceCoupled(sink, sink.preMaterialize())
}

这将延迟进行

InputStream
布线,直到下游有需求为止。

© www.soinside.com 2019 - 2024. All rights reserved.