使用 bull 进行节点 cron 作业调度

问题描述 投票:0回答:5

在我的 Node-Express 应用程序中,我需要根据用户从 UI 上可用的日历中选择的开始时间-日期和结束时间-日期来安排作业(与 LinkedIn 个人资料连接)。该作业计划在选定的几批数据上执行,直到满足日期时间要求。

我正在使用 npm 包 Bull 来处理作业队列和调度,但它似乎不起作用。一旦设置计时器或从 UI 创建新作业,作业(用于连接 LinkedIn 个人资料)或简单的控制台(用于测试目的)就会立即执行,而不是在定义的开始时间执行。

这是我的代码:

const Queue = require("bull");
const mongoose = require("mongoose");
const schedulerJobs = require("../api/models/schedulerJobs");

var id = mongoose.Types.ObjectId();
var HTMLParser = require("node-html-parser");
var { fetchSendConnectionData } = require("./services");
var { setLeadName } = require("./utils");

module.exports = async function connectionScheduler(
  userId,
  cookieName,
  leads,
  start,
  end,
  campaign_name,
  connection_message,
  campaign_id
) {
  var i = 0;
  var parse_message = HTMLParser.parse(
    connection_message
  ).structuredText.toString();

  // 1. Initiating the Queue
  const campaignQueue = new Queue("Campaign", {
    redis: {
      host: "127.0.0.1",
      port: 6379,
    },
  });

  const data = leads;

  const options = {
    attempts: 2,
    delay: 5000,
    repeat: {
      cron: "*/2 * * * * *",
      tz: "America/Los_Angeles",
      startDate: start,
      endDate: end,
    },
  };

  // 2. Adding a Job to the Queue
  campaignQueue.add(`${campaign_name}`, data, options);

  // 3. Consumer
  campaignQueue.process(async (job) => {
    campaignQueue.close();

    return await console.log("CRON started.....");

    console.log(
      `Connection Cron For ${campaign_name} Now Points To Index => ${i}`
    );
    fetchSendConnectionData({
      name: cookieName,
      profile_links: job[i].profileUrl,
      message: setLeadName(parse_message, job[i].name),
    })
      .then(({ data: { message } }) => {
        if (message === "Connection send") {
          console.log(
            `Its Been 5 Minutes And Connection Request Is Sent To => ${job[i].name}`
          );
        } else {
          console.log(
            `Connection Request To => ${job[i].name} Has Failed So Now We Move On To The Next One`
          );
        }
        i = i + 1;
        if (i === job.length) {
          job.close();
        }
      })
      .catch((err) => {
        console.log(
          `Connection Request To => ${job[i].name} Has Failed Due To => ${err.message}`
        );
        i = i + 1;
        if (i === job.length) {
          job.close();
        }
      });
  });

  // 4. Listener
  campaignQueue.on("error", (err) => {
    // console.log(`Job completed with result ${result}`);
  });

  campaignQueue.on("progress", function (job, progress) {
    // A job's progress was updated!
  });

  campaignQueue.on("completed", (job, result) => {
    // console.log("job completed", job);

    //save job completed to database
    const jobdetail = {
      userId,
      start,
      end,
      campaign_id,
      campaign_name,
      connection_message,
      jobId: job.opts.jobId,
    };

    const schedulerjobs = new schedulerJobs(jobdetail);
    schedulerjobs.save().then((scheduledjob) => console.log("Job saved to db"));
  });

  campaignQueue.on("failed", function (job, err) {
    // A job failed with reason `err`!
  });
};

服务器一启动,就会立即打印输出:

CRON started.....
Job saved to db

它不会等待

start
,只运行一次,不会一直运行到
end

请帮忙解决这个问题

node.js mongoose cron schedule
5个回答
3
投票

要每 5 分钟重复一次作业,您应该使用以下代码:

const myJob = await myqueue.add(
  { foo: 'bar' },
  {
    repeat: {
      every: 5*60*1000,
    }
  }
);

1
投票

可以使用bull的重复功能

queue.add({your_data:""},{repeat:{cron:"5 * * * *"}});

上面的代码将每 5 分钟运行一次作业。

https://optimalbits.github.io/bull/

请参阅上述文档的可重复部分。


1
投票

代码上的错误已修复,请检查以下解决方案:

const Queue = require('bull');

module.exports = {

jobStatusCheckScheduler: async function () {
    console.log('hi')
    // 1. Initiating the Queue
    const statusCheckQueue = new Queue("JobStatusCheck", {
        redis: {
            host: "127.0.0.1",
            port: 6379,
        },
    });

    const options = {
        removeOnFail: true,
        attempts: 3,
        repeat: {
            every: 10000,
            limit: 5,
        },
    };

    // 2. Adding function in the job
    statusCheckQueue.process(async (job, callback) => {
        try {
            console.log('Processing job', job.id, job.data);
            callback();

        } catch (err) {
            console.log(err);
        }
    }).then(() => {
        console.log('suresh')
    }).catch((err) => {
        console.log(err)
    })

    // 3. Adding a Job to the Queue
    await statusCheckQueue.add( {user: '1'}, options).then((job) => {
        console.log('suresh first', job.id)
    }).catch((err) => {
        console.log(err)
    })

    // 4. Listener
    statusCheckQueue.on("error", (err) => {
        console.log(`Job error ${err}`);
    });

    statusCheckQueue.on("progress", function (job, progress) {
        // A job's progress was updated!
    });

    statusCheckQueue.on("completed", (job, result) => {
        console.log("Job completed", job.data);
    });

    statusCheckQueue.on("failed", function (job, err) {
        // A job failed with reason `err`!
        console.log(`Job not completed failed ${err}`);
    });
}

};


0
投票

您需要删除队列中或特定名称队列中的所有作业, 和我在一起:

async function Remove() {
    await logRankingQueue.obliterate();
     console.log("Clear done")
 }
 Remove()

-4
投票

return之后的任何代码行都将被忽略。 试试这个:

    // return await console.log("CRON started.....");
    await console.log("CRON started.....");

如果需要,在函数末尾放置一个空的return

© www.soinside.com 2019 - 2024. All rights reserved.