为什么 Node.js 流管道串联运行我的 asyncGenerators?

问题描述 投票:0回答:1

我有一个异步生成器,可以执行以下操作:

async function * fetchPaths() {
  const urls = await getUrls()
  for (const url of urls) {
    const paths = await fetch(url)
    for (const path of paths) {
      yield path
    }
  }
}

然后我像这样调用这个函数:

  for await (const path of fetchPaths()) {
    await submit(path)
  }

我希望这两个异步函数

fetch
submit
彼此并行运行,但不会互相压倒。所以看起来 pipeline 是完美的选择 - 据我了解,它可以将 AsyncGenerators 作为参数,并处理缓冲和背压。它确实工作,但是当我像这样使用它时,它仍然会串行运行它们(例如等待
fetch
然后等待所有对
submit
的调用完成,然后再调用下一个
fetch
...

const { pipeline } = require('node:stream/promises')

async function run() {
  await pipeline(
    fetchPaths(),
    async function(source, { signal }) { // also not clear on what to do with `signal`
      for await (const path of source) { // `source` is the fetchPaths generator?
        await submit(source)
      }
    }
  )
}

这有点出乎意料,因为在我看来,

pipeline
似乎只会继续调用迭代器的
next
方法,并且在目的地太慢时处理缓冲,并将其保持在水印之下。我通读了关于背压的 Node.js 学习模块,我理解了这个概念,但它对
pipeline
的细节以及在
pipeline
中使用 AsyncGenerators 的了解很少。我是否应该为生产者和消费者创建一个自定义的可读和可写类?我还漏掉了什么吗?

node.js node-streams
1个回答
0
投票

我发现这个存储库似乎表明,是的,您需要为

Readable
实现
pipeline
才能帮助数据流和背压:https://github.com/MattMorgis/async-stream -生成器/blob/master/index.js

这里没有详细介绍如何实现Writable那一半,但是既然实现了

Readable
接口,按理说也必须实现
Writable
接口才能实现两个流之间的通信。

© www.soinside.com 2019 - 2024. All rights reserved.