我正在寻找一个 Promise 函数包装器,它可以在给定的 Promise 运行时进行限制/节流,以便在给定的时间只运行一定数量的 Promise。
在下面的情况下,
delayPromise
永远不应该同时运行,它们应该按照先来先服务的顺序一次运行一个。
import Promise from 'bluebird'
function _delayPromise (seconds, str) {
console.log(str)
return Promise.delay(seconds)
}
let delayPromise = limitConcurrency(_delayPromise, 1)
async function a() {
await delayPromise(100, "a:a")
await delayPromise(100, "a:b")
await delayPromise(100, "a:c")
}
async function b() {
await delayPromise(100, "b:a")
await delayPromise(100, "b:b")
await delayPromise(100, "b:c")
}
a().then(() => console.log('done'))
b().then(() => console.log('done'))
关于如何设置这样的队列有什么想法吗?
我有一个来自美妙的
Benjamin Gruenbaum
的“去抖”功能。我需要修改它以根据它自己的执行而不是延迟来限制承诺。
export function promiseDebounce (fn, delay, count) {
let working = 0
let queue = []
function work () {
if ((queue.length === 0) || (working === count)) return
working++
Promise.delay(delay).tap(function () { working-- }).then(work)
var next = queue.shift()
next[2](fn.apply(next[0], next[1]))
}
return function debounced () {
var args = arguments
return new Promise(function (resolve) {
queue.push([this, args, resolve])
if (working < count) work()
}.bind(this))
}
}
我认为没有任何库可以做到这一点,但实际上自己实现非常简单:
function sequential(fn) { // limitConcurrency(fn, 1)
let q = Promise.resolve();
return function(x) {
const p = q.then(() => fn(x));
q = p.reflect();
return p;
};
}
对于多个并发请求,它会变得有点棘手,但也可以完成。
function limitConcurrency(fn, n) {
if (n == 1) return sequential(fn); // optimisation
let q = Promise.resolve();
const active = new Set();
const fst = t => t[0];
const snd = t => t[1];
return function(x) {
function put() {
const p = fn(x);
const a = p.reflect().then(() => {
active.delete(a);
});
active.add(a);
return [Promise.race(active), p];
}
if (active.size < n) {
const r = put()
q = fst(t);
return snd(t);
} else {
const r = q.then(put);
q = r.then(fst);
return r.then(snd)
}
};
}
顺便说一句,您可能想看看 actors 模型 和 CSP。他们可以简化处理此类事情,也有一些 JS 库。
示例
import Promise from 'bluebird'
function sequential(fn) {
var q = Promise.resolve();
return (...args) => {
const p = q.then(() => fn(...args))
q = p.reflect()
return p
}
}
async function _delayPromise (seconds, str) {
console.log(`${str} started`)
await Promise.delay(seconds)
console.log(`${str} ended`)
return str
}
let delayPromise = sequential(_delayPromise)
async function a() {
await delayPromise(100, "a:a")
await delayPromise(200, "a:b")
await delayPromise(300, "a:c")
}
async function b() {
await delayPromise(400, "b:a")
await delayPromise(500, "b:b")
await delayPromise(600, "b:c")
}
a().then(() => console.log('done'))
b().then(() => console.log('done'))
// --> with sequential()
// $ babel-node test/t.js
// a:a started
// a:a ended
// b:a started
// b:a ended
// a:b started
// a:b ended
// b:b started
// b:b ended
// a:c started
// a:c ended
// b:c started
// done
// b:c ended
// done
// --> without calling sequential()
// $ babel-node test/t.js
// a:a started
// b:a started
// a:a ended
// a:b started
// a:b ended
// a:c started
// b:a ended
// b:b started
// a:c ended
// done
// b:b ended
// b:c started
// b:c ended
// done
使用throttled-promise模块:
https://www.npmjs.com/package/throttled-promise
var ThrottledPromise = require('throttled-promise'),
promises = [
new ThrottledPromise(function(resolve, reject) { ... }),
new ThrottledPromise(function(resolve, reject) { ... }),
new ThrottledPromise(function(resolve, reject) { ... })
];
// Run promises, but only 2 parallel
ThrottledPromise.all(promises, 2)
.then( ... )
.catch( ... );
我也有同样的问题。我写了一个库来实现它。代码在这里。我创建了一个队列来保存所有的承诺。当您将一些 Promise 推送到队列时,队列头部的前几个 Promise 将被弹出并运行。一旦一个 Promise 完成,队列中的下一个 Promise 也会被弹出并运行。如此周而复始,直到队列中没有
Task
。您可以查看代码了解详细信息。希望这个图书馆能帮助你。
- 您可以定义并发承诺的数量(接近同时请求)
- 一致的流程:一旦一个承诺得到解决,另一个请求就会开始,无需猜测服务器能力
- 对数据阻塞具有鲁棒性,如果服务器暂时停止,它只会等待,并且不会开始下一个任务,因为 允许使用时钟
- 不要依赖第三方模块,它是 Vanila node.js
第一件事是让 https 成为一个承诺,这样我们就可以使用 wait 来检索数据(从示例中删除) 第二创建一个承诺调度程序,在任何承诺得到解决时提交另一个请求。 第三次打电话
const https = require('https')
function httpRequest(method, path, body = null) {
const reqOpt = {
method: method,
path: path,
hostname: 'dbase.ez-mn.net',
headers: {
"Content-Type": "application/json",
"Cache-Control": "no-cache"
}
}
if (method == 'GET') reqOpt.path = path + '&max=20000'
if (body) reqOpt.headers['Content-Length'] = Buffer.byteLength(body);
return new Promise((resolve, reject) => {
const clientRequest = https.request(reqOpt, incomingMessage => {
let response = {
statusCode: incomingMessage.statusCode,
headers: incomingMessage.headers,
body: []
};
let chunks = ""
incomingMessage.on('data', chunk => { chunks += chunk; });
incomingMessage.on('end', () => {
if (chunks) {
try {
response.body = JSON.parse(chunks);
} catch (error) {
reject(error)
}
}
console.log(response)
resolve(response);
});
});
clientRequest.on('error', error => { reject(error); });
if (body) { clientRequest.write(body) }
clientRequest.end();
});
}
const asyncLimit = (fn, n) => {
const pendingPromises = new Set();
return async function(...args) {
while (pendingPromises.size >= n) {
await Promise.race(pendingPromises);
}
const p = fn.apply(this, args);
const r = p.catch(() => {});
pendingPromises.add(r);
await r;
pendingPromises.delete(r);
return p;
};
};
// httpRequest is the function that we want to rate the amount of requests
// in this case, we set 8 requests running while not blocking other tasks (concurrency)
let ratedhttpRequest = asyncLimit(httpRequest, 8);
// this is our datase and caller
let process = async () => {
patchData=[
{path: '/rest/slots/80973975078587', body:{score:3}},
{path: '/rest/slots/809739750DFA95', body:{score:5}},
{path: '/rest/slots/AE0973750DFA96', body:{score:5}}]
for (let i = 0; i < patchData.length; i++) {
ratedhttpRequest('PATCH', patchData[i].path, patchData[i].body)
}
console.log('completed')
}
process()
我写了 @watchable/nevermore 来满足这个要求,并允许并发与速率限制、超时、退避等的组合。
您可以通过像这样的限制器功能来实现您的要求...
import { createExecutorStrategy } from "@watchable/nevermore";
const { createExecutor } = createExecutorStrategy({
"concurrency": 1,
})
const _delay = (ms) => new Promise((resolve) => setTimeout(resolve, ms));
const delay = createExecutor(_delay);
但是,您还可以更进一步,设置速率限制、重试等等...
const { createExecutor } = createExecutorStrategy({
concurrency: 1,
intervalMs: 100,
backoffMs: 1000,
timeoutMs: 3000,
retries: 3,
});
完整的API文档位于https://watchable.dev/api/modules/_watchable_nevermore.html并从npm安装https://www.npmjs.com/package/@watchable/nevermore
串行运行异步进程的经典方法是使用
async.js
并使用 async.series()
。如果您更喜欢基于 Promise 的代码,那么有一个 Promise 版本的 async.js
:async-q
使用
async-q
,您可以再次使用series
:
async.series([
function(){return delayPromise(100, "a:a")},
function(){return delayPromise(100, "a:b")},
function(){return delayPromise(100, "a:c")}
])
.then(function(){
console.log(done);
});
同时运行其中两个将同时运行
a
和 b
,但在每个过程中它们将是连续的:
// these two will run concurrently but each will run
// their array of functions sequentially:
async.series(a_array).then(()=>console.log('a done'));
async.series(b_array).then(()=>console.log('b done'));
如果你想在
b
之后运行 a
然后将其放入 .then()
:
async.series(a_array)
.then(()=>{
console.log('a done');
return async.series(b_array);
})
.then(()=>{
console.log('b done');
});
如果您不想按顺序运行每个进程,而是希望限制每个进程同时运行一定数量的进程,那么您可以使用
parallelLimit()
:
// Run two promises at a time:
async.parallelLimit(a_array,2)
.then(()=>console.log('done'));
阅读 async-q 文档:https://github.com/dbushong/async-q/blob/master/READJSME.md