在我的 Node-Express 应用程序中,我需要根据用户从 UI 上可用的日历中选择的开始时间-日期和结束时间-日期来安排作业(与 LinkedIn 个人资料连接)。该作业计划在选定的几批数据上执行,直到满足日期时间要求。
我正在使用 npm 包 Bull 来处理作业队列和调度,但它似乎不起作用。一旦设置计时器或从 UI 创建新作业,作业(用于连接 LinkedIn 个人资料)或简单的控制台(用于测试目的)就会立即执行,而不是在定义的开始时间执行。
这是我的代码:
const Queue = require("bull");
const mongoose = require("mongoose");
const schedulerJobs = require("../api/models/schedulerJobs");
var id = mongoose.Types.ObjectId();
var HTMLParser = require("node-html-parser");
var { fetchSendConnectionData } = require("./services");
var { setLeadName } = require("./utils");
module.exports = async function connectionScheduler(
userId,
cookieName,
leads,
start,
end,
campaign_name,
connection_message,
campaign_id
) {
var i = 0;
var parse_message = HTMLParser.parse(
connection_message
).structuredText.toString();
// 1. Initiating the Queue
const campaignQueue = new Queue("Campaign", {
redis: {
host: "127.0.0.1",
port: 6379,
},
});
const data = leads;
const options = {
attempts: 2,
delay: 5000,
repeat: {
cron: "*/2 * * * * *",
tz: "America/Los_Angeles",
startDate: start,
endDate: end,
},
};
// 2. Adding a Job to the Queue
campaignQueue.add(`${campaign_name}`, data, options);
// 3. Consumer
campaignQueue.process(async (job) => {
campaignQueue.close();
return await console.log("CRON started.....");
console.log(
`Connection Cron For ${campaign_name} Now Points To Index => ${i}`
);
fetchSendConnectionData({
name: cookieName,
profile_links: job[i].profileUrl,
message: setLeadName(parse_message, job[i].name),
})
.then(({ data: { message } }) => {
if (message === "Connection send") {
console.log(
`Its Been 5 Minutes And Connection Request Is Sent To => ${job[i].name}`
);
} else {
console.log(
`Connection Request To => ${job[i].name} Has Failed So Now We Move On To The Next One`
);
}
i = i + 1;
if (i === job.length) {
job.close();
}
})
.catch((err) => {
console.log(
`Connection Request To => ${job[i].name} Has Failed Due To => ${err.message}`
);
i = i + 1;
if (i === job.length) {
job.close();
}
});
});
// 4. Listener
campaignQueue.on("error", (err) => {
// console.log(`Job completed with result ${result}`);
});
campaignQueue.on("progress", function (job, progress) {
// A job's progress was updated!
});
campaignQueue.on("completed", (job, result) => {
// console.log("job completed", job);
//save job completed to database
const jobdetail = {
userId,
start,
end,
campaign_id,
campaign_name,
connection_message,
jobId: job.opts.jobId,
};
const schedulerjobs = new schedulerJobs(jobdetail);
schedulerjobs.save().then((scheduledjob) => console.log("Job saved to db"));
});
campaignQueue.on("failed", function (job, err) {
// A job failed with reason `err`!
});
};
服务器一启动,就会立即打印输出:
CRON started.....
Job saved to db
它不会等待
start
,只运行一次,不会一直运行到end
。
请帮忙解决这个问题
要每 5 分钟重复一次作业,您应该使用以下代码:
const myJob = await myqueue.add(
{ foo: 'bar' },
{
repeat: {
every: 5*60*1000,
}
}
);
可以使用bull的重复功能
queue.add({your_data:""},{repeat:{cron:"5 * * * *"}});
上面的代码将每 5 分钟运行一次作业。
https://optimalbits.github.io/bull/
请参阅上述文档的可重复部分。
代码上的错误已修复,请检查以下解决方案:
const Queue = require('bull');
module.exports = {
jobStatusCheckScheduler: async function () {
console.log('hi')
// 1. Initiating the Queue
const statusCheckQueue = new Queue("JobStatusCheck", {
redis: {
host: "127.0.0.1",
port: 6379,
},
});
const options = {
removeOnFail: true,
attempts: 3,
repeat: {
every: 10000,
limit: 5,
},
};
// 2. Adding function in the job
statusCheckQueue.process(async (job, callback) => {
try {
console.log('Processing job', job.id, job.data);
callback();
} catch (err) {
console.log(err);
}
}).then(() => {
console.log('suresh')
}).catch((err) => {
console.log(err)
})
// 3. Adding a Job to the Queue
await statusCheckQueue.add( {user: '1'}, options).then((job) => {
console.log('suresh first', job.id)
}).catch((err) => {
console.log(err)
})
// 4. Listener
statusCheckQueue.on("error", (err) => {
console.log(`Job error ${err}`);
});
statusCheckQueue.on("progress", function (job, progress) {
// A job's progress was updated!
});
statusCheckQueue.on("completed", (job, result) => {
console.log("Job completed", job.data);
});
statusCheckQueue.on("failed", function (job, err) {
// A job failed with reason `err`!
console.log(`Job not completed failed ${err}`);
});
}
};
您需要删除队列中或特定名称队列中的所有作业, 和我在一起:
async function Remove() {
await logRankingQueue.obliterate();
console.log("Clear done")
}
Remove()
return之后的任何代码行都将被忽略。 试试这个:
// return await console.log("CRON started.....");
await console.log("CRON started.....");
如果需要,在函数末尾放置一个空的return。