js 中的异步有序队列执行器

问题描述 投票:0回答:1

我有 NodeJS 应用程序,我可以在其中连接 WS 服务器,监听新消息 - 每条消息我都会做 aysnc 工作。

问题: 这些作业需要时间 - 我想一次运行一个作业并按照它们收到的顺序运行。

例如: 消息1 -> 作业(消息) // 1000ms 消息2 -> 作业2(消息2) // 5000ms messagesgae3 -> (在 job2 完成之前收到,但仅在 job2 完成之后执行) -> job2(message3)

所以我创建了这个类来模拟 NodeJS 应用程序的行为,并且在这段代码中,作业仍然没有按顺序运行,而且看起来它不会等待前一个作业完成(不确定)

class AsyncQueue {
    constructor() {
        this.active = false
        this.queue = []
    }

    async doJobs() {
        if (this.active) {
            return
        }

        this.active = true
        while (this.queue.length > 0) {
            const fn = this.queue.shift() // take first
            await fn()
        }
        this.active = false
    }

    push(fn) {
        this.queue.push(fn) // push to last
        this.doJobs()
    }
}

class Utils {
    static randInt(min, max) {
        return Math.random() * (max - min) + min
    } 
    
    static sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms))
    }
}


class WsServer {
    static async onMessage(func) {
        while (true) {
            await Utils.sleep(1000)
            func()
        }
    }
}


async function job(name) {
    console.log(`Doing job ${name}`)
    await Utils.sleep(Utils.randInt(1000, 5000)) // Fake job takes time
    console.log(`Done doing job ${name}`)
}

async function main() {
    const queue = new AsyncQueue()
    let counter = 0
    WsServer.onMessage(() => {
        counter += 1;
        queue.push(() => job(counter))
    })
}

main()
javascript asynchronous async-await queue
1个回答
0
投票

功能已按顺序处理。柜台让你感到困惑。问题是闭包,如 Let 与 var in a for 循环

中所述

该函数被推入队列并通过闭包访问计数器。按下该函数后,计数器会递增,并且函数会打印递增的计数器。您可以通过以下代码看到它:

class AsyncQueue {
    constructor() {
        this.active = false
        this.queue = []
    }

    async doJobs() {
        if (this.active) {
            return
        }

        this.active = true
        while (this.queue.length > 0) {
            const [counter, fn] = this.queue.shift() // take first
            console.log(counter);
            await fn()
        }
        this.active = false
    }

    push(fn) {
        this.queue.push(fn) // push to last
        this.doJobs()
    }
}

class Utils {
    static randInt(min, max) {
        return Math.random() * (max - min) + min
    } 
    
    static sleep(ms) {
        return new Promise(resolve => setTimeout(resolve, ms))
    }
}


class WsServer {
    static async onMessage(func) {
        while (true) {
            await Utils.sleep(1000)
            func()
        }
    }
}


async function job(name) {
    console.log(`Doing job ${name}`)
    await Utils.sleep(Utils.randInt(1000, 5000)) // Fake job takes time
    console.log(`Done doing job ${name}`)
}

async function main() {
    const queue = new AsyncQueue()
    let counter = 0
    WsServer.onMessage(() => {
        counter += 1;
        queue.push([counter, ((counter) => () => job(counter))(counter)])
    })
}

main()
© www.soinside.com 2019 - 2024. All rights reserved.