我正在使用 BullMQ 将作业添加到队列中,设置为延迟 15 秒进行处理。所以,我正在做这样的事情:
// REDIS CONFIG
const redis = new Redis(<REDIS_CONFIG>, {
maxRetriesPerRequest: null,
})
// QUEUE DEFINITION
const myQueue = new Queue(<QUEUE_NAME>, {
defaultJobOptions: { removeOnComplete: true },
connection: redis,
})
// QUEUE HANDLER
const queueHandler = async (job: Job) => {
const { id } = job.data
console.log(`Processing job ${id} after delay:`, new Date())
}
// WORKER DEFINITION
const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, { concurrency: 1, connection: redis })
// ON COMPLETE EVENT
sendWorker.on('completed', async (job: Job) => {
// SOME CODE HERE
})
// ON FAIL EVENT
sendWorker.on('failed', async (job: any, error: Error) => {
// SOME CODE HERE
})
在另一个文件中我正在做这样的事情:
const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
for (const object of some_objects):
console.log('Adding job to queue:', object, ' at: ', new Date())
myQueue.add(<JOB_NAME>, {id: object.id}, {delay: 15000}).catch(console.error)
我使用 “延迟” 选项在将作业添加到队列时设置延迟,如此处所述。然而,它并没有按预期工作。延迟仅适用于第一个作业,后续作业将连续处理,无延迟。控制台显示以下输出:
将作业添加到队列:{ id: '1' } 时间:2024-09-02T15:08:11.421Z
将作业添加到队列:{ id: '2' } 时间:2024-09-02T15:08:11.426Z
将作业添加到队列:{ id: '3' } 时间:2024-09-02T15:08:11.426Z
延迟后处理作业 1:2024-09-02T15:08:26.488Z // 注意:已经过去了 15 秒
延迟后处理作业 2:2024-09-02T15:08:27.129Z // 预期:处理前延迟 15 秒
延迟后处理作业 3:2024-09-02T15:08:27.425Z // 预期:处理前延迟 15 秒
到目前为止我尝试了什么:
1- 在队列默认作业选项上设置 “延迟” 选项:
const myQueue = new Queue(<QUEUE_NAME>, {
defaultJobOptions: { removeOnComplete: true, delay: 15000 },
connection: redis,
})
结果: 未观察到任何变化
2- 使用“foreach”而不是“forof”将作业添加到队列:
const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
some_objects.forEach((object) => {
console.log('Adding job to queue:', object, ' at: ', new Date())
myQueue.add(<JOB_NAME>, {id: object.id}, {delay: 15000})
}).catch(console.error)
结果: 未观察到任何变化
3- 逐一添加职位:
const some_objects = [{ id: '1' }, { id: '2' }, { id: '3' }]
myQueue.add('<JOB_NAME>', { id: some_objects[0].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[1].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[2].id }, { delay: 15000 })
结果: 未观察到任何变化
4- 添加到队列时使用不同的作业名称:
const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
for (const object of some_objects):
console.log('Adding job to queue:', object, ' at: ', new Date())
myQueue.add(`object_${object.id}`, {id: object.id}, {delay: 15000}).catch(console.error)
结果: 未观察到任何变化
5-检查 Redis 上的队列:
KEYS *
redis-cli 命令显示此
- “牛:
:3” - “牛:
:1” - “牛:
:id” - “牛:
:事件” - “牛:
:元” - “公牛:
:延迟” - “牛:
:2”
接下来,我执行命令
ZRANGE bull:<queue_name>:delayed 0 -1
列出排序集 bull:<queue_name>:delayed
的所有成员,我得到:
- “1”
- “2”
- “3”
结果: 作业正确存储在 bull:
只有一名工作人员处于活动状态,并发数为 1(请参阅工作人员定义)。
所以,最近我在深入研究 BullMQ 文档时发现了 this。事实证明,我想要的功能无法通过延迟配置来实现,而是通过速率限制来实现。在我的工作程序中,我只是添加了限制器配置,并且实现了我正在寻找的行为:处理作业之间有 15 秒的延迟。
const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, {
concurrency: 1,
connection: redis,
limiter: {
max: 1,
duration: 15000,
},
})
据我现在的理解,
delay
选项将所有有延迟的作业添加到一个集合中,一旦延迟时间过去,工作人员就会连续处理整个集合。但是,使用 limiter
选项,工作人员会在处理每个作业之间等待定义的持续时间。
我希望这个答案对某人有所帮助!如果是的话,请给我点赞。 😁