我正在尝试使用APScheduler测试错误执行的任务,但是重新启动APScheduler时看不到丢失的任务在运行。我已经将APScheduler配置如下:
scheduler.py
def configure_scheduler():
jobstores = {
'default': SQLAlchemyJobStore(url=config('DATABASE_URL'))
}
sched = BlockingScheduler()
sched.configure(jobstores=jobstores)
sched.add_job(
test_task,
id='test_task',
'interval',
hours=1,
coalesce=True,
max_instances=1,
misfire_grace_time=360,
replace_existing=True
)
return sched
if __name__ == '__main__':
scheduler = configure_scheduler()
scheduler.start()
[第一次启动调度程序时,test_task
被添加到Postgres数据库中的apscheduler_jobs
表中,并且从启动调度程序起的next_run_time
为一小时。然后,我尝试通过以下方法测试失火:
next_run_time
更改为当前时间当我按照此步骤进行操作时,next_run_time
再次设置为距当前时间的一小时。 next_run_time
似乎在SQLAlchemy作业存储的update_job
方法中进行了更新。我已经看到一个与持久性作业存储任务相关的update_job
在断火后未运行。我见过的大多数similar question other的解决方案是将questions参数添加到misfire_grace_time
。我已经按照上面的配置尝试过此操作,但是在调度程序启动时没有运气错过任何作业。我是否缺少与add_job
和replace_existing
参数交互方式有关的内容?我是否需要手动检查是否有任何作业的misfire_grace_time
过去,然后在启动调度程序之前运行这些作业?
我正在使用APScheduler库的v3.6.1。
[有关其他情况,我将在Heroku上部署调度程序,并且尝试解决每天至少发生一次的Heroku的next_run_time
。
[与AlexSchöler的创建者AlexGrönholm在APScheduler Gitter聊天室中的automatic dyno cycling之后,我能够确定该作业正在数据库中被“覆盖”,因为我对some discussion的呼叫中有replace_existing=True
]。这将导致调度程序每次启动时都替换作业存储中的作业。
我的解决方法
调度程序启动前,请检查add_job
表中的现有作业。
对于数据库中的每个作业,请对照当前时间检查apscheduler_jobs
。如果next_run_time
是过去的,请立即运行作业。
使用next_run_time
像以前一样安排作业。
启动调度程序。