就像这个其他问题一样,我想从芹菜任务返回的列表中创建一个芹菜组。这个想法是第一个任务将返回一个列表,第二个任务将该列表分解为列表中每个项目的并发任务。
计划是在下载内容时使用它。第一个任务从网站获取链接,第二个任务是下载页面、处理它然后上传到 s3 的链。最后,完成所有子页面后,该网站将在我们的数据库中标记为已完成。像这样的东西:
chain(
get_links_from_website.si('https://www.google.com'),
dmap.s( # <-- Distributed map
download_sub_page.s() |
process_sub_page.s() |
upload_sub_page_to_s3.s()
),
mark_website_done.s()
)
到目前为止我看到的解决方案似乎做得很好,但是当第二个任务是链时失败,因为
clone
没有做深度复制的问题(有关详细信息,请参阅对此答案的评论 ):
@task
def dmap(it, callback):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
return group(callback.clone([arg,]) for arg in it)()
还有一个问题是,如果可迭代对象的长度为 10,000 个项目,它将创建一个包含 10,000 个项目的组。正如您所想象的那样,这正在炸毁我们的内存使用量。
所以,我正在寻找的是一种方法来做
dmap
:
celery canvas 提供了 chunks 来将任务分成块。不幸的是,这不适用于链、组等原语。
您可以使用芹菜信号来防止 dmap/clone 出现问题。
ch = chain(
download_sub_page.s(),
process_sub_page.s(),
upload_sub_page.s(),
)
@task_success.connect(sender='get_links_from_website')
def task_success_handler(sender=None, headers=None, body=None, **kwargs):
result = kwargs['result']
header = [ch(i) for i in result]
callback = mark_website_done.si()
chord(header)(callback)
创建处理页面的链,并使用弦将最后一个任务挂接到它。只要
get_links_from_website
成功运行,就会执行此函数。
根据连锁所花费的时间,您还可以将
get_links_from_website
的结果保存在某处。然后迭代其中的一批以排队链,并且对于最后一批,您可以挂钩最后一个任务的回调。
这有点 hacky 但我们正在使用 deepcopy 来克隆回调,这修复了 Signature 的浅拷贝的错误
def dmap(it, callback, final=None):
# Map a callback over an iterator and return as a group
callback = subtask(callback)
run_in_parallel = group(subtask(copy.deepcopy(dict(callback))).clone([arg, ]) for arg in it)
if len(run_in_parallel.tasks) == 0:
return []
if final:
return chord(run_in_parallel)(final)
return run_in_parallel.delay()
请注意,这仅适用于一个嵌套级别(即回调是一个链/组/弦),但不适用于深层嵌套的回调
对于深度嵌套的回调图,我们使用这个 hack,它有点慢但工作完美
# Hack to completely clone a signature with possibly complex subtasks (chains, chords, etc...)
run_in_parallel = group(pickle.loads(pickle.dumps(callback)).clone([arg, ]) for arg in it)
对于组的大小,您总是可以将迭代器拆分为块
如果有人遇到这个问题,Jether 的回答很有帮助,但并不完美。对我们来说,存在三个问题:
callback
本身是一条链,则答案不会将参数传递到链上。 https://stackoverflow.com/a/59023231/19882725 通过clone_signature
帮助提供解决方案。这似乎适用于使用 RabbitMQ 作为代理的合理嵌套链,但我们没有尝试任何极端的东西(因此不需要调整它来使用pickle
)。callback
是一个组或和弦,我们需要将参数应用于每个克隆的任务,因此我们修改了 (1) 中的 clone_signature
以适应这种情况。final
失败了 - 我们采用了 https://github.com/celery/celery/issues/5265 的解决方案,将 final 从 dict
转换为 Signature
.final
在很多情况下实际上不会执行,因为 chord
收到的是 Group
而不是任务列表。对于任何好奇的人,这是我们的最终解决方案:
import copy
from celery import Signature, chord, group, shared_task, subtask
def clone_signature(sig, args=(), kwargs=(), **opts):
"""
Turns out that a chain clone() does not copy the arguments properly - this
clone does.
From: https://stackoverflow.com/a/53442344/3189
"""
if sig.subtask_type and sig.subtask_type not in ["chain", "group", "chord"]:
raise NotImplementedError(
"Cloning only supported for tasks, chains, groups, and chords, not {}".format(
sig.subtask_type
)
)
clone = sig.clone()
# if the task we're cloning is a group or chord, apply the arguments to each of the children
if sig.subtask_type and sig.subtask_type in ["group", "chord"]:
clone.tasks = [
clone_signature(task, args=args, kwargs=kwargs, opts=opts)
for task in clone.tasks
]
# otherwise, apply the arguments to either the task itself (if it's a single task)
# or the first child task (if it's a chain)
else:
if hasattr(clone, "tasks"):
task_to_apply_args_to = clone.tasks[0]
else:
task_to_apply_args_to = clone
args, kwargs, opts = task_to_apply_args_to._merge(
args=args, kwargs=kwargs, options=opts
)
task_to_apply_args_to.update(
args=args, kwargs=kwargs, options=copy.deepcopy(opts)
)
return clone
@shared_task
def dmap(it, callback, final=None):
if not len(it):
return []
callback = subtask(callback)
run_in_parallel = [
clone_signature(callback, args if type(args) is list else [args]) for args in it
]
if not final:
return group(*run_in_parallel).delay()
# see https://github.com/celery/celery/issues/5265
if not isinstance(final, Signature):
final["immutable"] = True
final = Signature.from_dict(final)
return chord(run_in_parallel)(final)
这使我们能够成功执行如下嵌套的
dmap
s:
chain(
taskA.s(),
dmap.s(
chain(
taskB.s(),
taskC.s(),
dmap.s(
taskD.s(),
final=chain(
taskE.s(),
taskF.s(),
),
),
),
),
).delay()