Celery groups: how do I create a group from tasks that are already running?


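I have a set of long-running tasks that I need to coordinate as a group. The challenge is that each task must be triggered as soon as it is created, not after all of the tasks have been defined (as shown below).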

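The problem is that group(...) does not accept AsyncResult objects, and calling apply_async on each task before adding it to the group causes every task to run twice.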
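To see why the tasks run twice, it helps to separate the description of a call (what Celery calls a signature) from the act of dispatching it. The sketch below is a plain-Python analogy built on concurrent.futures, not Celery code: FakeSignature and fake_group are hypothetical stand-ins, and the only assumption is that a group dispatches every signature it is given, whether or not that signature was already dispatched by hand.

```python
from concurrent.futures import ThreadPoolExecutor

executor = ThreadPoolExecutor(max_workers=4)
run_log = []  # records every execution of the work function


def work(n):
    run_log.append(n)
    return n * 2


class FakeSignature:
    """Hypothetical stand-in for a task signature: it only describes a call."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def apply_async(self):
        # Every call dispatches a new, independent execution.
        return executor.submit(self.func, *self.args)


def fake_group(signatures):
    """Hypothetical stand-in for a group: dispatches each member when applied."""
    return [sig.apply_async() for sig in signatures]


sig = FakeSignature(work, 21)
first = sig.apply_async()        # fired immediately, by hand
second = fake_group([sig])[0]    # the group dispatches the same signature again

results = [first.result(), second.result()]
executor.shutdown()
print(results, run_log)
```

The work function runs once per dispatch, so the same signature ends up executed twice. The handle returned by the first dispatch is a different kind of object from the signature, which is why it cannot simply be passed to the group instead.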

import time

import numpy as np
from celery import group, chain, chord, subtask

# `app` is assumed to be an already configured Celery application instance.


# Run it (in a real module this call has to come after the task definitions below)
job_id_to_track = rate_limited_task.s(end_task.s()).apply_async()


# TASKS
@app.task(bind=True)
def rate_limited_task(self, downstream_task):
 
    tasks = []

    for random_number in np.random.randint(1, 10, 1000):
        # ... imagine it takes several seconds to generate a random number. This will loop many times
        
        task = parallel_tasks.si(random_number) # This needs to fire immediately 
        tasks.append(task)


    pipeline = group(tasks) | downstream_task

    return pipeline()

@app.task(bind=True)
def parallel_tasks(self, data):
    # Another long running task
    print(f'sleeping for {data} seconds')
    time.sleep(data)

    return data

@app.task(bind=True)
def end_task(self, results):
    # Receives the list of results from every task in the group
    print('End task')
    print(results)

Question: is it possible to create a group from tasks that are already running (or in any other state)?
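For comparison, this is what the desired behaviour looks like outside Celery. The sketch uses the standard library's concurrent.futures instead of Celery, so it only illustrates the pattern: each piece of work is submitted the moment it is created, the handles are collected, and the downstream step runs once all of them have finished. The function names loosely mirror the question; produce_numbers is a hypothetical stand-in for the slow loop, and the sleep is shortened so the example finishes quickly.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def parallel_task(data):
    # Stand-in for the long-running task; sleeps briefly instead of `data` seconds.
    time.sleep(data / 100)
    return data


def end_task(results):
    # Downstream step that needs every result.
    return sum(results)


def produce_numbers():
    # Hypothetical stand-in for the slow number-generating loop.
    for n in (3, 1, 2):
        yield n


def rate_limited(downstream):
    with ThreadPoolExecutor(max_workers=4) as pool:
        # submit() schedules the work right away; the Future is only a handle to it.
        futures = [pool.submit(parallel_task, n) for n in produce_numbers()]
        # Gather the results in submission order once every handle has completed.
        results = [f.result() for f in futures]
    return downstream(results)


total = rate_limited(end_task)
print(total)
```

The list of futures plays the role that a group of already-dispatched tasks would play in Celery: a collection of handles that can be waited on together.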


Current solution (not ideal)

from celery import chain, uuid
from celery.result import allow_join_result, GroupResult

@app.task(bind=True)
def rate_limited_task(self, downstream_task):
 
    tasks = []

    for random_number in np.random.randint(1, 10, 1000):
        # ... imagine it takes several seconds to generate a random number. This will loop many times
        
        task = parallel_tasks.apply_async([random_number])  # fires immediately
        tasks.append(task)


    gid = uuid()

    result_set = GroupResult(id=gid, results=tasks)
    result_set.save(backend=self.backend)

    chain(
        group_tasks.s(gid),
        downstream_task
    )()
...

@app.task(bind=True)
def group_tasks(self, group_task_id):

    group_task = GroupResult.restore(group_task_id, app=app, backend=self.backend)

    # Less than ideal solution.
    # Needs logic to handle any failed tasks (solved in the group class)
    # Tasks should have an expiration time so that they are not retried forever

    if not group_task.ready():
        raise self.retry(countdown=1)

    with allow_join_result():
        results = group_task.join()
        return results
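The comments in group_tasks point at two gaps: failed members are not handled, and the polling can retry forever. One way to close both is to factor the decision into a small helper, sketched below. The helper name next_action and the max_retries default are assumptions made for illustration. It relies only on the ready() and failed() methods that GroupResult provides, so it can be exercised here with a stub instead of a live result backend.

```python
def next_action(group_result, retries_so_far, max_retries=600):
    """Decide what a polling task should do next.

    Returns "fail" if any member failed, "join" once all members finished,
    "give_up" when the retry budget is spent, and "retry" otherwise.
    """
    if group_result.failed():
        return "fail"
    if group_result.ready():
        return "join"
    if retries_so_far >= max_retries:
        return "give_up"
    return "retry"


class StubGroupResult:
    """Minimal stand-in exposing the two methods the helper uses."""

    def __init__(self, ready, failed):
        self._ready = ready
        self._failed = failed

    def ready(self):
        return self._ready

    def failed(self):
        return self._failed


pending = StubGroupResult(ready=False, failed=False)
done = StubGroupResult(ready=True, failed=False)
broken = StubGroupResult(ready=False, failed=True)

actions = [
    next_action(pending, retries_so_far=0),
    next_action(pending, retries_so_far=600),
    next_action(done, retries_so_far=5),
    next_action(broken, retries_so_far=5),
]
print(actions)
```

Inside group_tasks, self.request.retries could supply retries_so_far, the "retry" outcome would map to raise self.retry(countdown=1), and the "fail" and "give_up" outcomes would raise an error instead of joining. This is a sketch of the control flow only and has not been run against a real broker.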