Dask - 如何取消和重新提交停滞的任务?

问题描述 投票:0回答:2

我经常遇到这样的问题:Dask 在几个任务上随机停止,这些任务通常与从网络上的不同节点读取数据相关(有关此问题的更多详细信息请参见下文)。 在没有问题地运行脚本几个小时后,可能会发生这种情况。 它将以如下所示的形式无限期地挂起(否则此循环需要几秒钟才能完成):

enter image description here

在这种情况下,我看到只有少数停滞的进程,并且所有进程都位于一个特定节点(192.168.0.228)上: enter image description here

此节点上的每个工作线程都因几个 read_parquet 任务而停滞:

enter image description here

这是使用以下代码调用的,并使用 fastparquet:

ddf = dd.read_parquet(file_path, columns=['col1', 'col2'], index=False, gather_statistics=False)

我的集群正在运行 Ubuntu 19.04 以及 Dask 和 Distributed 的所有最新版本(截至 11/12)以及所需的软件包(例如,tornado、fsspec、fastparquet 等)

.228 节点尝试访问的数据位于我的集群中的另一个节点上。 .228节点通过CIFS文件共享访问数据。 我在运行脚本的同一节点上运行 Dask 调度程序(不同于 .228 节点和数据存储节点)。 该脚本使用 Paramiko 通过 SSH 将工作人员连接到调度程序:

ssh_client = paramiko.SSHClient()
stdin, stdout, stderr = ssh_client.exec_command('sudo dask-worker ' +
                                                            ' --name ' + comp_name_decode +
                                                            ' --nprocs ' + str(nproc_int) +
                                                            ' --nthreads 10 ' +
                                                            self.dask_scheduler_ip, get_pty=True)  

.228 节点与调度程序和数据存储节点的连接看起来都很健康。 .228 节点在尝试处理 read_parquet 任务时可能会遇到某种短暂的连接问题,但如果发生这种情况,0.228 节点与调度程序和 CIFS 共享的连接在该短暂时刻之后不会受到影响。 无论如何,日志没有显示任何问题。 这是 .228 节点的整个日志:

distributed.worker - INFO - Start worker at: tcp://192.168.0.228:42445

distributed.worker - INFO - Listening to: tcp://192.168.0.228:42445

distributed.worker - INFO - dashboard at: 192.168.0.228:37751

distributed.worker - INFO - Waiting to connect to: tcp://192.168.0.167:8786

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Threads: 2

distributed.worker - INFO - Memory: 14.53 GB

distributed.worker - INFO - Local Directory: /home/dan/worker-50_838ig

distributed.worker - INFO - -------------------------------------------------

distributed.worker - INFO - Registered to: tcp://192.168.0.167:8786

distributed.worker - INFO - -------------------------------------------------

抛开这是否是 Dask 中的错误或我的代码/网络中的错误,是否可以为调度程序处理的所有任务设置通用超时? 或者,是否可以:

  1. 识别停滞的任务,
  2. 复制停滞的任务并将其移至另一个工作人员,并且
  3. 取消停滞的任务?
python-3.x dask dask-distributed dask-delayed fastparquet
2个回答
1
投票

是否可以为调度程序处理的所有任务设置通用超时?

截至 2019-11-13 不幸的是答案是否定的。

如果任务正确失败,那么您可以使用

client.retry(...)
重试该任务,但没有自动方法让任务在一定时间后自行失败。 您必须自己将这些内容写入 Python 函数中。 不幸的是,很难中断另一个线程中的 Python 函数,这也是未实现此功能的部分原因。

如果工人倒下了,那么就会在其他地方尝试。 然而,从你所说的看来,一切都很健康,只是任务本身可能会永远持续下去。 不幸的是,很难将其识别为失败案例。


0
投票

这是在 Dask 中取消任务的代码示例。

您可以在

Python
级别中断任务,而不是在内部计算级别。

import asyncio
from dask.distributed import Client, Variable, get_worker
import dask.array as da

def _is_task_canceled():
    w = get_worker()
    current_task_state = w.tasks[w.get_current_task()].state

    return current_task_state == 'cancelled'


def inc(x: int) -> int:
    x1 = da.random.random((40000, 40000), chunks=(1000, 1000))
    y = x1 + x1.T
    z = y[::2, 5000:].mean(axis=1)
    result = z.compute()

    if _is_task_canceled():
        print('[CANCEL] Task has been canceled, RETURN!')
        return result

    print('[CONTUNUE] Task has not been canceled, GO_FORWARD!')

    x2 = da.random.random((40000, 40000), chunks=(1000, 1000))
    y2 = x2 + x2.T
    z2 = y2[::2, 5000:].mean(axis=1)

    result = z2.compute()

    return result


async def f():
    do_cancel_task = True

    async with Client(address='127.0.0.1:8786', asynchronous=True) as client:
        future = client.submit(inc, 10)

        await asyncio.sleep(3.0)

        if do_cancel_task:
            await client.cancel([future], force=True)
            print('No result, task has been canceled.')
        else:
            result = await future
            print(result)


if __name__ == '__main__':
    asyncio.get_event_loop().run_until_complete(f())
© www.soinside.com 2019 - 2024. All rights reserved.