我想使用 celery 的配置/API 创建以下流程:
可能吗?怎么办?
您可以通过某种记忆化(memoization)手段让您的任务感知其他任务的存在。如果您使用一个缓存控制键(redis、memcached、/tmp,任何方便的都可以),就可以让任务的执行取决于该键。我以 redis 为例。
from redis import Redis
@app.task
def run_only_one_instance(params):
    """Run ``perform_task`` only if no other instance currently holds the sentinel.

    A Redis counter is used as the sentinel: every invocation increments it,
    and only the invocation that observes the value ``1`` performs the work.
    Each invocation decrements the counter exactly once when it finishes,
    whether it ran the work, skipped it as a duplicate, or failed.

    Errors raised by ``perform_task`` are logged and suppressed, matching the
    original best-effort behaviour; re-raise inside the ``except`` block if
    the caller should see the failure.

    :param params: task parameters (not used by this body).
    """
    import logging  # local import: the file-level import block is not visible here

    sentinel_key = "run_only_one_instance_sentinel"
    redis_conn = Redis()  # one client reused for both INCR and DECR

    # Increment outside the try block: if INCR itself fails there is nothing
    # to undo, so the counter must not be decremented.
    sentinel = redis_conn.incr(sentinel_key)
    try:
        if sentinel == 1:
            # I am the legitimate running task
            perform_task()
        # else: a duplicate invocation -- add duplicate handling here if needed
    except Exception:
        # potentially report the error to Sentry as well
        logging.getLogger(__name__).exception(
            "run_only_one_instance failed while holding the sentinel"
        )
    finally:
        # Always release our increment so that later tasks can run.
        redis_conn.decr(sentinel_key)
我的答案假设已向任务装饰器传递了 bind=True,这样我们就可以通过 self.request.id 获取任务的 uuid。
from redis import Redis
@app.task(bind=True)
def your_task(self, *args, **kwargs):
    """Run the job once per task id, rejecting duplicate deliveries.

    The task id (``self.request.id``, available because the task is bound) is
    stored in Redis as a marker key that expires after one day. The marker is
    written with a single ``SET ... NX EX`` command, so checking for and
    claiming the id happen atomically; two concurrent deliveries of the same
    task id cannot both pass the check.

    :raises Exception: ``'Duplicate job'`` when the task id was already claimed.
    """
    task_id = self.request.id

    # ``encoding`` replaces the deprecated ``charset`` keyword of redis-py.
    redis_conn = Redis.from_url(redis_url, encoding="utf-8", decode_responses=True)

    # expire in 1 day
    ex_1_day = 24 * 60 * 60

    # nx=True: set only if the key does not exist yet. A falsy result means
    # another execution already claimed this task id.
    if not redis_conn.set(task_id, 1, ex=ex_1_day, nx=True):
        logger.info(f"{task_id} is a duplicate job")
        raise Exception('Duplicate job')  # Handle this accordingly

    # From here run your job now
    # ...
这样,它将确保您的作业永远不会并行运行,也不会重复顺序运行
我不知道它会比其他答案对你有更多帮助,但我的方法是遵循 srj 给出的相同想法。我需要一种方法来阻止我的服务器启动具有相同 id 的任务来排队。所以我做了一个通用函数来帮助我。
def is_task_active_or_registered(app, task_id):
    """Return True if *task_id* is reported as active or scheduled by any worker.

    Queries ``app.control.inspect()`` for the ``active()`` and ``scheduled()``
    task lists and looks for an entry whose id equals *task_id*.

    :param app: Celery application object (anything exposing
        ``control.inspect()``).
    :param task_id: id of the task to look for.
    :returns: ``True`` when a matching task is found, ``False`` otherwise,
        including when no worker replied.
    """
    inspector = app.control.inspect()

    for tasks_by_worker in (inspector.active(), inspector.scheduled()):
        # A reply may be None when no worker answered; treat it as empty.
        # Iterating each reply's own values also avoids the KeyError raised
        # when a worker name is present in only one of the two replies.
        for worker_tasks in (tasks_by_worker or {}).values():
            for task in worker_tasks or ():
                # Active entries expose 'id' at the top level, while scheduled
                # entries wrap the task description in a 'request' dict.
                found_id = task.get('id')
                if found_id is None:
                    found_id = (task.get('request') or {}).get('id')
                if found_id is not None and found_id == task_id:
                    return True
    return False
所以,我这样使用它:
在我的 celery-app 对象可用的上下文中,我定义:
def check_task_can_not_run(task_id):
    """Tell whether *task_id* must not be launched on the ``celery`` app.

    Delegates to ``is_task_active_or_registered`` and returns its result
    unchanged.
    """
    blocked = is_task_active_or_registered(task_id=task_id, app=celery)
    return blocked
因此,在处理客户端请求时,我会调用 check_task_can_not_run(...),并在其返回 True 的情况下阻止任务启动。
我也面临着类似的问题:Celery Beat 会在我的队列中重复投递同一个任务。我想使用
expires
但此功能无法正常工作 https://github.com/celery/celery/issues/4300.
这里是调度程序,它检查任务是否已经排队(基于任务名称)。
# -*- coding: UTF-8 -*-
from __future__ import unicode_literals
import json
from heapq import heappop, heappush
from celery.beat import event_t
from celery.schedules import schedstate
from django_celery_beat.schedulers import DatabaseScheduler
from typing import List, Optional
from typing import TYPE_CHECKING
from your_project import celery_app
if TYPE_CHECKING:
from celery.beat import ScheduleEntry
def is_task_in_queue(task, queue_name=None):
    # type: (str, Optional[str]) -> bool
    """Report whether a task named *task* is waiting in a queue.

    Only *queue_name* is inspected when it is given; otherwise every queue
    listed in ``celery_app.amqp.queues`` is checked, stopping at the first
    queue that contains the task.
    """
    if queue_name:
        candidate_queues = [queue_name]
    else:
        candidate_queues = celery_app.amqp.queues.keys()
    return any(
        task in get_celery_queue_tasks(candidate)
        for candidate in candidate_queues
    )
def get_celery_queue_tasks(queue_name):
    # type: (str) -> List[str]
    """List the distinct task names found in the messages of *queue_name*.

    Reads the whole list stored under *queue_name* through the broker
    connection's client (``lrange`` -- presumably a Redis-backed broker,
    confirm against the deployment), parses each message as JSON and collects
    ``headers.task`` in first-seen order without repeats.
    """
    with celery_app.pool.acquire(block=True) as connection:
        raw_messages = connection.default_channel.client.lrange(queue_name, 0, -1)

    unique_names = []
    for raw_message in raw_messages:
        task_name = json.loads(raw_message)['headers']['task']
        if task_name in unique_names:
            continue
        unique_names.append(task_name)
    return unique_names
class SmartScheduler(DatabaseScheduler):
    """
    Database scheduler that avoids putting a task into a queue that already
    holds a task with the same name.
    """

    def is_due(self, entry):
        # type: (ScheduleEntry) -> schedstate
        """Decide whether *entry* should be sent now.

        The entry's own ``is_due()`` verdict is returned unchanged unless the
        entry is due AND a task with the same name is already queued. In that
        case the entry is reported as not due, and the head of the scheduler
        heap is re-pushed for its next run instead of being sent.
        """
        due_now, delay = entry.is_due()

        # The queue is only inspected when the entry is actually due.
        duplicate_pending = due_now and is_task_in_queue(entry.task)
        if not duplicate_pending:
            return schedstate(due_now, delay)

        # Entry is due, but an identical task is still waiting in a queue.
        heap = self._heap
        if not heap:
            return schedstate(False, self.max_interval)

        head = heap[0]
        popped = heappop(heap)
        if popped is not head:
            # The heap head changed between the peek and the pop: restore it.
            heappush(heap, popped)
            delay = min(popped[0], delay)
        else:
            # Reserve the following occurrence and put it back on the heap
            # without sending the current one.
            upcoming = self.reserve(entry)
            rescheduled = event_t(self._when(upcoming, delay), head[1], upcoming)
            heappush(heap, rescheduled)

        return schedstate(False, min(delay, self.max_interval))