如果我有一个定义如下的函数:
def add(x,y):
return x+y
有没有办法动态添加这个函数作为芹菜PeriodicTask并在运行时启动它?我希望能够做类似(伪代码)的事情:
some_unique_task_id = celery.beat.schedule_task(add, run_every=crontab(minute="*/30"))
celery.beat.start(some_unique_task_id)
我还想用类似(伪代码)的东西动态地停止或删除该任务:
celery.beat.remove_task(some_unique_task_id)
要么
celery.beat.stop(some_unique_task_id)
仅供参考我没有使用djcelery,它允许您通过django管理员管理定期任务。
不,我很抱歉,这对于常规的芹菜不可能。
但它很容易扩展到你想要的,例如django-celery调度程序只是一个子类,用于读取和写入数据库的计划(顶部有一些优化)。
即使对于非Django项目,您也可以使用django-celery调度程序。
像这样的东西:
DATABASES = {
'default': {
'NAME': 'celerybeat.db',
'ENGINE': 'django.db.backends.sqlite3',
},
}
INSTALLED_APPS = ('djcelery', )
$ PYTHONPATH=. django-admin.py syncdb --settings=celeryconfig
$ PYTHONPATH=. django-admin.py celerybeat --settings=celeryconfig \
-S djcelery.schedulers.DatabaseScheduler
还有djcelerymon
命令可用于非Django项目在同一过程中启动celerycam和Django Admin网络服务器,您可以使用它来编辑一个漂亮的Web界面中的周期性任务:
$ djcelerymon
(注意由于某种原因djcelerymon无法使用Ctrl + C停止,你必须使用Ctrl + Z + kill%1)
google groups回答了这个问题。
我不是作者,所有功劳归功于Jean Mark
这是一个适当的解决方案。确认工作,在我的场景中,我分类了Periodic Task并创建了一个模型,因为我可以根据需要将其他字段添加到模型中,因此我可以添加“terminate”方法。您必须将定期任务的enabled属性设置为False并在删除之前保存它。整个子类不是必须的,schedule_every方法是真正完成工作的方法。当您准备终止任务时(如果您没有将其子类化),您可以使用PeriodicTask.objects.filter(name = ...)来搜索您的任务,禁用它,然后将其删除。
希望这可以帮助!
from djcelery.models import PeriodicTask, IntervalSchedule from datetime import datetime class TaskScheduler(models.Model): periodic_task = models.ForeignKey(PeriodicTask) @staticmethod def schedule_every(task_name, period, every, args=None, kwargs=None): """ schedules a task by name every "every" "period". So an example call would be: TaskScheduler('mycustomtask', 'seconds', 30, [1,2,3]) that would schedule your custom task to run every 30 seconds with the arguments 1,2 and 3 passed to the actual task. """ permissible_periods = ['days', 'hours', 'minutes', 'seconds'] if period not in permissible_periods: raise Exception('Invalid period specified') # create the periodic task and the interval ptask_name = "%s_%s" % (task_name, datetime.datetime.now()) # create some name for the period task interval_schedules = IntervalSchedule.objects.filter(period=period, every=every) if interval_schedules: # just check if interval schedules exist like that already and reuse em interval_schedule = interval_schedules[0] else: # create a brand new interval schedule interval_schedule = IntervalSchedule() interval_schedule.every = every # should check to make sure this is a positive int interval_schedule.period = period interval_schedule.save() ptask = PeriodicTask(name=ptask_name, task=task_name, interval=interval_schedule) if args: ptask.args = args if kwargs: ptask.kwargs = kwargs ptask.save() return TaskScheduler.objects.create(periodic_task=ptask) def stop(self): """pauses the task""" ptask = self.periodic_task ptask.enabled = False ptask.save() def start(self): """starts the task""" ptask = self.periodic_task ptask.enabled = True ptask.save() def terminate(self): self.stop() ptask = self.periodic_task self.delete() ptask.delete()
有一个名为django-celery-beat的库,它提供了所需的模型。要使其动态加载新的定期任务,必须创建自己的调度程序。
from django_celery_beat.schedulers import DatabaseScheduler
class AutoUpdateScheduler(DatabaseScheduler):
def tick(self, *args, **kwargs):
if self.schedule_changed():
print('resetting heap')
self.sync()
self._heap = None
new_schedule = self.all_as_schedule()
if new_schedule:
to_add = new_schedule.keys() - self.schedule.keys()
to_remove = self.schedule.keys() - new_schedule.keys()
for key in to_add:
self.schedule[key] = new_schedule[key]
for key in to_remove:
del self.schedule[key]
super(AutoUpdateScheduler, self).tick(*args, **kwargs)
@property
def schedule(self):
if not self._initial_read and not self._schedule:
self._initial_read = True
self._schedule = self.all_as_schedule()
return self._schedule
你可以查看这个配置flask和djcelery的flask-djcelery,还提供可浏览的rest api
最终通过包含在芹菜v4.1.0中的a fix实现了这一点。现在,您只需要更改数据库后端中的计划条目,celery-beat将根据新计划执行操作。
文档vaguely describe这是如何工作的。 celery-beat的默认调度程序PersistentScheduler
使用shelve file作为其计划数据库。 beat_schedule
实例中对PersistentScheduler
字典的任何更改都与此数据库同步(默认情况下,每3分钟一次),反之亦然。文档使用how to add new entries将beat_schedule
描述为app.add_periodic_task
。要修改现有条目,只需添加具有相同name
的新条目。像在字典中一样删除条目:del app.conf.beat_schedule['name']
。
假设您想使用外部应用程序监控和修改您的芹菜节拍时间表。那你有几个选择:
open
搁置数据库文件并像字典一样读取其内容。回写此文件以进行修改。