Celery Beat:一次限制为单个任务实例

问题描述 投票:6回答:5

我有芹菜和芹菜(四名工人)批量做一些加工步骤。其中一项任务大致是这样的,“对于每个没有创建Y的X,创建Y.”

该任务以半快速(10秒)定期运行。任务很快完成。还有其他任务正在进行中。

我多次遇到这个问题,其中节拍任务显然已经积压,因此同时执行相同的任务(来自不同的节拍时间),导致错误的重复工作。似乎任务也是无序执行的。

  1. 是否有可能限制芹菜节拍以确保一次只有一个突出的任务实例?设置像rate_limit=5这样的任务是“正确”的方式吗?
  2. 是否有可能确保按顺序执行节拍任务,例如,而不是派遣任务,节拍将其添加到任务链?
  3. 处理这个问题的最佳方法是什么,除了使这些任务本身以原子方式执行并且可以安全地同时执行?这不是我对节拍任务的预期限制......

任务本身是天真地定义的:

@periodic_task(run_every=timedelta(seconds=10))
def add_y_to_xs():
    # Do things in a database
    return

这是一个实际的(清理过的)日志:

  • 发送了[00:00.000] foocorp.tasks.add_y_to_xs。 ID - >#1
  • [00:00.001]收到的任务:foocorp.tasks.add_y_to_xs [#1]
  • 发送了[00:10.009] foocorp.tasks.add_y_to_xs。 ID - >#2
  • 发送了[00:20.024] foocorp.tasks.add_y_to_xs。 ID - >#3
  • [00:26.747]收到的任务:foocorp.tasks.add_y_to_xs [#2]
  • [00:26.748] TaskPool:应用#2
  • [00:26.752]收到的任务:foocorp.tasks.add_y_to_xs [#3]
  • 接受[00:26.769]任务:foocorp.tasks.add_y_to_xs [#2] pid:26528
  • [00:26.775]任务foocorp.tasks.add_y_to_xs [#2]成功进入0.0197986490093s:无
  • [00:26.806] TaskPool:应用#1
  • [00:26.836] TaskPool:应用#3
  • [01:30.020]任务接受:foocorp.tasks.add_y_to_xs [#1] pid:26526
  • [01:30.053]任务接受:foocorp.tasks.add_y_to_xs [#3] pid:26529
  • [01:30.055] foocorp.tasks.add_y_to_xs [#1]:为X id添加Y#9725
  • [01:30.070] foocorp.tasks.add_y_to_xs [#3]:为X id添加Y#9725
  • [01:30.074]任务foocorp.tasks.add_y_to_xs [#1]成功完成0.0594762689434s:无
  • [01:30.087]任务foocorp.tasks.add_y_to_xs [#3]成功进入0.0352867960464s:无

我们目前正在使用带有RabbitMQ的Celery 3.1.4作为传输。

编辑丹,这是我想出的:

丹,这是我最终使用的:

from sqlalchemy import func
from sqlalchemy.exc import DBAPIError
from contextlib import contextmanager


def _psql_advisory_lock_blocking(conn, lock_id, shared, timeout):
    lock_fn = (func.pg_advisory_xact_lock_shared
               if shared else
               func.pg_advisory_xact_lock)
    if timeout:
        conn.execute(text('SET statement_timeout TO :timeout'),
                     timeout=timeout)
    try:
        conn.execute(select([lock_fn(lock_id)]))
    except DBAPIError:
        return False
    return True


def _psql_advisory_lock_nonblocking(conn, lock_id, shared):
    lock_fn = (func.pg_try_advisory_xact_lock_shared
               if shared else
               func.pg_try_advisory_xact_lock)
    return conn.execute(select([lock_fn(lock_id)])).scalar()


class DatabaseLockFailed(Exception):
    pass


@contextmanager
def db_lock(engine, name, shared=False, block=True, timeout=None):
    """
    Context manager which acquires a PSQL advisory transaction lock with a
    specified name.
    """
    lock_id = hash(name)

    with engine.begin() as conn, conn.begin():
        if block:
            locked = _psql_advisory_lock_blocking(conn, lock_id, shared,
                                                  timeout)
        else:
            locked = _psql_advisory_lock_nonblocking(conn, lock_id, shared)
        if not locked:
            raise DatabaseLockFailed()
        yield

和芹菜任务装饰器(仅用于定期任务):

from functools import wraps
from preo.extensions import db


def locked(name=None, block=True, timeout='1s'):
    """
    Using a PostgreSQL advisory transaction lock, only runs this task if the
    lock is available. Otherwise logs a message and returns `None`.
    """
    def with_task(fn):
        lock_id = name or 'celery:{}.{}'.format(fn.__module__, fn.__name__)

        @wraps(fn)
        def f(*args, **kwargs):
            try:
                with db_lock(db.engine, name=lock_id, block=block,
                             timeout=timeout):
                    return fn(*args, **kwargs)
            except DatabaseLockFailed:
                logger.error('Failed to get lock.')
                return None
        return f
    return with_task
python concurrency rabbitmq celery celerybeat
5个回答
6
投票

唯一的方法是implementing a locking strategy yourself

请阅读here部分,以供参考。

与cron一样,如果第一个任务在下一个任务之前没有完成,任务可能会重叠。如果这是一个问题,您应该使用锁定策略来确保一次只能运行一个实例(例如,参见确保任务一次只执行一个)。


9
投票
def skip_if_running(f):
    u"""
    не запускает задачу с такими же параметрами если она уже в обработке
    """
    task_name = u'%s.%s' % (f.__module__, f.__name__)
    mylog.info(u'skip decorator for %s' % task_name)
    @wraps(f)
    def fun(self, *args, **kwargs):
        try:
            uargs = unicode(args)
            ukwargs = unicode(kwargs)

            i = clr_app.control.inspect()
            workers = i.active()
            for worker, tasks in workers.items():
                for task in tasks:
                    if task_name == task['name'] and uargs == task['args'] and ukwargs == task['kwargs'] and self.request.id != task['id']:
                        mylog.warning(u'task %s (%s, %s) is started on %s, skip current' % (task_name, uargs, ukwargs, worker))
                        return None
        except Exception as e:
            mylog.error(e)
        return f(*args, **kwargs)
    return fun

@clr_app.task(bind=True)
@skip_if_running
def test_single_task(arg):
    pass

2
投票

我写了一个装饰师使用Postgres advisory locking,类似于他在评论中提到的erydo。

它不是很漂亮,但似乎工作正常。这是Python 2.7下的SQLAlchemy 0.9.7。

from functools import wraps
from sqlalchemy import select, func

from my_db_module import Session # SQLAlchemy ORM scoped_session

def pg_locked(key):
    def decorator(f):
        @wraps(f)
        def wrapped(*args, **kw):
            session = db.Session()
            try:
                acquired, = session.execute(select([func.pg_try_advisory_lock(key)])).fetchone()
                if acquired:
                    return f(*args, **kw)
            finally:
                if acquired:
                    session.execute(select([func.pg_advisory_unlock(key)]))
        return wrapped
    return decorator

@app.task
@pg_locked(0xdeadbeef)
def singleton_task():
    # only 1x this task can run at a time
    pass

(欢迎任何关于如何改进这一点的评论!)


2
投票

我使用celery-once解决了这个问题,我扩展到了celery-one

两者都适用于您的问题。它使用Redis来锁定正在运行的任务。 celery-one还将跟踪锁定的任务。

以下是芹菜拍的一个非常简单的用法示例。在下面的代码中,slow_task每1秒安排一次,但它的完成时间是5秒。普通芹菜即使已经运行,也会每秒安排一次任务。 celery-one会阻止这种情况。

celery = Celery('test')
celery.conf.ONE_REDIS_URL = REDIS_URL
celery.conf.ONE_DEFAULT_TIMEOUT = 60 * 60
celery.conf.BROKER_URL = REDIS_URL
celery.conf.CELERY_RESULT_BACKEND = REDIS_URL

from datetime import timedelta

celery.conf.CELERYBEAT_SCHEDULE = {
    'add-every-30-seconds': {
        'task': 'tasks.slow_task',
        'schedule': timedelta(seconds=1),
        'args': (1,)
    },
}

celery.conf.CELERY_TIMEZONE = 'UTC'


@celery.task(base=QueueOne, one_options={'fail': False})
def slow_task(a):
    print("Running")
    sleep(5)
    return "Done " + str(a)

1
投票

需要一个分布式锁定系统,因为Celery beat实例本质上是不同的进程,可能跨越不同的主机。

ZooKeeper和etcd等中心坐标系适用于分布式锁定系统的实现。

我推荐使用etcd,它轻巧而快速。锁定etcd有几种实现方式,例如:

python-etcd-lock

© www.soinside.com 2019 - 2024. All rights reserved.