BullMQ 中的延迟作业处理问题

问题描述 投票:0回答:1

我正在使用 BullMQ 将作业添加到队列中,设置为延迟 15 秒进行处理。所以,我正在做这样的事情:


// REDIS CONFIG
const redis = new Redis(<REDIS_CONFIG>, {
    maxRetriesPerRequest: null,
})

// QUEUE DEFINITION
const myQueue = new Queue(<QUEUE_NAME>, {
    defaultJobOptions: { removeOnComplete: true },
    connection: redis,
})

// QUEUE HANDLER
const queueHandler = async (job: Job) => {
    const { id } = job.data
    console.log(`Processing job ${id} after delay:`, new Date())
}

// WORKER DEFINITION
const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, { concurrency: 1, connection: redis })

// ON COMPLETE EVENT
sendWorker.on('completed', async (job: Job) => {
    // SOME CODE HERE
})

// ON FAIL EVENT
sendWorker.on('failed', async (job: any, error: Error) => {
    // SOME CODE HERE
})

在另一个文件中我正在做这样的事情:

const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
for (const object of some_objects):
    console.log('Adding job to queue:', object, ' at: ', new Date())
    myQueue.add(<JOB_NAME>, {id: object.id}, {delay: 15000}).catch(console.error)

我使用 “延迟” 选项在将作业添加到队列时设置延迟,如此处所述。然而,它并没有按预期工作。延迟仅适用于第一个作业,后续作业将连续处理,无延迟。控制台显示以下输出:

将作业添加到队列:{ id: '1' } 时间:2024-09-02T15:08:11.421Z

将作业添加到队列:{ id: '2' } 时间:2024-09-02T15:08:11.426Z

将作业添加到队列:{ id: '3' } 时间:2024-09-02T15:08:11.426Z

延迟后处理作业 1:2024-09-02T15:08:26.488Z // 注意:已经过去了 15 秒

延迟后处理作业 2:2024-09-02T15:08:27.129Z // 预期:处理前延迟 15 秒

延迟后处理作业 3:2024-09-02T15:08:27.425Z // 预期:处理前延迟 15 秒

到目前为止我尝试了什么:

1- 在队列默认作业选项上设置 “延迟” 选项:

const myQueue = new Queue(<QUEUE_NAME>, {
    defaultJobOptions: { removeOnComplete: true, delay: 15000 },
    connection: redis,
})

结果: 未观察到任何变化

2- 使用“foreach”而不是“forof”将作业添加到队列:

const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
some_objects.forEach((object) => {
    console.log('Adding job to queue:', object, ' at: ', new Date())
    myQueue.add(<JOB_NAME>, {id: object.id}, {delay: 15000})
}).catch(console.error)

结果: 未观察到任何变化

3- 逐一添加职位:

const some_objects = [{ id: '1' }, { id: '2' }, { id: '3' }]
myQueue.add('<JOB_NAME>', { id: some_objects[0].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[1].id }, { delay: 15000 })
myQueue.add('<JOB_NAME>', { id: some_objects[2].id }, { delay: 15000 })

结果: 未观察到任何变化

4- 添加到队列时使用不同的作业名称:

const some_objects = [{id: '1'}, {id: '2'}, {id: '3'}]
for (const object of some_objects):
    console.log('Adding job to queue:', object, ' at: ', new Date())
    myQueue.add(`object_${object.id}`, {id: object.id}, {delay: 15000}).catch(console.error)

结果: 未观察到任何变化

5-检查 Redis 上的队列:

KEYS *
redis-cli 命令显示此

  1. “牛::3”
  2. “牛::1”
  3. “牛::id”
  4. “牛::事件”
  5. “牛::元”
  6. “公牛::延迟”
  7. “牛::2”

接下来,我执行命令

ZRANGE bull:<queue_name>:delayed 0 -1
列出排序集
bull:<queue_name>:delayed
的所有成员,我得到:

  1. “1”
  2. “2”
  3. “3”

结果: 作业正确存储在 bull::delayed 集合中,直到指定的延迟时间过去,此时应将它们移至主队列进行处理。然而,延迟的作业并未按预期得到处理。

只有一名工作人员处于活动状态,并发数为 1(请参阅工作人员定义)。

node.js typescript delay fastify bullmq
1个回答
0
投票

所以,最近我在深入研究 BullMQ 文档时发现了 this。事实证明,我想要的功能无法通过延迟配置来实现,而是通过速率限制来实现。在我的工作程序中,我只是添加了限制器配置,并且实现了我正在寻找的行为:处理作业之间有 15 秒的延迟。

const sendWorker = new Worker(<QUEUE_NAME>, queueHandler, {
    concurrency: 1,
    connection: redis,
    limiter: {
        max: 1,
        duration: 15000,
    },
})

据我现在的理解,

delay
选项将所有有延迟的作业添加到一个集合中,一旦延迟时间过去,工作人员就会连续处理整个集合。但是,使用
limiter
选项,工作人员会在处理每个作业之间等待定义的持续时间。

我希望这个答案对某人有所帮助!如果是的话,请给我点赞。 😁

最新问题
© www.soinside.com 2019 - 2025. All rights reserved.