自定义处理Akka Streams流中将来的故障

问题描述 投票:-1回答:1

我尝试在Akka流中连接多个流,并根据流以不同的方式处理它们的错误。可以使用诸如此类的东西来完成:

Flow[String, Either[ProcessingError, String], NotUsed]

然后根据任何一个值将响应转移到错误处理程序。

我的问题是,某些流程返回Future [String]而不是String,我不知道如何评估它以便能够在每个Flow之后捕获错误并以自定义方式处理它。

stream akka future flow
1个回答
0
投票

要在不使流失败的情况下将Future转换为Either,可以使用

.mapAsync(1){ e => 
  val f: Future[T] = ...
  f.transformWith(_.toEither)
}

mapAsyncmapAsyncUnordered是评估Akka Streams中期货的惯用方法。请注意,那些失败的将来将使流失败,要处理流中的错误,您需要对将来进行“立即”响应以将其转换为TryEither

© www.soinside.com 2019 - 2024. All rights reserved.