我有芹菜和芹菜(四名工人)批量做一些加工步骤。其中一项任务大致是这样的,“对于每个没有创建Y的X,创建Y.”
该任务以半快速(10秒)定期运行。任务很快完成。还有其他任务正在进行中。
我多次遇到这个问题,其中节拍任务显然已经积压,因此同时执行相同的任务(来自不同的节拍时间),导致错误的重复工作。似乎任务也是无序执行的。
rate_limit=5
这样的任务是“正确”的方式吗?任务本身是天真地定义的:
@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
# Do things in a database
return
这是一个实际的(清理过的)日志:
[00:00.000]
foocorp.tasks.add_y_to_xs。 ID - >#1[00:00.001]
收到的任务:foocorp.tasks.add_y_to_xs [#1][00:10.009]
foocorp.tasks.add_y_to_xs。 ID - >#2[00:20.024]
foocorp.tasks.add_y_to_xs。 ID - >#3[00:26.747]
收到的任务:foocorp.tasks.add_y_to_xs [#2][00:26.748]
TaskPool:应用#2[00:26.752]
收到的任务:foocorp.tasks.add_y_to_xs [#3][00:26.769]
任务:foocorp.tasks.add_y_to_xs [#2] pid:26528[00:26.775]
任务foocorp.tasks.add_y_to_xs [#2]成功进入0.0197986490093s:无[00:26.806]
TaskPool:应用#1[00:26.836]
TaskPool:应用#3[01:30.020]
任务接受:foocorp.tasks.add_y_to_xs [#1] pid:26526[01:30.053]
任务接受:foocorp.tasks.add_y_to_xs [#3] pid:26529[01:30.055]
foocorp.tasks.add_y_to_xs [#1]:为X id添加Y#9725[01:30.070]
foocorp.tasks.add_y_to_xs [#3]:为X id添加Y#9725[01:30.074]
任务foocorp.tasks.add_y_to_xs [#1]成功完成0.0594762689434s:无[01:30.087]
任务foocorp.tasks.add_y_to_xs [#3]成功进入0.0352867960464s:无我们目前正在使用带有RabbitMQ的Celery 3.1.4作为传输。
编辑丹,这是我想出的:
丹,这是我最终使用的:
from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager
def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
lock_fn = (func.pg_advisory_xact_lock_shared
if shared else
func.pg_advisory_xact_lock)
if timeout:
conn.execute(text('SET statement_timeout TO :timeout'),
timeout=timeout)
try:
conn.execute(select([lock_fn(lock_id)]))
except DBAPIError:
return False
return True
def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
lock_fn = (func.pg_try_advisory_xact_lock_shared
if shared else
func.pg_try_advisory_xact_lock)
return conn.execute(select([lock_fn(lock_id)])).scalar()
class DatabaseLockFailed(Exception):
pass
@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
"""
Context manager which acquires a PSQL advisory transaction lock with a
specified name.
"""
lock_id = hash(name)
with engine.begin() as conn, conn.begin():
if block:
locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
timeout)
else:
locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
if not locked:
raise DatabaseLockFailed()
yield
和芹菜任务装饰器(仅用于定期任务):
from functools import wraps
from preo.extensions import db
def locked(name=None, block=True, timeout='1s'):
"""
Using a PostgreSQL advisory transaction lock, only runs this task if the
lock is available. Otherwise logs a message and returns `None`.
"""
def with_task(fn):
lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)
@wraps(fn)
def f(*args, **kwargs):
try:
with db_lock(db.engine, name=lock_id, block=block,
timeout=timeout):
return fn(*args, **kwargs)
except DatabaseLockFailed:
logger.error('Failed to get lock.')
return None
return f
return with_task
唯一的方法是implementing a locking strategy yourself:
请阅读here部分,以供参考。
与cron一样,如果第一个任务在下一个任务之前没有完成,任务可能会重叠。如果这是一个问题,您应该使用锁定策略来确保一次只能运行一个实例(例如,参见确保任务一次只执行一个)。
def skip_if_running(f):
u"""
не запускает задачу с такими же параметрами если она уже в обработке
"""
task_name = u'%s.%s' % (f.__module__, f.__name__)
mylog.info(u'skip decorator for %s' % task_name)
@wraps(f)
def fun(self, *args, **kwargs):
try:
uargs = unicode(args)
ukwargs = unicode(kwargs)
i = clr_app.control.inspect()
workers = i.active()
for worker, tasks in workers.items():
for task in tasks:
if task_name == task['name'] and uargs == task['args'] and ukwargs == task['kwargs'] and self.request.id != task['id']:
mylog.warning(u'task %s (%s, %s) is started on %s, skip current' % (task_name, uargs, ukwargs, worker))
return None
except Exception as e:
mylog.error(e)
return f(*args, **kwargs)
return fun
@clr_app.task(bind=True)
@skip_if_running
def test_single_task(arg):
pass
我写了一个装饰师使用Postgres advisory locking,类似于他在评论中提到的erydo。
它不是很漂亮,但似乎工作正常。这是Python 2.7下的SQLAlchemy 0.9.7。
from functools import wraps
from sqlalchemy import select, func
from my_db_module import Session # SQLAlchemy ORM scoped_session
def pg_locked(key):
def decorator(f):
@wraps(f)
def wrapped(*args, **kw):
session = db.Session()
try:
acquired, = session.execute(select([func.pg_try_advisory_lock(key)])).fetchone()
if acquired:
return f(*args, **kw)
finally:
if acquired:
session.execute(select([func.pg_advisory_unlock(key)]))
return wrapped
return decorator
@app.task
@pg_locked(0xdeadbeef)
def singleton_task():
# only 1x this task can run at a time
pass
(欢迎任何关于如何改进这一点的评论!)
我使用celery-once解决了这个问题,我扩展到了celery-one。
两者都适用于您的问题。它使用Redis来锁定正在运行的任务。 celery-one
还将跟踪锁定的任务。
以下是芹菜拍的一个非常简单的用法示例。在下面的代码中,slow_task
每1秒安排一次,但它的完成时间是5秒。普通芹菜即使已经运行,也会每秒安排一次任务。 celery-one
会阻止这种情况。
celery = Celery('test')
celery.conf.ONE_REDIS_URL = REDIS_URL
celery.conf.ONE_DEFAULT_TIMEOUT = 60 * 60
celery.conf.BROKER_URL = REDIS_URL
celery.conf.CELERY_RESULT_BACKEND = REDIS_URL
from datetime import timedelta
celery.conf.CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.slow_task',
'schedule': timedelta(seconds=1),
'args': (1,)
},
}
celery.conf.CELERY_TIMEZONE = 'UTC'
@celery.task(base=QueueOne, one_options={'fail': False})
def slow_task(a):
print("Running")
sleep(5)
return "Done " + str(a)
需要一个分布式锁定系统,因为Celery beat实例本质上是不同的进程,可能跨越不同的主机。
ZooKeeper和etcd等中心坐标系适用于分布式锁定系统的实现。
我推荐使用etcd,它轻巧而快速。锁定etcd有几种实现方式,例如: