Dask 分布式暂停任务以等待子任务 - 操作方法,还是不好的做法?

问题描述 投票:0回答:1

我正在使用

client.submit
运行任务:

from dask.distributed import Client, get_client, wait, as_completed
# other imports

zip_and_upload_futures = [ client.submit(zip_and_upload, id, path, file_list) for id, file_list in enumerate(file_lists) ]
upload_futures = []
failed_list = []

for future in as_completed(zip_and_upload_futures):
    result = future.result()
    upload_futures.append(result[0])
    print(f'Zip {result[1]} creating and added to upload queue.')
    del future

for future in as_completed(upload_futures):
    if 'finished' in future.status:
        print('Zip {future.result()[0]} uploaded.')
        del f
    elif 'exception' in future.status or 'error' in future.status:
        failed_list.append(future.exception(),future.traceback())
        del f

zip_and_upload(id, path, file_list)
中,文件被压缩到内存中的类似文件(BytesIO)对象,因为我在本地磁盘上受到限制,然后上传zip对象。

def zip_and_upload(id, path, file_list):
    client = get_client()
    zip_future = client.submit(zip_file_list, id, file_list, compression)
    wait(zip_future)
    results = zip_future.result() # tuple == (zip_data, name_list)
    future = client.submit(upload, id, results[0], results[1])
    return future, f'{path}_{id}.zip'

我从调度程序中收到很多错误,类似于

distributed.scheduler - ERROR - Couldn't gather keys: {'zip_file_list-<hash>': 'queued'}

问题:

  1. 这是 Dask 与任务和子任务的可行用法吗?
  2. 有没有更好的方法(我确信有,但是不需要太多重构并且仍然使用 Dask)(我尝试了 fire_and_forget 上传部分,但想捕获任何上传失败)?
  3. 为什么我收到错误消息说 zip_file_list 任务已排队?无论如何,我试图让该任务阻塞,所以不应该让它排队,然后执行吗?

非常感谢。

dask dask-distributed
1个回答
0
投票

如文档中所述,从任务启动任务

如果太多任务同时请求作业,可能会导致调度程序死锁。每个任务不会向调度程序传达它们正在等待结果并可以自由计算其他任务的信息。如果每个调度槽都在运行一个任务并且它们都请求更多任务,这可能会导致集群死锁。

我想这就是你所遇到的情况。我不认为你真的需要提交这些任务,特别是因为你在提交另一个任务之前正在等待第一个任务,所以也许只需删除嵌套函数中的

client.submit

 即可。

如果您确实需要这种工作流程,那么请阅读已经提到的页面:

为了避免这种死锁问题,我们可以使用分离和重新加入。这些函数将分别从集群中删除和重新加入当前任务。

© www.soinside.com 2019 - 2024. All rights reserved.