我有以下代码遍历每个项目目录,调用外部可执行文件并将结果写入out *文件。
from dask_jobqueue import PBSCluster
cluster = PBSCluster()
cluster.scale(jobs=3)
from dask.distributed import Client
client = Client(cluster)
...
r_path='/path/to/project/folder'
def func():
f = open('out', 'w') # In project, customized out_file naming based on different dir's
(subprocess.call(["/path/to/executable/file"], stdout=f))
for root, dirs, files in os.walk("."):
for name in dirs:
os.chdir(r_path+'/'+str(name))
func()
此代码以顺序方式执行,但是我想并行运行它,即,在一个简单的工作程序上调用每个子流程。
注意:对于所有不同的目录,我都有相同的subprocess.call(相同的可执行文件)
我已经尝试过]
def func(): f = open('out', 'w') # In project, customized out_file naming based on different dir's func.file = (subprocess.call(["/path/to/executable/file"], stdout=f)) arg = [func.file] workers = client.scheduler_info()['workers'] tasks = [client.submit(func, arg, workers=worker) for arg, worker in zip(args, workers)]
还有这个(可能不使用dask来分发/并行化)
def func(): f = open('out', 'w') with io.open(f, mode='wb') as out: p = subprocess.Popen(["/path/to/executable/file"], stdout=out, stderr=out) child_processes.append(p) for cp in child_processes: cp.wait()
但无法并行化/分配子流程调用。
有人可以帮我并行化此子流程的过程,每个子流程需要1名工作人员,以便可以更快地执行任务。
提前感谢!
我有以下代码遍历每个项目目录,调用一个外部可执行文件并将结果写入out *文件。从dask_jobqueue导入PBSCluster群集= PBSCluster()...
通常,第一次非黄昏尝试会显示easiest pattern to parallelise。但是,我要警告不要将全局状态与os.chdir
一起使用-而是通过完整路径引用输出文件,并将工作目录传递给子进程
r_path='/path/to/project/folder'
def func(path):
f = open(os.path.join(path, 'out'), 'w')
subprocess.call(["/path/to/executable/file"], stdout=f, cwd=path)
out = []
for root, dirs, files in os.walk("."):
for name in dirs:
path = r_path+'/'+str(name)
out.append(dask.delayed(func)(path))
client.compute(*out)