def run_local_generation(self):
    # Iterate over a copy so nodes can be removed from the DAG safely
    for node in list(self.dag.nodes):
        if "_generation" in node.name:
            # Run the job locally
            cmd = " ".join([node.executable, node.args[0].arg])
            subprocess.run(cmd, shell=True)
            # Drop this node from its children's parent lists, then from the DAG
            for other_node in self.dag.nodes:
                if node in other_node.parents:
                    other_node.parents.remove(node)
            self.dag.nodes.remove(node)
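
# Illustrative note (not part of the class; node names are hypothetical): if
# the DAG contains nodes named "label_data0_generation" and
# "label_data0_analysis", the loop above runs the generation job in the
# current shell and prunes it from the DAG, leaving the analysis node
# parentless so the SLURM submission never needs a dependency on it.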
def write_master_slurm(self):
    """ Translate dag content to SLURM script """
    with open(self.slurm_master_bash, "w") as f:
        # reformat slurm options
        if self.scheduler_args is not None:
            slurm_args = " ".join(
                [f"--{arg}" for arg in self.scheduler_args.split()]
            )
        else:
            slurm_args = ""
        f.write("#!/bin/bash\n")
        for arg in slurm_args.split():
            f.write(f"#SBATCH {arg}\n")
        f.write("#SBATCH --time=00:10:00\n")
        # write output to standard file
        f.write(
            f"#SBATCH --output={self.submit_dir}/{self.label}_master_slurm.out\n"
        )
        f.write(
            f"#SBATCH --error={self.submit_dir}/{self.label}_master_slurm.err\n"
        )
        if self.scheduler_module:
            for module in self.scheduler_module:
                if module is not None:
                    f.write(f"\nmodule load {module}\n")
        # if self.scheduler_env is not None:
        #     f.write("\nsource {}\n".format(self.scheduler_env))

        # assign new job ID to each process
        jids = range(len(self.dag.nodes))
        job_names = [node.name for node in self.dag.nodes]
        # create dict for assigning ID to job name
        job_dict = dict(zip(job_names, jids))

        for node, indx in zip(self.dag.nodes, jids):
            # Generate the real slurm arguments from the dag node and the
            # parsed slurm args
            job_slurm_args = slurm_args
            job_slurm_args += " --nodes=1"
            job_slurm_args += f" --ntasks-per-node={node.request_cpus}"
            job_slurm_args += (
                f" --mem={int(float(node.request_memory.rstrip('GB')))}G"
            )
            job_slurm_args += f" --time={node.slurm_walltime}"
            job_slurm_args += f" --job-name={node.name}"

            submit_str = f"\njid{indx}=($(sbatch {job_slurm_args} "

            # get list of all parents associated with job
            parents = [job.name for job in node.parents]
            if len(parents) > 0:
                # only run subsequent jobs after parent has
                # *successfully* completed
                submit_str += "--dependency=afterok"
                for parent in parents:
                    submit_str += f":${{jid{job_dict[parent]}[-1]}}"

            # get output file path from dag and use for slurm
            output_file = self._output_name_from_dag(node.extra_lines)
            submit_str += f" --output={output_file}"
            submit_str += f" --error={output_file.replace('.out', '.err')}"

            job_script = self._write_individual_processes(
                node.name, node.executable, node.args[0].arg
            )
            submit_str += f" {job_script}))\n\n"
            submit_str += (
                f'echo "jid{indx} ${{jid{indx}[-1]}}" >> {self.slurm_id_file}'
            )
            f.write(f"{submit_str}\n")

    # print out how to submit
    command_line = f"sbatch {self.slurm_master_bash}"
    if self.submit:
        subprocess.run(command_line, shell=True)
    else:
        logger.info(
            f"slurm scripts written, to run jobs submit:\n$ {command_line}"
        )
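
# Illustrative sketch (not part of the class; names, paths, and resource
# values are hypothetical): for two DAG nodes where "job1" lists "job0" as a
# parent, the master script written above looks roughly like
#
#   #!/bin/bash
#   #SBATCH --time=00:10:00
#   #SBATCH --output=outdir/submit/label_master_slurm.out
#   #SBATCH --error=outdir/submit/label_master_slurm.err
#
#   jid0=($(sbatch --nodes=1 --ntasks-per-node=1 --mem=4G --time=24:00:00 \
#       --job-name=job0 --output=outdir/log/job0.out \
#       --error=outdir/log/job0.err outdir/submit/job0.sh))
#   echo "jid0 ${jid0[-1]}" >> outdir/submit/slurm_ids
#
#   jid1=($(sbatch --nodes=1 ... --dependency=afterok:${jid0[-1]} \
#       --output=outdir/log/job1.out --error=outdir/log/job1.err \
#       outdir/submit/job1.sh))
#   echo "jid1 ${jid1[-1]}" >> outdir/submit/slurm_ids
#
# The array capture jid0=($(sbatch ...)) splits sbatch's "Submitted batch
# job <id>" message into words, so ${jid0[-1]} recovers the numeric job id
# used to build the afterok dependency chain.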
def _write_individual_processes(self, name, executable, args):
    fname = name + ".sh"
    job_path = self.submit_dir + "/" + fname
    with open(job_path, "w") as ff:
        ff.write("#!/bin/bash\n")
        if self.scheduler_module:
            for module in self.scheduler_module:
                if module is not None:
                    ff.write(f"\nmodule load {module}\n")
        if self.scheduler_env is not None:
            ff.write(f"\nsource {self.scheduler_env}\n\n")
            # Call python from the venv on the script directly to avoid
            # "bad interpreter" from shebang exceeding 128 chars
            job_str = f"python {executable} {args}\n\n"
        else:
            job_str = f"{executable} {args}\n\n"
        ff.write(job_str)
    return job_path
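
# Illustrative sketch (hypothetical paths): for a node named "job0" with a
# scheduler module and a scheduler_env set, the wrapper written above is
# <submit_dir>/job0.sh, containing roughly
#
#   #!/bin/bash
#
#   module load python
#
#   source /path/to/venv/bin/activate
#
#   python /path/to/executable job0_config.ini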
@property
def slurm_master_bash(self):
    """ Create filename for master script """
    filebasename = "_".join(["slurm", self.label, "master.sh"])
    return os.path.join(self.submit_dir, filebasename)
@property
def slurm_id_file(self):
    """ Path of the file that stores the slurm ids of the jobs """
    return os.path.join(self.submit_dir, "slurm_ids")
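
# Illustrative contents of the slurm_ids file after submission (job ids are
# hypothetical); each master-script job appends one "jid<n> <slurm id>" line:
#
#   jid0 1234567
#   jid1 1234568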
@staticmethod
def _output_name_from_dag(extra_lines):
    # probably a faster way to do this, but the list is short so this
    # should be fine
    for line in extra_lines:
        if line.startswith("output"):
            # drop the leading "output = " (9 characters), then strip the
            # HTCondor cluster/process placeholders from the filename
            path = line[9:]
            path = path.replace("_$(Cluster)", "")
            path = path.replace("_$(Process)", "")
            return path
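
# Example of the line format handled above (hypothetical path): an HTCondor
# extra_lines entry such as
#
#   output = outdir/log/label_$(Cluster)_$(Process).out
#
# is reduced to "outdir/log/label.out".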