我在nestjs后端使用bullmq实现了cron-job。 生产者代码:
import { Queue } from 'bullmq';
import { BullQueues, BullWorkers } from '../../../../../apps/api-server/src/shared/utils/constants';
import { CronJobDataSource } from '../data-source';
import { PlatformIntegration } from '../../../../../apps/api-server/src/modules/platform-integrations/entities/platform-integration.entity';
import { CalendarPlatforms } from '../../../../../apps/api-server/src/modules/platform-integrations/domain/calendar-platforms.enum';
/* eslint-disable @typescript-eslint/no-var-requires */
require('dotenv').config();
async function getUsersToSyncWithPlatform(platform: CalendarPlatforms) {
...
}
export const eventCronJob = async () => {
console.log('Event Cron Job Called');
try {
await CronJobDataSource.initialize();
// Initialize the BullMQ queue
const syncQueue = new Queue(BullQueues.SYNC_EVENTS, {
connection: { host: process.env.REDIS_HOSTNAME, port: Number(process.env.REDIS_PORT) },
});
// Function to enqueue jobs
const enqueueSyncJobs = async (
platform: CalendarPlatforms,
users: {
id: string;
account: string;
}[],
period: number,
) => {
for await (const user of users) {
await syncQueue.add(
BullWorkers.SYNC_EVENTS_FOR_PLATFORM,
{
platform,
userId: user.id,
account: user.account,
},
{ repeat: { every: period } },
);
}
};
// Enqueue jobs for Google Calendar
const usersToSyncGoogle = await getUsersToSyncWithPlatform(CalendarPlatforms.GOOGLE);
await enqueueSyncJobs(CalendarPlatforms.GOOGLE, usersToSyncGoogle, 20000);
process.exit();
} catch (error) {
console.error(error);
}
}
队列事件监听代码:
import { Process, Processor } from '@nestjs/bull';
import { Job } from 'bull';
@Processor(BullQueues.SYNC_EVENTS)
export class SyncEventsConsumer {
constructor() {}
@Process(BullWorkers.SYNC_EVENTS_FOR_PLATFORM)
async readOperationJob(job: Job<{ platform: CalendarPlatforms; userId: string; account: string }>) {
const {
data: { platform, userId, account },
} = job;
console.log('Cron Job: >>>>>>>>>>>', platform, userId, account);
}
BullModule 在
app.module.ts
中作为 Redis 连接,事件监听器服务在子 module.ts 中注册为提供者
但 cron-job 一次都不起作用。我需要你的帮助。谢谢。
生产者代码位于应用程序模块之外,并在 Nest-cli.json 中创建为库。 当我运行 Nest 后端时,我可以看到该登录生产者。
Event Cron Job Called
但我看不到登录队列事件侦听器。 Cron Job: >>>>>>>>>>> ...
我在生产者中使用了 bullmq
模块。我在事件列表器中使用了 @nestjs/bull
和 bull
模块。
我不确定为什么你需要一起使用 Bull 和 BullMQ,但是尝试使用 NestJS 和 BullMQ 设置可重复的作业,我最终遇到了你的问题。在应用程序模块中注册 BullMQ 连接并在单独的模块中注册队列后,我执行了以下操作:
import { InjectQueue, Processor, WorkerHost } from '@nestjs/bullmq';
import { Queue } from 'bullmq';
@Processor('example')
export class PhotoCleanupProcessor extends WorkerHost {
constructor(
@InjectQueue('example') private photoCleanupQueue: Queue,
private photoCleanupService: PhotoCleanupService,
) {
super();
this.setupRepeatableJob();
}
private setupRepeatableJob() {
console.log('Setting up repeatable job');
this.photoCleanupQueue.add('example-job', undefined, {
repeat: {
// run it every 10 seconds
pattern: '*/10 * * * * *',
},
});
}
async process(): Promise<void> {
console.log('Processing job');
}
}
这样,作业处理器及其处理程序。我可以运行服务的多个副本,并看到该作业每十秒仅由一个副本执行一次。我希望它有帮助。