为了数据完整性,我有一系列应按顺序执行的任务。我不需要任何类型的 celery 编排;我可以让一项任务按顺序执行顶层步骤。
这是在 django、python 3.11 和最新的 celery 中。 Redis 是代理和结果存储。
其中一些任务使用缓慢的 API 来获取页面。每个页面都会触发处理和许多数据库写入。由于数量原因,这些写入本身很慢,因此为了在挂钟意义上更快,我希望在 API 到达页面后立即将 API 处理的每个页面卸载到子任务。为此,我可以创建一个子任务。我“即时”创建它们。
很多 celery 编排表明您提前准备任务签名,然后调度它们。这种模式并不能节省我任何挂钟时间。
我想确保所有后台数据库写入子任务完成,然后再继续下一个顶级任务。
我希望,如果我使用链(具有不可变签名)作为顶级任务,我可以使用 .apply_async() 的“add_to_parent”参数以某种方式将动态子任务与其父任务关联起来。但这不起作用。文档说它默认为 True,但我还是设置它。
可能我误解了“add_to_parent”的含义,或者可能我没有正确实现它。
如果顶级任务是
@task api_1()
,则代码结构为
@task api_1() -> intermediate_function() -> @task write_database_page()
通过apply_async()
我想知道子任务
write_database_page()
如何知道它的父任务是什么,所以我怀疑并希望这是我的问题,并且我错过了一些让子任务知道父任务上下文的方法。
有很多看起来相似的问题,解决方案是使用和弦()。 但是,我认为事后没有任何方法可以将任务添加到和弦()中。看来和弦()要求我在开始之前收集所有数据库写入任务。
另一种方法是以某种方式从子任务中收集所有任务 ID,并将它们传递给下一个顶级函数,然后该函数将以某种方式以非阻塞方法等待它们完成。令我惊讶的是,我想要的模式不是由 Celery 处理的,而且看起来更有可能是我做错了。
celery 对其编排没有帮助。使用 add_to_parent (默认)确实可以正确显示 redis 中的父/子关系,但 celery 不能或不会修改已发送给工作人员的任务。组、链和和弦仅适用于创建它们时存在的任务签名。对于由于创建链或弦时不存在的新依赖项而被阻止的任务,celery 可能需要更改可能已发送给工作人员的作业,即只有任务执行时已知的依赖项可以使用预定的。
这个需求反而需要手动完成;一种解决方案是等待子任务完成的循环。
例如,这是直接添加三个新任务的代码,并通过中间函数再添加两个
@app.task(bind=True)
def task_dummy_task1(self, part_number: int, job_id: int = None):
job, job_id = JobMaster.get_job(job_id, job_title="dummy task")
sleeping_duration = 5
subtask_ids = []
job.log_message(log_message=f"Entered dummy task 1 with sleeping duration of {sleeping_duration}")
job.log_message(log_message="In dummy task1, creating subtask a")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_a", "job_id": job_id},
add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="In dummy task1, creating subtask b")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_b", "job_id": job_id},
add_to_parent=True)
subtask_ids.append(subtask.id)
job.log_message(log_message="In dummy task1, creating subtask c")
subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_c", "job_id": job_id},
add_to_parent=True)
subtask_ids.append(subtask.id)
sleeping_duration = 5
job.log_message(log_message="In dummy task1, creating intermediary subtask d")
subtask = intermediary_dummy_subtask_function(parent_task_name="task1_d", job_id=job_id)
subtask_ids.append(subtask.id)
job.log_message(log_message="In dummy task1, creating intermediary subtask e")
subtask = intermediary_dummy_subtask_function(parent_task_name="task1_e", job_id=job_id)
subtask_ids.append(subtask.id)
time.sleep(sleeping_duration)
# wait for subtasks to complete
wait_for_tasks_to_complete(async_ids=subtask_ids, job_id=job_id,
msg="Waiting in dummy task1 for subtasks to complete")
job.log_message(log_message="Finished dummy task1 main body")
return part_number
...
def wait_for_tasks_to_complete(async_ids: List[str], job_id: int = None, msg: str = None, timeout: int = 300):
job, job_id = JobMaster.get_job(job_id, job_title="waiting for refresh data")
job.log_message(log_message=f"Waiting for {len(async_ids)} tasks to complete, {msg}", status=consts.IN_PROGRESS,
job_score=0)
job.log_message(log_message=f"tasks: {async_ids}", status=consts.IN_PROGRESS, job_score=0)
all_success = True
count_down = timeout
while count_down > 0:
for async_id in async_ids:
result = app.AsyncResult(async_id) # type:AsyncResult
status = result.status
if status == "SUCCESS":
returned_value = result.result
job.log_message(log_message=f"Confirmed status SUCCESS with {returned_value=}")
# remove successful task from list
async_ids.remove(async_id)
else:
all_success = False
pass
if all_success or len(async_ids) == 0:
job.log_message(log_message="Finished waiting for refresh data, all tasks succeeded",
status=consts.COMPLETED, job_score=100)
return
count_down -= 1
job.log_message(log_message=f"There are {len(async_ids)} tasks remaining")
time.sleep(1)
job.log_message(log_message="After waiting for {timeout=}, some tasks did not complete on time", status=consts.ERRORS_FOUND,
job_score=100)
... 中介功能是:
def intermediary_dummy_subtask_function(parent_task_name, job_id)->AsyncResult:
job, job_id = JobMaster.get_job(job_id, job_title="dummy task")
job.log_message(
log_message=f"Intermediary function for {parent_task_name} has been reached, will now make a task")
r = task_dummy_subtask.apply_async(kwargs={"parent_task_name": parent_task_name, "job_id": job_id},
add_to_parent=True)
return r