Source code for idq.condor

from enum import auto, IntEnum
import getpass
import os
from pathlib import Path
import subprocess as sp
from typing import List, Union

from ezdag import Argument, Option, DAG, Layer, Node

from . import names


DEFAULT_ACCOUNTING_GROUP = "not.real"
DEFAULT_REQUEST_CPUS = 1
DEFAULT_REQUEST_MEMORY = 8192
DEFAULT_UNIVERSE = "vanilla"
DEFAULT_RETRY = 0


class JobType(IntEnum):
    TRAIN = auto()
    EVALUATE = auto()
    CALIBRATE = auto()
    TIMESERIES = auto()
    REPORT = auto()

    def __str__(self):
        return self.name.lower()
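
# Illustrative note (not part of the original module): JobType members
# stringify to their lowercase names, which is how the per-job streaming
# executables and condor config subsections are resolved in
# create_stream_dag below, e.g.
#
#   >>> str(JobType.TRAIN)
#   'train'
#   >>> [str(job) for job in JobType]
#   ['train', 'evaluate', 'calibrate', 'timeseries', 'report']
#   >>> f"idq-streaming_{JobType.EVALUATE}"
#   'idq-streaming_evaluate'
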
def submit_dag(dag_path, block=True):
    """
    submit the condor DAG and monitor it,
    returning the DAG's returncode
    """
    proc = sp.Popen(
        ["condor_submit_dag", "-f", "-import_env", "-usedagdir", dag_path]
    )
    proc.wait()
    returncode = proc.returncode
    if returncode:
        raise RuntimeError("failed to submit {}".format(dag_path))

    if block:
        # FIXME: is this hard-coded naming convention too fragile?
        proc = sp.Popen(["condor_wait", dag_path + ".dagman.log"])
        proc.wait()
        returncode = proc.returncode

    # note: if block=False, this will only ever return 0
    return returncode
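
# Usage sketch (illustrative, not part of the original module): submit an
# existing DAG file and block until DAGMan finishes. The path below is a
# placeholder. Note that dag_path is concatenated with ".dagman.log", so it
# should be passed as a str rather than a pathlib.Path.
#
#   returncode = submit_dag("/path/to/jobdir/idq_batch_1187000000_1187100000.dag")
#   if returncode:
#       print("DAG exited with nonzero status:", returncode)
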
def create_batch_dag(
    gps_start,
    gps_end,
    jobdir,
    batches,
    verbose=False,
    quiet=False,
    retry=DEFAULT_RETRY,
    accounting_group=DEFAULT_ACCOUNTING_GROUP,
    accounting_group_user=getpass.getuser(),
    universe=DEFAULT_UNIVERSE,
    **kwargs,
):
    """generates a DAG for batch jobs"""
    outdir = Path(names.start_end2dir(gps_start, gps_end, rootdir=jobdir))
    logdir = outdir / "logs"
    os.makedirs(outdir, exist_ok=True)

    dag = DAG()

    requirements = {
        "accounting_group": accounting_group,
        "accounting_group_user": accounting_group_user,
        "initialdir": os.getcwd(),
        "getenv": True,
        **kwargs,
    }

    # define job arguments
    arguments: List[Union[Argument, Option]] = []
    if verbose:
        arguments.append(Option("verbose"))
    if quiet:
        arguments.append(Option("quiet"))
    arguments.extend(
        [
            Argument("start-time", gps_start),
            Argument("end-time", gps_end),
            Argument("job-directory", jobdir),
        ]
    )

    layer = Layer(
        name="idq-batch",
        executable="idq-condor_batch",
        universe=universe,
        transfer_files=False,
        log_dir=str(logdir),
        requirements=requirements,
        retries=retry,
    )
    for batch in batches:
        layer += Node(
            arguments=[
                *arguments,
                Argument("batch-id", batch),
            ]
        )
    dag.attach(layer)

    dag_name = f"idq_batch_{gps_start}_{gps_end}"
    dag.write_dag(f"{dag_name}.dag", outdir)
    dag.write_script(f"{dag_name}.sh", outdir)
    dag.create_log_dir(logdir)

    return outdir / f"{dag_name}.dag"
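
# Usage sketch (illustrative, not part of the original module): the GPS
# times, job directory, and batch identifiers below are hypothetical
# placeholders.
#
#   dag_file = create_batch_dag(
#       1187000000,
#       1187100000,
#       "/path/to/jobdir",
#       batches=[0, 1, 2],
#       accounting_group="your.accounting.group",
#   )
#   submit_dag(str(dag_file))
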
def create_stream_dag(
    config,
    config_path,
    jobdir,
    initial_lookback=0,
    skip_timeseries=False,
    skip_report=False,
    verbose=False,
    quiet=False,
    retry=DEFAULT_RETRY,
    accounting_group=DEFAULT_ACCOUNTING_GROUP,
    accounting_group_user=getpass.getuser(),
    universe=DEFAULT_UNIVERSE,
    **kwargs,
):
    """generates a DAG for streaming jobs"""
    outdir = Path(jobdir)
    logdir = outdir / "logs"
    os.makedirs(outdir, exist_ok=True)

    dag = DAG()

    # define general job requirements
    requirements = {
        "accounting_group": accounting_group,
        "accounting_group_user": accounting_group_user,
        "initialdir": os.getcwd(),
        "getenv": True,
    }

    # add key-value pairs not corresponding to job subsections
    for k, v in kwargs.items():
        if k.upper() not in JobType._member_names_:
            requirements[k] = v

    # define job arguments
    arguments: List[Union[Argument, Option]] = [Argument("config", config_path)]
    if verbose:
        arguments.append(Option("verbose"))
    if quiet:
        arguments.append(Option("quiet"))

    # main analysis jobs
    for job in JobType.__members__.values():
        if skip_timeseries and job == JobType.TIMESERIES:
            continue
        if skip_report and job == JobType.REPORT:
            continue

        # set job-specific requirements
        if str(job) in config.condor:
            job_requirements = requirements.copy() | config.condor[str(job)]
        else:
            job_requirements = requirements

        # job arguments
        job_arguments = arguments.copy()
        if job == JobType.TRAIN:
            job_arguments.append(Option("initial-lookback", initial_lookback))

        layer = Layer(
            name=f"idq-stream-{job}",
            executable=f"idq-streaming_{job}",
            universe=universe,
            transfer_files=False,
            log_dir=str(logdir),
            requirements=job_requirements,
            retries=retry,
        )
        layer += Node(arguments=job_arguments)
        dag.attach(layer)

    # smpull job
    shm_config = config.timeseries["reporting"]
    if shm_config.get("shm_pull"):
        # update base requirements
        smpull_requirements = requirements.copy()
        smpull_requirements["request_memory"] = 2000

        layer = Layer(
            "idq-smpull",
            universe=universe,
            transfer_files=False,
            log_dir=str(logdir),
            requirements=smpull_requirements,
            retries=retry,
        )
        layer += Node(
            arguments=[
                Argument("shm-directory", os.path.join(shm_config["shmdir"], "*.gwf")),
                Argument("shm-partition", shm_config["shm_partition"]),
            ]
        )
        dag.attach(layer)

    # scald job
    monitor_config = config.monitor
    if monitor_config.get("scald_reporting"):
        scald_requirements = requirements.copy()
        scald_config_path = os.path.abspath(
            os.path.realpath(monitor_config["scald_config_path"])
        )

        layer = Layer(
            name="scald_metric_collector",
            executable="scald",
            universe=universe,
            transfer_files=False,
            log_dir=str(logdir),
            requirements=scald_requirements,
            retries=retry,
        )
        arguments = [
            Argument("command", "aggregate"),
            Option("config", scald_config_path),
            Option(
                "uri",
                f"kafka://{config.tag}@{config.features['timeseries']['url']}",
            ),
            Option("data-type", "timeseries"),
        ]
        # FIXME: replace by scraping info from scald schema
        # instead of hardcoding topics
        topics = ["fap", "loglike", "rank", "latency", "ok"]
        arguments.append(
            Option(
                "topic",
                [f"idq.{config.tag}.{topic}" for topic in topics],
            )
        )
        arguments.append(Option("schema", topics))

        layer += Node(arguments=arguments)
        dag.attach(layer)

    # generate dag
    dag_name = f"idq_stream_{config.tag}"
    dag.write_dag(f"{dag_name}.dag", outdir)
    dag.write_script(f"{dag_name}.sh", outdir)
    dag.create_log_dir(logdir)

    return outdir / f"{dag_name}.dag"
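
# Usage sketch (illustrative, not part of the original module): `config` is
# assumed to be the parsed iDQ configuration object corresponding to
# `config_path`; the paths below are hypothetical placeholders.
#
#   dag_file = create_stream_dag(
#       config,
#       "/path/to/config.yaml",
#       "/path/to/jobdir",
#       skip_report=True,
#       accounting_group="your.accounting.group",
#   )
#   submit_dag(str(dag_file), block=False)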