我有一个异步生成器,可以执行以下操作:
async function * fetchPaths() {
const urls = await getUrls()
for (const url of urls) {
const paths = await fetch(url)
for (const path of paths) {
yield path
}
}
}
然后我像这样调用这个函数:
for await (const path of fetchPaths()) {
await submit(path)
}
我希望这两个异步函数
fetch
和 submit
彼此并行运行,但不会互相压倒。所以看起来 pipeline 是完美的选择 - 据我了解,它可以将 AsyncGenerators 作为参数,并处理缓冲和背压。它确实工作,但是当我像这样使用它时,它仍然会串行运行它们(例如等待fetch
然后等待所有对submit
的调用完成,然后再调用下一个fetch
...
const { pipeline } = require('node:stream/promises')
async function run() {
await pipeline(
fetchPaths(),
async function(source, { signal }) { // also not clear on what to do with `signal`
for await (const path of source) { // `source` is the fetchPaths generator?
await submit(source)
}
}
)
}
这有点出乎意料,因为在我看来,
pipeline
似乎只会继续调用迭代器的 next
方法,并且在目的地太慢时处理缓冲,并将其保持在水印之下。我通读了关于背压的 Node.js 学习模块,我理解了这个概念,但它对 pipeline
的细节以及在 pipeline
中使用 AsyncGenerators 的了解很少。我是否应该为生产者和消费者创建一个自定义的可读和可写类?我还漏掉了什么吗?
我发现这个存储库似乎表明,是的,您需要为
Readable
实现 pipeline
才能帮助数据流和背压:https://github.com/MattMorgis/async-stream -生成器/blob/master/index.js
这里没有详细介绍如何实现Writable那一半,但是既然实现了
Readable
接口,按理说也必须实现Writable
接口才能实现两个流之间的通信。