限制并发并等待所有承诺完成,即使有些承诺被拒绝

问题描述 投票:0回答:3

我的问题基本上是

的组合

我知道

Promise.allSettled
,但我未能找到限制并发的好方法。

到目前为止我所拥有的:

想法 1 使用

p-limit
:

const pLimit = require('p-limit');
const limit = pLimit(10);

let promises = files.map(pair => {
    var formData = {
        'file1': fs.createReadStream(pair[0]),
        'file2': fs.createReadStream(pair[1])
    };
        
    return limit(() => uploadForm(formData));
});
    
(async () => {
    const result = await Promise.allSettled(promises).then(body => {
        body.forEach(value => {
            if(value.status == "rejected")
                file.write(value.reason + '\n---\n');
        });
    });
})();

这个解决方案的问题是,我必须首先创建所有承诺,并为此为每个承诺打开两个文件流,并且我将达到打开文件的限制。

想法 2 使用

p-queue
: 我尝试使用生成器函数在
queue.on
'next'
事件中创建和添加新的承诺,但我无法让它正常工作,这可能不是适合这项工作的工具。

想法 3 使用 PromisePool: 一开始这看起来很有希望。其中一些支持生成器函数来为池创建承诺,但我找不到一个明确声明其行为类似于

Promise.allSettled

我实现了

es6-promise-pool
,却发现它会在第一个承诺拒绝后停止。

javascript node.js concurrency promise
3个回答
8
投票

自己实现它非常简单 - 创建一个函数数组,在调用时返回 Promise。然后实现一个限制器函数,该函数从该数组中获取函数并调用它们,完成后,再次递归调用限制器,直到数组为空:

const request = (file) => new Promise((res, rej) => {
  console.log('requesting', file);
  setTimeout(() => {
    if (Math.random() < 0.5) {
      console.log('resolving', file);
      res(file);
    } else {
      console.log('rejecting', file);
      rej(file);
    }
  }, 1000 + Math.random() * 1000);
});
const files = [1, 2, 3, 4, 5, 6];

const makeRequests = files.map(file => () => request(file));
const results = [];
let started = 0;
const recurse = () => {
  const i = started++;
  const makeRequest = makeRequests.shift();
  return !makeRequest ? null : Promise.allSettled([makeRequest()])
    .then(result => {
      results[i] = result[0];
      return recurse();
    })
};
const limit = 2;
Promise.all(Array.from({ length: limit }, recurse))
  .then(() => {
    console.log(results);
  });

如果结果的顺序并不重要,可以通过删除

started
i
变量来简化。


1
投票

接受的答案或多或少类似于

p-limit
。 您遇到了
p-limit
的问题,因为流是在限制回调之外声明的。

这可以解决你的问题:

let promises = files.map(pair => {  
    return limit(() => uploadForm({
        'file1': fs.createReadStream(pair[0]),
        'file2': fs.createReadStream(pair[1])
    }));
});

0
投票

这里有jcouyang的git的修改

支持进行中停止队列

function promiseAllStepN(n, list) {
  const head = list.slice(0, n);
  const tail = list.slice(n);
  const resolved = [];
  let stop = false;

  return {
    start: () =>
      new Promise((resolve) => {
        let processed = 0;
        function runNext() {
          if (processed === tail.length || stop) {
            resolve(Promise.all(resolved));
            return;
          }
          const promise = tail[processed](processed);
          resolved.push(
            promise.then((result) => {
              runNext();
              return result;
            })
          );
          processed++;
        }
        head.forEach((func) => {
          const promise = func(processed);
          resolved.push(
            promise.then((result) => {
              runNext();
              return result;
            })
          );
        });
      }),
    stop: () => {
      stop = true;
    },
  };
}

使用方法:

const { start, stop } = promiseAllStepN(10, promises);

// start run promises in pool
start();

// Stop before all promis finished
stop()
© www.soinside.com 2019 - 2024. All rights reserved.