我尝试在Akka流中连接多个流,并根据流以不同的方式处理它们的错误。可以使用诸如此类的东西来完成:
Flow[String, Either[ProcessingError, String], NotUsed]
然后根据任何一个值将响应转移到错误处理程序。
我的问题是,某些流程返回Future [String]而不是String,我不知道如何评估它以便能够在每个Flow之后捕获错误并以自定义方式处理它。
要在不使流失败的情况下将Future
转换为Either
,可以使用
.mapAsync(1){ e =>
val f: Future[T] = ...
f.transformWith(_.toEither)
}
mapAsync
和mapAsyncUnordered
是评估Akka Streams中期货的惯用方法。请注意,那些失败的将来将使流失败,要处理流中的错误,您需要对将来进行“立即”响应以将其转换为Try
或Either
。