我正在使用
client.submit
运行任务:
from dask.distributed import Client, get_client, wait, as_completed
# other imports
zip_and_upload_futures = [ client.submit(zip_and_upload, id, path, file_list) for id, file_list in enumerate(file_lists) ]
upload_futures = []
failed_list = []
for future in as_completed(zip_and_upload_futures):
result = future.result()
upload_futures.append(result[0])
print(f'Zip {result[1]} creating and added to upload queue.')
del future
for future in as_completed(upload_futures):
if 'finished' in future.status:
print('Zip {future.result()[0]} uploaded.')
del f
elif 'exception' in future.status or 'error' in future.status:
failed_list.append(future.exception(),future.traceback())
del f
在
zip_and_upload(id, path, file_list)
中,文件被压缩到内存中的类似文件(BytesIO)对象,因为我在本地磁盘上受到限制,然后上传zip对象。
def zip_and_upload(id, path, file_list):
client = get_client()
zip_future = client.submit(zip_file_list, id, file_list, compression)
wait(zip_future)
results = zip_future.result() # tuple == (zip_data, name_list)
future = client.submit(upload, id, results[0], results[1])
return future, f'{path}_{id}.zip'
我从调度程序中收到很多错误,类似于
distributed.scheduler - ERROR - Couldn't gather keys: {'zip_file_list-<hash>': 'queued'}
问题:
非常感谢。
如文档中所述,从任务启动任务
如果太多任务同时请求作业,可能会导致调度程序死锁。每个任务不会向调度程序传达它们正在等待结果并可以自由计算其他任务的信息。如果每个调度槽都在运行一个任务并且它们都请求更多任务,这可能会导致集群死锁。我想这就是你所遇到的情况。我不认为你真的需要提交这些任务,特别是因为你在提交另一个任务之前正在等待第一个任务,所以也许只需删除嵌套函数中的
client.submit
即可。如果您确实需要这种工作流程,那么请阅读已经提到的页面:
为了避免这种死锁问题,我们可以使用分离和重新加入。这些函数将分别从集群中删除和重新加入当前任务。