来自具有阻塞操作的迭代器的 Akka 源代码

问题描述 投票:0回答:1

关于

Source.fromIterator
的 Akka 文档 (https://doc.akka.io/docs/akka/current/stream/operators/Source/fromIterator.html) 说:

如果迭代器执行阻塞操作,请确保在单独的调度程序上运行它。

我有一个迭代器,我想用它构造一个 Akka

Source
。但是,在
hasNext
返回的元素可用之前,
true
可能会返回
next
。如果发生这种情况,则
next
调用必须阻塞,等待元素。请注意,
hasNext
不能返回
false
,因为Akka会认为
Source
已完成并停止处理。

当然,Akka 不喜欢等待下一个元素。偶尔会弹出超时异常。那么,在这种情况下我该怎么办?在单独的调度程序上运行这样的迭代器意味着什么?如果我将代码包装在

next
中以返回
Future
(为此使用单独的调度程序),那么我会得到
Source[Future[T]]
而不是
Source[T]
,这会在下游产生问题,除非我以某种方式将其转换为
Future[Source[T]] 
.

有什么建议吗?

scala akka akka-stream alpakka
1个回答
0
投票

正如您所注意到的,如果您将迭代器转换为

Iterator[Future[T]]
,并在另一个线程上完成 future,则
Source.fromIterator
将是
Source[Future[T]]
。但是,通过附加
Source[Future[T]]
Source[T]
始终可以变成
mapAsync(identity)

Source.fromIterator(...)  // Source[Future[T]]
  .mapAsync(identity)     // Source[T]

mapAsync
不会阻塞流调度程序中的线程。

您还可以使用

ActorAttributes.dispatcher
withAttributes
设置在特定调度程序上运行的阶段:

Source.fromIterator(...).withAttributes(ActorAttributes.dispatcher("config.path.to.dispatcher"))

请注意,这将在源和下游之间引入异步边界(及其关联的缓冲区)。

© www.soinside.com 2019 - 2024. All rights reserved.