在高故障率集群上使用Dask进行分布式链式计算?

问题描述 投票:0回答:2

我正在使用

Dask
Bag 在 special 集群上运行一些简单的映射归约计算:

import dask.bag as bag

summed_image = bag.from_sequence(my_ids).map(gen_image_from_ids).reduction(sum, sum).compute()

此代码生成链式计算,从

from_sequence
gen_image_from_ids
开始映射,然后将所有结果归结为
sum
的结果。由于 Dask Bag 的功能,求和是在多级树中并行完成的。

我的special集群设置故障率较高,因为我的worker随时可能被杀死,CPU被其他高阶进程接管,过一段时间后又被释放。每 5 分钟可能只在单个节点上发生一次杀戮,但我的总缩减工作可能需要超过 5 分钟。

虽然 Dask 擅长故障恢复,但我的工作有时永远不会结束。考虑一下,如果作业树中的任何内部节点被杀死,则所有先前计算的临时中间结果都会丢失。并且计算应该从头开始。

Dask Future 对象有 replicate,但我在更高级别的 Dask Bag 或 Dataframe 上找不到类似的功能来确保数据弹性。请告诉我是否有一种通用的处理方法可以将中间结果保留在故障率超高的Dask集群中。

更新 - 我的解决方法

也许任何分布式计算系统都会遭受频繁的故障,即使系统可以从中恢复。就我而言,工作进程关闭本质上并不是系统故障,而是由高阶进程触发的。因此,高阶进程现在不再直接杀死我的工作人员,而是启动一个小的 python 脚本,在它开始运行时发送 retire_worker() 命令。

根据记录,通过

retire_worker()
调度程序会将数据从退休工人移动到另一可用工人。这样我的问题就暂时解决了。然而,我仍然保留这个问题,因为我认为复制、冗余计算将是一个更快的解决方案,并且更好地使用集群中的空闲节点。

python mapreduce dask dask-distributed dask-dataframe
2个回答
1
投票

这可能不是您正在寻找的解决方案,但一种选择是将任务序列划分为足够小的批次,以确保任务及时完成(或快速从头开始重新执行)。

也许是这样的:

import dask.bag as db
from toolz import partition_all

n_per_chunk = 100 # just a guess, the best number depends on the case
tasks = list(partition_all(n_per_chunk, my_ids))

results = []
for t in tasks:
    summed_image = (
        db
        .from_sequence(my_ids)
        .map(gen_image_from_ids)
        .reduction(sum, sum)
        .compute()
    )
    results.append(summed_image)

summed_image = sum(results) # final result

关于在失败时重新启动工作流程(或可能并行启动较小的任务),还有其他一些事情需要记住,但希望这能为您提供可行解决方案的起点。


1
投票

更新:稍后进行更多试验——这个答案并不理想,因为

client.replicate()
命令被阻塞。我怀疑它要求在创建副本之前完成所有 futures ——这是不需要的,因为 1. 任何中间节点都可以在所有准备就绪之前断开连接,2. 它会阻止其他任务异步运行。我需要其他方法来制作复制品。

经过多次尝试,我找到了一种方法,可以复制链式计算时的中间结果,实现数据冗余。请注意,并行

reduction
功能是 Dask
Bag
功能,它不直接支持
replicate
设施。然而,正如 Dask 文档所述,人们可以复制低级 Dask
Future
对象来提高弹性。

按照 @SultanOrazbayev 的帖子手动执行部分求和,使用 persist() 函数将部分求和保留在集群内存中,如评论中所示,返回的项本质上是 Dask

Future
:

import dask.bag as db
from dask.distributed import futures_of
from toolz import partition_all

n_per_chunk = 100 # just a guess, the best number depends on the case
tasks = list(partition_all(n_per_chunk, my_ids))

bags = []
for t in tasks:
    summed_image = (
        db
        .from_sequence(my_ids)
        .map(gen_image_from_ids)
        .reduction(sum, sum)
        .persist()
    )
    bags.append(summed_image)

futures = futures_of(bags)  # This can only be called on the .persist() result

然后我可以复制这些远程中间部分总和,并且对

sum
期货获得最终结果感到更安全:

client.replicate(futures, 5) # Improve resiliency by replicating to 5 workers
summed_image = client.submit(sum, futures).result()  # The only line that blocks for the final result

在这里,我觉得 5 的副本对于我的集群来说是稳定的,尽管较高的值会产生更高的网络开销来在工作人员之间传递副本。

这可行,但可能会得到改进,例如如何对中间结果执行并行归约(求和),特别是当有很多任务时。请留下您的建议。

© www.soinside.com 2019 - 2024. All rights reserved.