I'm running a group of tasks in Celery like this:
```python
from pathlib import Path
from uuid import uuid4

from celery import group

# `app` is the Celery application instance, defined elsewhere.


def run_many(n):
    job_id = str(uuid4())
    g = group(dummy.s(job_id) for _ in range(n))
    job = g.apply_async(task_id=job_id)
    job.save()  # so children are accessible later in the GroupResult


@app.task(bind=True, ignore_result=True)
def dummy(self, job_id):
    outputdir = (Path(job_id) / self.request.id).resolve()
    print(f"dummy: {self.request.id}")
    # do some work and write results into outputdir
```
I monitor the status of the whole job like this:
```python
from celery.result import GroupResult


def check_job_status(job_id):
    job = GroupResult.restore(id=job_id, app=app)
    n_total = len(job.children)
    n_finished = job.completed_count()
    status = "SUCCESS" if n_finished == n_total else "RUNNING"
    return status, n_finished / n_total
```
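I'd like to be able to get information about individual tasks, including the `id` of the job that created each one, but I don't know how to store that information: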
```python
from celery.result import AsyncResult


def check_task_status(task_id):
    result = AsyncResult(id=task_id, app=app)
```
In that function, `result.parent` is `None`, even though I set `self.parent = job_id` in `dummy()`.
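Is there some way to store custom metadata for a task that ends up written to the backend and loaded by `AsyncResult`? I know I can use `update_state(state, meta)`, but building a `meta` dictionary with a `job_id` key every time I want to update the task state seems cumbersome, especially when I want to update the state from a function the task calls, because then I have to pass `job_id` around as well. If I could access the task's current `meta` dictionary from inside the task, I could simply `update()` it, but I don't know how to retrieve that dictionary from the task itself.

Any ideas?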
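You can do something like this (note, though, that this way of getting the meta relies on `_get_result_meta`, which is marked as internal):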
```python
from pathlib import Path

from celery.app.task import Context
from celery.backends.base import BaseKeyValueStoreBackend, KeyValueStoreBackend
from celery.backends.s3 import S3Backend


@app.task(bind=True, ignore_result=True)
def dummy(self, job_id):
    outputdir = (Path(job_id) / self.request.id).resolve()
    print(f"dummy: {self.request.id}")
    # do some work and write results into outputdir

    request: Context = self.request
    # I use the S3Backend but you may use another
    backend: S3Backend | KeyValueStoreBackend | BaseKeyValueStoreBackend = self.backend
    # internal helper: builds the backend's standard result-meta dict for this request
    meta = backend._get_result_meta(
        result=None, state="STARTED", traceback=None, request=request
    )
    meta["parent_id"] = job_id
    self.update_state(task_id=request.id, state="STARTED", meta=meta)
```
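For the other half of the question (updating state from functions the task calls without passing `job_id` into each of them, and `update()`-ing the current meta dict), one option is to keep the meta dict in a `contextvars.ContextVar`. This is a minimal sketch under stated assumptions, not a Celery API: `bind_task_meta` and `report` are hypothetical helper names, and `update_state` stands in for any callable with the keyword shape of Celery's `Task.update_state(state=..., meta=...)`; inside a bound task you would pass `self.update_state`.

```python
from __future__ import annotations

import contextvars
from contextlib import contextmanager
from typing import Any, Callable, Iterator

# Hypothetical helper state: the current meta dict plus the callable that
# persists it (assumed to accept state=... and meta=... keywords, like
# Celery's Task.update_state).
_current: contextvars.ContextVar[tuple[dict[str, Any], Callable[..., Any]]] = (
    contextvars.ContextVar("task_meta")
)


@contextmanager
def bind_task_meta(update_state: Callable[..., Any], **base: Any) -> Iterator[dict[str, Any]]:
    """Make `base` (e.g. job_id=...) the starting meta for code run in this block."""
    meta = dict(base)
    token = _current.set((meta, update_state))
    try:
        yield meta
    finally:
        # Restore whatever was bound before, so the meta does not leak out.
        _current.reset(token)


def report(state: str, **fields: Any) -> dict[str, Any]:
    """Merge `fields` into the current meta and push the whole dict."""
    meta, update_state = _current.get()  # LookupError if nothing is bound
    meta.update(fields)
    # Send a copy so later updates cannot mutate what was already sent.
    update_state(state=state, meta=dict(meta))
    return meta


def process_chunk(i: int, n: int) -> None:
    # A helper deeper in the call stack: it needs no job_id parameter.
    report("PROGRESS", done=i + 1, total=n)
```

Inside the task, the work would be wrapped in `with bind_task_meta(self.update_state, job_id=job_id):`; any helper called from that block can then call `report(...)`, and every stored meta dict carries `job_id` plus all fields reported so far, because each call resends the accumulated dict rather than only the new keys.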