我有一个PBS作业脚本,其中包含一个可将结果写入out文件的可执行文件。
### some lines
PBS_O_EXEDIR="path/to/software"
EXECUTABLE="executablefile"
OUTFILE="out"
### Copy application directory on compute node
[ -d $PBS_O_EXEDIR ] || mkdir -p $PBS_O_EXEDIR
[ -w $PBS_O_EXEDIR ] && \
rsync -Cavz --rsh=$SSH $HOST:$PBS_O_EXEDIR `dirname $PBS_O_EXEDIR`
[ -d $PBS_O_WORKDIR ] || mkdir -p $PBS_O_WORKDIR
rsync -Cavz --rsh=$SSH $HOST:$PBS_O_WORKDIR `dirname $PBS_O_WORKDIR`
# Change into the working directory
cd $PBS_O_WORKDIR
# Save the jobid in the outfile
echo "PBS-JOB-ID was $PBS_JOBID" > $OUTFILE
# Run the executable
$PBS_O_EXEDIR/$EXECUTABLE >> $OUTFILE
在我的项目中,我必须使用Dask提交此作业并对其进行监视。因此,我已经这样配置了jobqueue.yaml文件。
jobqueue:
pbs:
name: htc_calc
# Dask worker options
cores: 4 # Total number of cores per job
memory: 50GB # Total amount of memory per job
# PBS resource manager options
shebang: "#!/usr/bin/env bash"
walltime: '00:30:00'
exe_dir: "/home/r/rb11/softwares/FPLO/bin"
excutable: "fplo18.00-57-x86_64"
outfile: "out"
job-extra: "exe_dir/executable >> outfile"
但是,通过Dask提交作业时出现此错误。
qsub: directive error: e
tornado.application - ERROR - Exception in callback functools.partial(<function wrap.<locals>.null_wrapper at 0x7f3d8c4a56a8>, <Task finished coro=<SpecCluster._correct_state_internal() done, defined at /home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/distributed/deploy/spec.py:284> exception=RuntimeError('Command exited with non-zero exit code.\nExit code: 1\nCommand:\nqsub /tmp/tmpwyvkfcmi.sh\nstdout:\n\nstderr:\nqsub: directive error: e \n\n',)>)
Traceback (most recent call last):
File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/tornado/ioloop.py", line 758, in _run_callback
ret = callback()
File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/tornado/stack_context.py", line 300, in null_wrapper
return fn(*args, **kwargs)
File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/tornado/ioloop.py", line 779, in _discard_future_result
future.result()
File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/asyncio/futures.py", line 294, in result
raise self._exception
File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/asyncio/tasks.py", line 240, in _step
result = coro.send(None)
File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/distributed/deploy/spec.py", line 317, in _correct_state_internal
await w # for tornado gen.coroutine support
File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/distributed/deploy/spec.py", line 41, in _
await self.start()
File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/dask_jobqueue/core.py", line 285, in start
out = await self._submit_job(fn)
File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/dask_jobqueue/core.py", line 268, in _submit_job
return self._call(shlex.split(self.submit_command) + [script_filename])
File "/home/r/rb11/anaconda3/envs/htc/lib/python3.5/site-packages/dask_jobqueue/core.py", line 368, in _call
"stderr:\n{}\n".format(proc.returncode, cmd_str, out, err)
RuntimeError: Command exited with non-zero exit code.
Exit code: 1
Command:
qsub /tmp/tmpwyvkfcmi.sh
stdout:
stderr:
qsub: directive error: e
如何在Dask中指定自定义bash脚本?
Dask用于分发Python应用程序。对于Dask Jobqueue,它的工作原理是将调度程序和工作程序提交给批处理系统,批处理系统连接在一起形成自己的集群。然后,您可以将Python工作提交到Dask调度程序。
从您的示例看来,您正在尝试使用集群设置配置来运行自己的bash应用程序而不是Dask。
为了使用Dask进行此操作,您应该将jobqueue配置恢复为默认值,而是编写一个Python函数来调用bash脚本。
from dask_jobqueue import PBSCluster
cluster = PBSCluster()
cluster.scale(jobs=10) # Deploy ten single-node jobs
from dask.distributed import Client
client = Client(cluster) # Connect this local process to remote workers
client.submit(os.system, "/path/to/your/script") # Run script on all workers
但是似乎Dask可能不适合您尝试做的事情。通常将您的工作提交给PBS,可能会更好。