我有 NodeJS 应用程序,我可以在其中连接 WS 服务器,监听新消息 - 每条消息我都会做 aysnc 工作。
问题: 这些作业需要时间 - 我想一次运行一个作业并按照它们收到的顺序运行。
例如: 消息1 -> 作业(消息) // 1000ms 消息2 -> 作业2(消息2) // 5000ms messagesgae3 -> (在 job2 完成之前收到,但仅在 job2 完成之后执行) -> job2(message3)
所以我创建了这个类来模拟 NodeJS 应用程序的行为,并且在这段代码中,作业仍然没有按顺序运行,而且看起来它不会等待前一个作业完成(不确定)
class AsyncQueue {
constructor() {
this.active = false
this.queue = []
}
async doJobs() {
if (this.active) {
return
}
this.active = true
while (this.queue.length > 0) {
const fn = this.queue.shift() // take first
await fn()
}
this.active = false
}
push(fn) {
this.queue.push(fn) // push to last
this.doJobs()
}
}
class Utils {
static randInt(min, max) {
return Math.random() * (max - min) + min
}
static sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
}
class WsServer {
static async onMessage(func) {
while (true) {
await Utils.sleep(1000)
func()
}
}
}
async function job(name) {
console.log(`Doing job ${name}`)
await Utils.sleep(Utils.randInt(1000, 5000)) // Fake job takes time
console.log(`Done doing job ${name}`)
}
async function main() {
const queue = new AsyncQueue()
let counter = 0
WsServer.onMessage(() => {
counter += 1;
queue.push(() => job(counter))
})
}
main()
功能已按顺序处理。柜台让你感到困惑。问题是闭包,如 Let 与 var in a for 循环
中所述该函数被推入队列并通过闭包访问计数器。按下该函数后,计数器会递增,并且函数会打印递增的计数器。您可以通过以下代码看到它:
class AsyncQueue {
constructor() {
this.active = false
this.queue = []
}
async doJobs() {
if (this.active) {
return
}
this.active = true
while (this.queue.length > 0) {
const [counter, fn] = this.queue.shift() // take first
console.log(counter);
await fn()
}
this.active = false
}
push(fn) {
this.queue.push(fn) // push to last
this.doJobs()
}
}
class Utils {
static randInt(min, max) {
return Math.random() * (max - min) + min
}
static sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms))
}
}
class WsServer {
static async onMessage(func) {
while (true) {
await Utils.sleep(1000)
func()
}
}
}
async function job(name) {
console.log(`Doing job ${name}`)
await Utils.sleep(Utils.randInt(1000, 5000)) // Fake job takes time
console.log(`Done doing job ${name}`)
}
async function main() {
const queue = new AsyncQueue()
let counter = 0
WsServer.onMessage(() => {
counter += 1;
queue.push([counter, ((counter) => () => job(counter))(counter)])
})
}
main()