在一个工作人员中以并行方式分发/分发子流程调用?

问题描述 投票:0回答:1

我有以下代码遍历每个项目目录,调用外部可执行文件并将结果写入out *文件。

from dask_jobqueue import PBSCluster   
cluster = PBSCluster()
cluster.scale(jobs=3)  

from dask.distributed import Client
client = Client(cluster)
...

r_path='/path/to/project/folder'


def func():
    f = open('out', 'w') # In project, customized out_file naming based on different dir's
    (subprocess.call(["/path/to/executable/file"], stdout=f))

for root, dirs, files in os.walk("."):
    for name in dirs:
        os.chdir(r_path+'/'+str(name))
        func()

此代码以顺序方式执行,但是我想并行运行它,即,在一个简单的工作程序上调用每个子流程。

注意:对于所有不同的目录,我都有相同的subprocess.call(相同的可执行文件)

我已经尝试过]

def func():
   f = open('out', 'w') # In project, customized out_file naming based on different dir's
   func.file = (subprocess.call(["/path/to/executable/file"], stdout=f))


arg = [func.file]
workers = client.scheduler_info()['workers']
tasks = [client.submit(func, arg, workers=worker) for arg, worker in zip(args, workers)]

还有这个(可能不使用dask来分发/并行化)

def func():
    f = open('out', 'w')
    with io.open(f, mode='wb') as out:
        p = subprocess.Popen(["/path/to/executable/file"], stdout=out, stderr=out)
        child_processes.append(p)

for cp in child_processes:
    cp.wait()

但无法并行化/分配子流程调用。

有人可以帮我并行化此子流程的过程,每个子流程需要1名工作人员,以便可以更快地执行任务。

提前感谢!

我有以下代码遍历每个项目目录,调用一个外部可执行文件并将结果写入out *文件。从dask_jobqueue导入PBSCluster群集= PBSCluster()...

python subprocess dask
1个回答
0
投票

通常,第一次非黄昏尝试会显示easiest pattern to parallelise。但是,我要警告不要将全局状态与os.chdir一起使用-而是通过完整路径引用输出文件,并将工作目录传递给子进程

r_path='/path/to/project/folder'

def func(path):
    f = open(os.path.join(path, 'out'), 'w')
    subprocess.call(["/path/to/executable/file"], stdout=f, cwd=path)

out = []
for root, dirs, files in os.walk("."):
    for name in dirs:
        path = r_path+'/'+str(name)
        out.append(dask.delayed(func)(path))

client.compute(*out)
© www.soinside.com 2019 - 2024. All rights reserved.