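I am writing a Python function that uses dask for data processing. I want the dask cluster and client to be closed automatically if any error or exception occurs, so I use the with ... as: statement. The function is structured as follows: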
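    import dask.array as da  # assumption: da refers to dask.array
    from dask.distributed import Client, LocalCluster

    def func(input: str,   # path to input
             output: str,  # path to output
             ):
        with LocalCluster() as cluster, Client(cluster) as client:
            # load the input with dask
            # set up computing graph
            da.compute([...])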
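If I send a keyboard interrupt while the data is being processed, i.e. during da.compute(), the cluster and client shut down successfully. However, if I interrupt while the cluster is still being set up, i.e. during the call to LocalCluster(), the cluster is not shut down properly. I get something like this: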
......
KeyboardInterrupt:
2023-10-20 18:55:38,520 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,523 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,529 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,532 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,535 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,548 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,549 - distributed.nanny - WARNING - Restarting worker
2023-10-20 18:55:38,556 - distributed.nanny - WARNING - Restarting worker
The workers are not stopped.
So the next time I run the function, I get:
/users/kangl/miniconda3/envs/work/lib/python3.10/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.
Perhaps you already have a cluster running?
Hosting the HTTP server on port 37963 instead
warnings.warn(
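because the previous cluster was never closed.
Is there a way to close the cluster automatically when an interrupt occurs while LocalCluster is being created?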
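I'm fairly sure this would require deep code changes in distributed to achieve.
The way with works is that the context is only set up after LocalCluster() has finished, so if you interrupt before that, there is not yet a context that can be exited safely.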
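To see this ordering in isolation, here is a minimal sketch in plain Python that does not use dask at all. SlowResource is a hypothetical stand-in for a resource with a slow constructor, and the KeyboardInterrupt is raised by hand to simulate Ctrl-C arriving during construction. It only illustrates when __exit__ is called; it says nothing about what distributed does internally.

```python
log = []

class SlowResource:
    """Hypothetical stand-in for a resource whose constructor is slow."""

    def __init__(self, interrupt_during_init=False):
        log.append("init started")
        if interrupt_during_init:
            # Simulates Ctrl-C arriving while the constructor is still running.
            raise KeyboardInterrupt
        log.append("init finished")

    def __enter__(self):
        log.append("enter")
        return self

    def __exit__(self, exc_type, exc, tb):
        log.append("exit")
        return False  # do not swallow the exception


def run_constructor_case(interrupt_during_init):
    """Return the sequence of events for one run of the with statement."""
    log.clear()
    try:
        with SlowResource(interrupt_during_init):
            log.append("body")
    except KeyboardInterrupt:
        log.append("interrupted")
    return list(log)


print(run_constructor_case(False))
print(run_constructor_case(True))
```

In the interrupted run, neither "enter" nor "exit" is recorded: the object was never handed to the with statement, so there was nothing for it to clean up.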
One thing you could try is
    cluster = LocalCluster(n_workers=0, ...)
    with cluster:
        cluster.scale(...)
        # cluster.wait_for_workers(...)  # if you need it
        with Client(cluster) as client:
            compute(...)
Now the slow line (starting the workers) runs inside the context.
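The same idea can be sketched without dask. DeferredResource is a hypothetical class with a cheap constructor and a slow scale() step, and the interrupt is again raised by hand. Because the slow step runs inside the with body, __exit__ is called when it is interrupted.

```python
trace = []

class DeferredResource:
    """Hypothetical resource: cheap constructor, slow start-up done later."""

    def __init__(self):
        trace.append("init (fast)")

    def __enter__(self):
        trace.append("enter")
        return self

    def __exit__(self, exc_type, exc, tb):
        trace.append("exit (cleanup)")
        return False  # let the interrupt propagate after cleanup

    def scale(self, interrupt=False):
        trace.append("scale started")
        if interrupt:
            # Simulates Ctrl-C arriving during the slow step.
            raise KeyboardInterrupt
        trace.append("scale finished")


def run_deferred_case(interrupt):
    """Return the sequence of events when the slow step is inside the context."""
    trace.clear()
    try:
        resource = DeferredResource()
        with resource:
            resource.scale(interrupt=interrupt)
    except KeyboardInterrupt:
        trace.append("interrupted")
    return list(trace)


print(run_deferred_case(True))
```

Note that an interrupt arriving between the constructor call and the with line would still skip cleanup, so this pattern narrows the unprotected window rather than removing it.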