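I have a set of long-running tasks that I need to coordinate as a group. The challenge is that each task must be fired as soon as it is created, not after all of the tasks have been defined (as in the code below). The problem is that group(...) does not accept AsyncResult objects, and calling apply_async on the tasks causes every task to run twice.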
import time

import numpy as np
from celery import group, chain, chord, subtask

# NOTE: `app` is assumed to be an already configured Celery application.

# TASKS
@app.task(bind=True)
def rate_limited_task(self, downstream_task):
    tasks = []
    for random_number in np.random.randint(1, 10, 1000):
        # ... imagine it takes several seconds to generate a random number. This will loop many times
        # int() converts the NumPy integer, which the default JSON serializer cannot encode
        task = parallel_tasks.si(int(random_number))  # This needs to fire immediately
        tasks.append(task)
    pipeline = group(tasks) | downstream_task
    return pipeline()

@app.task(bind=True)
def parallel_tasks(self, data):
    # Another long running task
    print(f'sleeping for {data} seconds')
    time.sleep(data)
    return data

@app.task(bind=True)
def end_task(self, results):
    print('End task')
    print(results)

# Run it (after the tasks above have been defined)
job_id_to_track = rate_limited_task.s(end_task.s()).apply_async()
Question: is it possible to create a group from tasks that are already running (or in any other state)?
Current solution (not ideal)
from celery import chain, uuid
from celery.result import allow_join_result, GroupResult

@app.task(bind=True)
def rate_limited_task(self, downstream_task):
    tasks = []
    for random_number in np.random.randint(1, 10, 1000):
        # ... imagine it takes several seconds to generate a random number. This will loop many times
        # apply_async fires the task immediately and returns an AsyncResult
        task = parallel_tasks.apply_async([int(random_number)])
        tasks.append(task)
    # Save the already-running results as a group so another task can restore it by id
    gid = uuid()
    result_set = GroupResult(id=gid, results=tasks)
    result_set.save(backend=self.backend)
    chain(
        group_tasks.s(gid),
        downstream_task
    )()
...
@app.task(bind=True)
def group_tasks(self, group_task_id):
    group_task = GroupResult.restore(group_task_id, app=app, backend=self.backend)
    # Less than ideal solution.
    # Needs logic to handle any failed tasks (solved in the group class)
    # Tasks should have an expiration time so that they are not retried forever
    if not group_task.ready():
        raise self.retry(countdown=1)
    with allow_join_result():
        results = group_task.join()
    return results
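
The two gaps noted in the comments of group_tasks (failed child tasks and unbounded retries) could be handled with a small decision step on each polling pass. The following is only a sketch under assumptions: poll_decision and FakeResult are hypothetical names, not part of Celery, and the helper relies only on the ready() and failed() methods that Celery's AsyncResult provides. FakeResult stands in for AsyncResult so the sketch runs without a broker.

```python
from dataclasses import dataclass
from typing import Iterable, Protocol


class ResultLike(Protocol):
    """The subset of the AsyncResult interface this sketch relies on."""

    def ready(self) -> bool: ...

    def failed(self) -> bool: ...


def poll_decision(results: Iterable[ResultLike], attempts: int, max_attempts: int) -> str:
    """Decide what one polling pass should do (hypothetical helper).

    Returns one of 'fail', 'join', 'give_up' or 'retry'.
    """
    results = list(results)
    # A failed child will never produce a value, so stop waiting immediately.
    if any(r.failed() for r in results):
        return "fail"
    # Every child has finished without failing: the results can be collected.
    if all(r.ready() for r in results):
        return "join"
    # Bound the polling so the task is not retried forever.
    if attempts >= max_attempts:
        return "give_up"
    return "retry"


@dataclass
class FakeResult:
    """Stand-in for AsyncResult, used only to exercise the helper locally."""

    done: bool
    error: bool = False

    def ready(self) -> bool:
        return self.done

    def failed(self) -> bool:
        return self.error


if __name__ == "__main__":
    pending = [FakeResult(done=True), FakeResult(done=False)]
    print(poll_decision(pending, attempts=0, max_attempts=5))
```

Inside group_tasks, this might be called with group_task.results as the results and self.request.retries as attempts, mapping 'retry' to raise self.retry(countdown=1), 'join' to the existing join() call, and the other two outcomes to raising an error. That wiring is an assumption about how the pieces fit together, not a tested Celery recipe.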