我是 BullMQ 的新手,我一直在尝试实现“流”功能,但是我在
syncVotesTableAndExtractDataFlow
类型的变量中列出的引用作业未运行(此代码位于 FlowProducer
类中) .我在控制器类中调用方法 FilterVoteDataProducer
,然后在我的
syncVotesTableAndExtractData
类中调用 syncVotesTableAndExtractData
(同名)。我注意到只有 FilterVoteDataProducer
属性会在两个作业中运行,但顺序不正确,首先运行父级的数据,然后运行子级,根据文档,应该是相反的。
在我的 2 个消费者类(data
和
FilterVoteDataConsumer
)中,我在 SyncVoteTableConsumer
方法中创建了一个 switch case,该方法从 process
类扩展,专门等待分别属于该队列的作业名称,但它们是从来没有跑过。我的问题是如何正确设置流程功能以便可以运行上述开关案例中的代码?目前,如果该作业不会从其所属队列的 process 方法调用,则在将作业添加到流程功能时,我不认为列出queueName 属性有什么意义。
这是我运行程序时的输出:
WorkerHost
<- first the parent is executedParent job executes LAST
这是我的代码:<- then the child is executed
Child job executes FIRST
班级
FilterVoteDataProducer
import { InjectFlowProducer } from '@nestjs/bullmq';
import { FlowProducer } from 'bullmq';
import { Injectable } from '@nestjs/common';
import {
SYNC_FILTER_VOTES_DATA_QUEUE,
SYNC_VOTES_TABLE_QUEUE,
FILTER_VOTES_DATA_JOB,
SYNC_VOTES_TABLE_AND_EXTRACT_DATA_FLOW,
SYNC_VOTES_TABLE_JOB,
} from '../common/constants.js';
@Injectable()
export class FilterVoteDataProducer {
constructor(
@InjectFlowProducer(SYNC_VOTES_TABLE_AND_EXTRACT_DATA_FLOW)
private readonly syncVotesTableAndExtractDataFlow: FlowProducer,
) {}
async syncVotesTableAndExtractData() {
return await this.syncVotesTableAndExtractDataFlow.add({
name: SYNC_VOTES_TABLE_JOB,
queueName: SYNC_VOTES_TABLE_QUEUE,
data: console.log(`Parent job executes LAST`),
children: [
{
name: FILTER_VOTES_DATA_JOB,
queueName: SYNC_FILTER_VOTES_DATA_QUEUE,
data: console.log('Child job executes FIRST'),
},
],
});
}
}
班级
SyncVoteTableConsumer
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import {
SYNC_VOTES_TABLE_QUEUE,
SYNC_VOTES_TABLE_JOB,
} from '../common/constants.js';
@Processor(SYNC_VOTES_TABLE_QUEUE, { lockDuration: 600000 })
export class SyncVoteTableConsumer extends WorkerHost {
constructor() {
super();
}
private readonly logger = new Logger(SyncVoteTableConsumer.name);
async process(job: Job<any>, token: string | undefined): Promise<void> {
switch (job.name) {
case SYNC_VOTES_TABLE_JOB:
{
this.logger.verbose(
`Attempting to sync votes table, job id: ${job.id}`,
);
const result = await this.syncVotesTable();
return result;
}
break;
default:
throw new Error(`Process ${job.name} not implemented`);
}
}
private async syncVotesTable() {
console.log(`Table is synced`);
}
}
班级
FilterVoteDataConsumer
import { Processor, WorkerHost } from '@nestjs/bullmq';
import { Logger } from '@nestjs/common';
import { Job } from 'bullmq';
import {
SYNC_FILTER_VOTES_DATA_QUEUE,
FILTER_VOTES_DATA_JOB,
} from '../common/constants.js';
@Processor(SYNC_FILTER_VOTES_DATA_QUEUE, { lockDuration: 600000 })
export class FilterVoteDataConsumer extends WorkerHost {
constructor() {
super();
}
private readonly logger = new Logger(FilterVoteDataConsumer.name);
async process(job: Job<any>, token: string | undefined): Promise<void> {
switch (job.name) {
case FILTER_VOTES_DATA_JOB:
{
this.logger.verbose(`Getting specific votes data ${job.id}`);
this.getSpecVoteData();
}
break;
default:
throw new Error(`Process ${job.name} not implemented`);
}
}
private async getSpecVoteData() {
return this.logger.debug(`Votes data obtained`);
}
}
班级
VoteController
import { Controller, Get, Post } from '@nestjs/common';
import { Cron, CronExpression } from '@nestjs/schedule';
import { FilterVoteDataProducer } from '../filterVoteData.producer.js';
@Controller('vote-controller')
export class VoteController {
constructor(private readonly producer: FilterVoteDataProducer) {}
@Cron(CronExpression.EVERY_5_SECONDS)
@Post('vote')
async syncVotesTable() {
await this.producer.syncVotesTableAndExtractData();
}
}
分配给第一个父作业数据字段,然后分配给子作业数据字段时,会发生这种情况。
您的作业似乎尚未添加或尚未运行。确保将您的队列、流生产者和处理器注册为提供者,请参阅官方
文档中的详细信息 如果使用默认键前缀,您可以通过运行
console.log()
查询在
redis-cli
中查找作业来检查作业是否已添加到队列中。