#!/usr/bin/env python
"""
Module containing the tools for outputting slurm submission scripts
"""
import os
import subprocess
from ..utils import logger
class SubmitSLURM(object):
    """
    Write SLURM submission scripts from the content of a pycondor DAG.

    One master bash script is written which, when run through ``sbatch``,
    submits one job per DAG node (each with its own small bash script) and
    chains them together with ``--dependency=afterok``.

    Parameters
    ----------
    dag:
        Object exposing ``pycondor_dag``, ``inputs`` (with
        ``submit_directory``, ``submit`` and ``label``), ``scheduler``,
        ``scheduler_args``, ``scheduler_module``, ``scheduler_env`` and
        ``scheduler_analysis_time``.
    """

    def __init__(self, dag):
        # The pycondor DAG whose ``nodes`` are translated into SLURM jobs
        self.dag = dag.pycondor_dag
        # Directory in which every generated script is written
        self.submit_dir = dag.inputs.submit_directory
        # If truthy, the master script is submitted right after being written
        self.submit = dag.inputs.submit
        # Label used in the master script file name and job name
        self.label = dag.inputs.label
        self.scheduler = dag.scheduler
        # Whitespace separated "key=value" options (no leading "--"), or None
        self.scheduler_args = dag.scheduler_args
        # Iterable of module names to "module load" (entries may be None)
        self.scheduler_module = dag.scheduler_module
        # Path of a script to "source" in each job script, or None
        self.scheduler_env = dag.scheduler_env
        # Stored for reference; not read by any method of this class
        self.scheduler_analysis_time = dag.scheduler_analysis_time

    def run_local_generation(self):
        """
        Run every "_generation" node locally and drop it from the DAG

        Each node whose name contains ``"_generation"`` is executed through
        the shell, removed from the parents of all other nodes and finally
        removed from ``self.dag.nodes`` (which is modified in place).
        """
        # Iterate over a snapshot: removing entries from the list that is
        # being iterated would silently skip the node following each removal.
        for node in list(self.dag.nodes):
            if "_generation" not in node.name:
                continue

            # Run the job locally. The argument string holds the complete
            # command line of the job, hence the shell invocation.
            cmd = " ".join([node.executable, node.args[0].arg])
            subprocess.run(cmd, shell=True)

            # Detach the finished node from its children, then drop it
            for other_node in self.dag.nodes:
                if node in other_node.parents:
                    other_node.parents.remove(node)
            self.dag.nodes.remove(node)

    def write_master_slurm(self):
        """
        Translate dag content to SLURM script

        Writes the master bash script (see ``slurm_master_bash``) plus one
        bash script per DAG node. If ``self.submit`` is truthy the master
        script is submitted with ``sbatch``, otherwise the command needed to
        submit it is logged.
        """
        slurm_args_master = {
            "mem": "1G",
            "nodes": 1,
            "ntasks-per-node": 1,
            "time": "00:10:00",
            "output": f"{self.submit_dir}/{self.label}_master_slurm.out",
            "error": f"{self.submit_dir}/{self.label}_master_slurm.err",
            "job-name": f"{self.label}_master",
        }

        # User supplied options override the defaults of the master script
        # and of every individual job
        slurm_args_custom = self._parse_scheduler_args(self.scheduler_args)
        slurm_args_master.update(slurm_args_custom)

        nodes = self.dag.nodes
        # Map each job name to the index used for its "jid<N>" bash variable
        job_ids = {node.name: index for index, node in enumerate(nodes)}

        with open(self.slurm_master_bash, "w") as f:
            f.write("#!/bin/bash\n")
            for key, val in slurm_args_master.items():
                f.write(f"#SBATCH {self._format_option(key, val)}\n")

            # NOTE: scheduler_env is not sourced in the master script; the
            # per-job scripts source it (see _write_individual_processes).
            self._write_module_loads(f)

            for index, node in enumerate(nodes):
                submit_str = self._build_submit_string(
                    node, index, job_ids, slurm_args_custom
                )
                f.write(f"{submit_str}\n")

        # print out how to submit
        command_line = f"sbatch {self.slurm_master_bash}"

        if self.submit:
            subprocess.run(command_line, shell=True)
        else:
            logger.info(f"slurm scripts written, to run jobs submit:\n$ {command_line}")

    @staticmethod
    def _parse_scheduler_args(scheduler_args):
        """
        Convert a "key=value key2=value2" string into an option dictionary

        Only the first "=" of each item separates key and value, so values
        may themselves contain "=". Items without "=" are treated as bare
        flags and mapped to ``None``. ``None`` or an empty string gives ``{}``.
        """
        parsed = {}
        if not scheduler_args:
            return parsed
        for item in scheduler_args.split():
            key, sep, value = item.partition("=")
            parsed[key] = value if sep else None
        return parsed

    @staticmethod
    def _format_option(key, val):
        """
        Format one sbatch option as "--key=val" ("--key" if val is None)
        """
        if val is None:
            return f"--{key}"
        return f"--{key}={val}"

    def _write_module_loads(self, handle):
        """
        Write a "module load" line for each non-None scheduler module
        """
        if not self.scheduler_module:
            return
        for module in self.scheduler_module:
            if module is not None:
                handle.write(f"\nmodule load {module}\n")

    def _build_submit_string(self, node, index, job_ids, slurm_args_custom):
        """
        Build the bash lines of the master script submitting a single node

        Also writes the individual bash script of the node. The returned text
        stores the sbatch output in the bash array ``jid<index>`` and appends
        its last word (the job id) to ``slurm_id_file``.
        """
        # get output file path from dag and use for slurm
        output_file = self._output_name_from_dag(node.extra_lines)

        # Generate the real slurm arguments from the dag node and the
        # parsed slurm args
        job_slurm_args = {
            "mem": f"{int(float(node.request_memory.rstrip('GB')))}G",
            "nodes": 1,
            "ntasks-per-node": node.request_cpus,
            "time": node.slurm_walltime,
            "output": output_file,
            "error": output_file.replace(".out", ".err"),
            "job-name": node.name,
        }
        job_slurm_args.update(slurm_args_custom)

        pieces = [f"\njid{index}=($(sbatch"]
        pieces.extend(
            f" {self._format_option(key, val)}"
            for key, val in job_slurm_args.items()
        )

        if node.parents:
            # only run subsequent jobs after parent has
            # *successfully* completed
            pieces.append(" --dependency=afterok")
            pieces.extend(
                f":${{jid{job_ids[parent.name]}[-1]}}" for parent in node.parents
            )

        job_script = self._write_individual_processes(
            node.name, node.executable, node.args[0].arg
        )
        pieces.append(f" {job_script}))\n\n")
        pieces.append(
            f'echo "jid{index} ${{jid{index}[-1]}}" >> {self.slurm_id_file}'
        )
        return "".join(pieces)

    def _write_individual_processes(self, name, executable, args):
        """
        Write the bash script running a single job and return its path

        Parameters
        ----------
        name: str
            Job name, the script is written to ``<submit_dir>/<name>.sh``
        executable: str
            Program (or python script) to run
        args: str
            Argument string appended after the executable

        Returns
        -------
        str
            Path of the written script
        """
        job_path = os.path.join(self.submit_dir, name + ".sh")

        if self.scheduler_env is not None:
            # Call python from the venv on the script directly to avoid
            # "bad interpreter" from shebang exceeding 128 chars
            job_str = f"python {executable} {args}\n\n"
        else:
            job_str = f"{executable} {args}\n\n"

        with open(job_path, "w") as ff:
            ff.write("#!/bin/bash\n")
            self._write_module_loads(ff)
            if self.scheduler_env is not None:
                ff.write(f"\nsource {self.scheduler_env}\n\n")
            ff.write(job_str)

        return job_path

    @property
    def slurm_master_bash(self):
        """
        Create filename for master script
        """
        filebasename = "_".join(["slurm", self.label, "master.sh"])
        return os.path.join(self.submit_dir, filebasename)

    @property
    def slurm_id_file(self):
        """
        Create the file that should store the slurm ids of the jobs
        """
        return os.path.join(self.submit_dir, "slurm_ids")

    @staticmethod
    def _output_name_from_dag(extra_lines):
        """
        Extract the output file path from the extra lines of a DAG node

        The first line starting with "output" is used: the text after its
        first "=" is taken as the path and the "_$(Cluster)" / "_$(Process)"
        placeholders are removed. Returns ``None`` if there is no such line.
        """
        for line in extra_lines:
            if not line.startswith("output"):
                continue
            _, sep, value = line.partition("=")
            # Without any "=", fall back to skipping the 9 characters of
            # an "output = " style prefix
            path = value.strip() if sep else line[9:]
            return path.replace("_$(Cluster)", "").replace("_$(Process)", "")
        return None