Source code for bilby_pipe.htcondor_sync

"""
bilby_pipe_htcondor_sync can be used to sync jobs running under HTCondor where
transfer-files=True (i.e. they do not use a virtual file system). It will use
rsync to copy the results from remote worker nodes or spool back to the execute
node.  The executable is assumed to be run from the submit directory (i.e.,
where the .ini file is) and on the submit machine (i.e. where the job was
submitted from).
"""
import argparse
import glob
import os
import subprocess

from bilby_pipe.utils import logger


[docs] def get_cluster_id_list(outdir): """Get a list of cluster IDs for every running analysis job""" logfile_matches = glob.glob(f"{outdir}/log_data_analysis/*.log") ids = [] for logfile in logfile_matches: ids.append(get_cluster_id(logfile)) return ids
[docs] def get_cluster_id(logfile): """Read a log files to determine the latest cluster ID Extract the HTCondor cluster ID from the .log file. For example, if the log file reads ``` 001 (100503183.000.000) 2022-03-07 15:22:49 Job executing on host: <10.14.4.164...> ``` Then this function return the cluster ID 100503183 Parameters ---------- logfile: str A path to a HTCondor log file Returns ------- cluster_id: str The cluster ID. If not ID is found, None is returned and a log message is printed. """ with open(logfile, "r") as f: ids = [] for line in f: if "Job executing on" in line: elements = line.split() ids.append(int(elements[1].lstrip("(").rstrip(")").split(".")[0])) if len(ids) > 0: return ids[-1] else: logger.info("No cluster ID found in log file")
[docs] def rsync_via_ssh(cluster_id, outdir, verbose=False, timeout=30): """Attempt to rsync the local (submit) directory to current running worker nodes This method applies when the job is actively executing on a remote worker node and condor_ssh_to_job is possible. The method works by using condor_ssh_to_job and rsync as described in the HTCondor documentation: https://htcondor.readthedocs.io/en/latest/man-pages/condor_ssh_to_job.html. Parameters ---------- cluster_id: int The HTCondor clusterId outdir: str The top-level outdir of the bilby_pipe job verbose: bool If true, print explicit error messages timeout: int The timeout interval Returns ------- success: bool True if the method was successful. """ sync_path = f"{outdir}/result/" target = f"{cluster_id}:{sync_path}" cmd = [ "timeout", str(timeout), "rsync", "-v", "-r", "-e", '"condor_ssh_to_job"', target, sync_path, ] logger.info("Running " + " ".join(cmd)) out = subprocess.run(cmd, capture_output=True) if verbose: logger.info(f"stdout: {out.stdout.decode('utf-8')}") logger.info(f"stderr: {out.stderr.decode('utf-8')}") if out.returncode == 0: return True else: return False
[docs] def rsync_via_spool(cluster_id, outdir, verbose=False): """Attempt to rsync the local (submit) directory to the spool This method applies when the job is not actively executing on a remote worker, but is idle. In this instance, any files produced by the job will be stored in the spool (a local directory on the submit machine). This methods identifies the spool location, based on the cluster_id, and attempts to rsync the data. Parameters ---------- cluster_id: int The HTCondor clusterId outdir: str The top-level outdir of the bilby_pipe job Returns ------- success: bool True if the method was successful. """ outdir = outdir.rstrip("/") subdir = cluster_id % 10000 procid = 0 spool_dir = ( subprocess.check_output("condor_config_val SPOOL", shell=True) .decode("utf-8") .rstrip("\n") ) # Definition of the spool location credit to James Clark src = f"{spool_dir}/{subdir}/{procid}/cluster{cluster_id}.proc{procid}.subproc0/{outdir}/" if os.path.isdir(src): cmd = ["rsync", "-r", src, outdir] logger.info("Running " + " ".join(cmd)) out = subprocess.run(cmd, capture_output=True) if verbose: logger.info(f"stdout: {out.stdout.decode('utf-8')}") logger.info(f"stderr: {out.stderr.decode('utf-8')}") return True else: if verbose: logger.info(f"Spool directory {src} does not exist") return False
[docs] methods = [rsync_via_ssh, rsync_via_spool]
[docs] def main(): parser = argparse.ArgumentParser(description=__doc__) parser.add_argument("outdir", help="The bilby_pipe directory to sync") parser.add_argument( "--verbose", action="store_true", help="Print explicit error messages" ) args = parser.parse_args() args.outdir = args.outdir.rstrip("/") cluster_id_list = get_cluster_id_list(args.outdir) for cluster_id in cluster_id_list: if cluster_id is not None: logger.info(f"Trying to sync job {cluster_id}") success = False for method in methods: logger.info(f"Trying to sync using method {method.__name__}") success = method(cluster_id, args.outdir, args.verbose) if success: logger.info(f"Successfully synced using method {method.__name__}") break else: logger.info(f"Failed to sync using method {method.__name__}") if success is False: logger.warning(f"All sync methods failed for job {cluster_id}")
if __name__ == "__main__": main()