假设我有一个代表任务的模型:
class Task(models.Model):
on_status = models.BooleanField()
def run(self):
if self.on_status:
# do stuff
我使用 Celery Beat 运行此任务,并且在 Gunicorn 上运行一个仪表板,两者都使用相同的数据库。
app = Celery("app")
app.conf.beat_schedule = {
"run_task": {
"task": "run_tasks",
"schedule": 5.0,
"options": {
"expires": 6.0,
},
},
}
tasks = Task.objects.all()
@app.task
def run_tasks():
for task in tasks:
task.run()
我可以从仪表板更改
on_status
,但随后我需要更新 Celery Beat 进程内实例的 self.on_status
。是否有从数据库更新属性值的命令或者是否有不同的方法?
无论你如何使用 celery-beat,核心问题似乎是如何刷新 django 模型对象。这是通过
refresh_from_db
完成的:https://docs.djangoproject.com/en/5.1/ref/models/instances/#refreshing-objects-from-database