Celery链,如何等待动态创建的子任务(好像排除了chord)

问题描述 投票:0回答:1

为了数据完整性,我有一系列应按顺序执行的任务。我不需要任何类型的 celery 编排;我可以让一项任务按顺序执行顶层步骤。

这是在 django、python 3.11 和最新的 celery 中。 Redis 是代理和结果存储。

其中一些任务使用缓慢的 API 来获取页面。每个页面都会触发处理和许多数据库写入。由于数量原因,这些写入本身很慢,因此为了在挂钟意义上更快,我希望在 API 到达页面后立即将 API 处理的每个页面卸载到子任务。为此,我可以创建一个子任务。我“即时”创建它们。

很多 celery 编排表明您提前准备任务签名,然后调度它们。这种模式并不能节省我任何挂钟时间。

我想确保所有后台数据库写入子任务完成,然后再继续下一个顶级任务。

我希望,如果我使用链(具有不可变签名)作为顶级任务,我可以使用 .apply_async() 的“add_to_parent”参数以某种方式将动态子任务与其父任务关联起来。但这不起作用。文档说它默认为 True,但我还是设置它。

可能我误解了“add_to_parent”的含义,或者可能我没有正确实现它。

如果顶级任务是

@task api_1()
,则代码结构为
@task api_1() -> intermediate_function() -> @task write_database_page()
通过
apply_async()

我想知道子任务

write_database_page()
如何知道它的父任务是什么,所以我怀疑并希望这是我的问题,并且我错过了一些让子任务知道父任务上下文的方法。

有很多看起来相似的问题,解决方案是使用和弦()。 但是,我认为事后没有任何方法可以将任务添加到和弦()中。看来和弦()要求我在开始之前收集所有数据库写入任务。

另一种方法是以某种方式从子任务中收集所有任务 ID,并将它们传递给下一个顶级函数,然后该函数将以某种方式以非阻塞方法等待它们完成。令我惊讶的是,我想要的模式不是由 Celery 处理的,而且看起来更有可能是我做错了。

python celery
1个回答
0
投票

celery 对其编排没有帮助。使用 add_to_parent (默认)确实可以正确显示 redis 中的父/子关系,但 celery 不能或不会修改已发送给工作人员的任务。组、链和和弦仅适用于创建它们时存在的任务签名。对于由于创建链或弦时不存在的新依赖项而被阻止的任务,celery 可能需要更改可能已发送给工作人员的作业,即只有任务执行时已知的依赖项可以使用预定的。

这个需求反而需要手动完成;一种解决方案是等待子任务完成的循环。

例如,这是直接添加三个新任务的代码,并通过中间函数再添加两个

@app.task(bind=True)
def task_dummy_task1(self, part_number: int, job_id: int = None):
    job, job_id = JobMaster.get_job(job_id, job_title="dummy task")
    sleeping_duration = 5
    subtask_ids = []
    job.log_message(log_message=f"Entered dummy task 1 with sleeping duration of {sleeping_duration}")

    job.log_message(log_message="In dummy task1, creating subtask a")
    subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_a", "job_id": job_id},
                                             add_to_parent=True)
    subtask_ids.append(subtask.id)
    job.log_message(log_message="In dummy task1, creating subtask b")

    subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_b", "job_id": job_id},
                                             add_to_parent=True)
    subtask_ids.append(subtask.id)
    job.log_message(log_message="In dummy task1, creating subtask c")

    subtask = task_dummy_subtask.apply_async(kwargs={"parent_task_name": "task1_c", "job_id": job_id},
                                             add_to_parent=True)
    subtask_ids.append(subtask.id)
    sleeping_duration = 5

    job.log_message(log_message="In dummy task1, creating intermediary subtask d")
    subtask = intermediary_dummy_subtask_function(parent_task_name="task1_d", job_id=job_id)
    subtask_ids.append(subtask.id)
    job.log_message(log_message="In dummy task1, creating intermediary subtask e")
    subtask = intermediary_dummy_subtask_function(parent_task_name="task1_e", job_id=job_id)
    subtask_ids.append(subtask.id)

    time.sleep(sleeping_duration)
    # wait for subtasks to complete
    wait_for_tasks_to_complete(async_ids=subtask_ids, job_id=job_id,
                                    msg="Waiting in dummy task1 for subtasks to complete")

    job.log_message(log_message="Finished dummy task1 main body")

    return part_number

...

def wait_for_tasks_to_complete(async_ids: List[str], job_id: int = None, msg: str = None, timeout: int = 300):
    job, job_id = JobMaster.get_job(job_id, job_title="waiting for refresh data")
    job.log_message(log_message=f"Waiting for {len(async_ids)} tasks to complete, {msg}", status=consts.IN_PROGRESS,
                    job_score=0)
    job.log_message(log_message=f"tasks: {async_ids}", status=consts.IN_PROGRESS, job_score=0)

    all_success = True
    count_down = timeout
    while count_down > 0:
        for async_id in async_ids:
            result = app.AsyncResult(async_id)  # type:AsyncResult
            status = result.status
            if status == "SUCCESS":
                returned_value = result.result
                job.log_message(log_message=f"Confirmed status SUCCESS with {returned_value=}")
                # remove successful task from list
                async_ids.remove(async_id)
            else:
                all_success = False
                pass

        if all_success or len(async_ids) == 0:
            job.log_message(log_message="Finished waiting for refresh data, all tasks succeeded",
                            status=consts.COMPLETED, job_score=100)
            return
        count_down -= 1
        job.log_message(log_message=f"There are {len(async_ids)} tasks remaining")
        time.sleep(1)
    job.log_message(log_message="After waiting for {timeout=}, some tasks did not complete on time", status=consts.ERRORS_FOUND,
                    job_score=100)

... 中介功能是:

def intermediary_dummy_subtask_function(parent_task_name, job_id)->AsyncResult:
    job, job_id = JobMaster.get_job(job_id, job_title="dummy task")
    job.log_message(
        log_message=f"Intermediary function for {parent_task_name} has been reached, will now make a task")
    r = task_dummy_subtask.apply_async(kwargs={"parent_task_name": parent_task_name, "job_id": job_id},
                                       add_to_parent=True)
    return r
© www.soinside.com 2019 - 2024. All rights reserved.