关于
Source.fromIterator
的 Akka 文档 (https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html) 说:
如果迭代器执行阻塞操作,请确保在单独的调度程序上运行它。
我有一个迭代器,我想用它构造一个 Akka
Source
。但是,在 hasNext
返回的元素可用之前,true
可能会返回 next
。如果发生这种情况,则 next
调用必须阻塞,等待元素。请注意,hasNext
不能返回false
,因为Akka会认为Source
已完成并停止处理。
当然,Akka 不喜欢等待下一个元素。偶尔会弹出超时异常。那么,在这种情况下我该怎么办?在单独的调度程序上运行这样的迭代器意味着什么?如果我将代码包装在
next
中以返回 Future
(为此使用单独的调度程序),那么我会得到 Source[Future[T]]
而不是 Source[T]
,这会在下游产生问题,除非我以某种方式将其转换为 Future[Source[T]]
.
有什么建议吗?
正如您所注意到的,如果您将迭代器转换为
Iterator[Future[T]]
,并在另一个线程上完成 future,则 Source.fromIterator
将是 Source[Future[T]]
。但是,通过附加 Source[Future[T]]
,Source[T]
始终可以变成 mapAsync(identity)
:
Source.fromIterator(...) // Source[Future[T]]
.mapAsync(identity) // Source[T]
mapAsync
不会阻塞流调度程序中的线程。
您还可以使用
ActorAttributes.dispatcher
和 withAttributes
设置在特定调度程序上运行的阶段:
Source.fromIterator(...).withAttributes(ActorAttributes.dispatcher("config.path.to.dispatcher"))
请注意,这将在源和下游之间引入异步边界(及其关联的缓冲区)。