from enum import auto, IntEnum
import getpass
import os
from pathlib import Path
import subprocess as sp
from typing import List, Union
from ezdag import Argument, Option, DAG, Layer, Node
from . import names
DEFAULT_ACCOUNTING_GROUP = "not.real"
DEFAULT_REQUEST_CPUS = 1
DEFAULT_REQUEST_MEMORY = 8192
DEFAULT_UNIVERSE = "vanilla"
DEFAULT_RETRY = 0


class JobType(IntEnum):
    TRAIN = auto()
    EVALUATE = auto()
    CALIBRATE = auto()
    TIMESERIES = auto()
    REPORT = auto()

    def __str__(self):
        return self.name.lower()
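

# Illustrative note (not part of the original module): the lowercase string form
# of each JobType member is what ties a job to its config subsection and to its
# executable name, e.g. str(JobType.TRAIN) == "train" and the streaming
# executable "idq-streaming_train". A minimal sketch of that mapping:
def _example_jobtype_mapping():
    # returns e.g. {"train": "idq-streaming_train", ..., "report": "idq-streaming_report"}
    return {str(job): f"idq-streaming_{job}" for job in JobType}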


def submit_dag(dag_path, block=True):
    """Submit the condor DAG and, if block=True, monitor it with condor_wait.

    Returns the DAG's return code (always 0 when block=False, since a failed
    submission raises instead).
    """
    proc = sp.Popen(["condor_submit_dag", "-f", "-import_env", "-usedagdir", dag_path])
    proc.wait()
    returncode = proc.returncode
    if returncode:
        raise RuntimeError("failed to submit {}".format(dag_path))
    if block:
        # FIXME: is this hard-coded naming convention too fragile?
        # f-string so this also works when dag_path is a pathlib.Path
        proc = sp.Popen(["condor_wait", f"{dag_path}.dagman.log"])
        proc.wait()
        returncode = proc.returncode
    # note, if block=False, this will only ever return 0
    return returncode
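

# Illustrative sketch (not part of the original module; the GPS times, job
# directory, batch IDs, and accounting tags below are placeholders): a typical
# batch workflow builds the DAG first and then hands the resulting path to
# submit_dag.
def _example_batch_workflow():
    dag_path = create_batch_dag(
        1234567890,
        1234654290,
        "/path/to/jobdir",
        batches=[0, 1, 2],
        accounting_group="not.real",
        accounting_group_user="albert.einstein",
    )
    # block until the DAG finishes and propagate its return code
    return submit_dag(str(dag_path), block=True)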


def create_batch_dag(
    gps_start,
    gps_end,
    jobdir,
    batches,
    verbose=False,
    quiet=False,
    retry=DEFAULT_RETRY,
    accounting_group=DEFAULT_ACCOUNTING_GROUP,
    accounting_group_user=getpass.getuser(),
    universe=DEFAULT_UNIVERSE,
    **kwargs,
):
    """generates a DAG for batch jobs"""
    outdir = Path(names.start_end2dir(gps_start, gps_end, rootdir=jobdir))
    logdir = outdir / "logs"
    os.makedirs(outdir, exist_ok=True)

    dag = DAG()

    requirements = {
        "accounting_group": accounting_group,
        "accounting_group_user": accounting_group_user,
        "initialdir": os.getcwd(),
        "getenv": True,
        **kwargs,
    }

    # define job arguments
    arguments: List[Union[Argument, Option]] = []
    if verbose:
        arguments.append(Option("verbose"))
    if quiet:
        arguments.append(Option("quiet"))
    arguments.extend(
        [
            Argument("start-time", gps_start),
            Argument("end-time", gps_end),
            Argument("job-directory", jobdir),
        ]
    )

    layer = Layer(
        name="idq-batch",
        executable="idq-condor_batch",
        universe=universe,
        transfer_files=False,
        log_dir=str(logdir),
        requirements=requirements,
        retries=retry,
    )
    for batch in batches:
        layer += Node(
            arguments=[
                *arguments,
                Argument("batch-id", batch),
            ]
        )
    dag.attach(layer)

    dag_name = f"idq_batch_{gps_start}_{gps_end}"
    dag.write_dag(f"{dag_name}.dag", outdir)
    dag.write_script(f"{dag_name}.sh", outdir)
    dag.create_log_dir(logdir)

    return outdir / f"{dag_name}.dag"
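

# Illustrative sketch (an assumption, not behavior shown in the original code):
# extra keyword arguments to create_batch_dag are merged directly into the
# shared job requirements, so HTCondor resource requests such as the
# otherwise-unused module-level DEFAULT_REQUEST_CPUS / DEFAULT_REQUEST_MEMORY
# could be passed through that way, e.g.
# create_batch_dag(start, end, jobdir, batches, **_example_batch_kwargs()).
def _example_batch_kwargs():
    return {
        "request_cpus": DEFAULT_REQUEST_CPUS,
        "request_memory": DEFAULT_REQUEST_MEMORY,
    }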


def create_stream_dag(
    config,
    config_path,
    jobdir,
    initial_lookback=0,
    skip_timeseries=False,
    skip_report=False,
    verbose=False,
    quiet=False,
    retry=DEFAULT_RETRY,
    accounting_group=DEFAULT_ACCOUNTING_GROUP,
    accounting_group_user=getpass.getuser(),
    universe=DEFAULT_UNIVERSE,
    **kwargs,
):
    """generates a DAG for streaming jobs"""
    outdir = Path(jobdir)
    logdir = outdir / "logs"
    os.makedirs(outdir, exist_ok=True)

    dag = DAG()

    # define general job requirements
    requirements = {
        "accounting_group": accounting_group,
        "accounting_group_user": accounting_group_user,
        "initialdir": os.getcwd(),
        "getenv": True,
    }

    # add key-value pairs not corresponding to job subsections
    for k, v in kwargs.items():
        if k.upper() not in JobType._member_names_:
            requirements[k] = v

    # define job arguments
    arguments: List[Union[Argument, Option]] = [Argument("config", config_path)]
    if verbose:
        arguments.append(Option("verbose"))
    if quiet:
        arguments.append(Option("quiet"))

    # main analysis jobs
    for job in JobType.__members__.values():
        if skip_timeseries and job == JobType.TIMESERIES:
            continue
        if skip_report and job == JobType.REPORT:
            continue

        # set job-specific requirements
        if str(job) in config.condor:
            job_requirements = requirements.copy() | config.condor[str(job)]
        else:
            job_requirements = requirements

        # job arguments
        job_arguments = arguments.copy()
        if job == JobType.TRAIN:
            job_arguments.append(Option("initial-lookback", initial_lookback))

        layer = Layer(
            name=f"idq-stream-{job}",
            executable=f"idq-streaming_{job}",
            universe=universe,
            transfer_files=False,
            log_dir=str(logdir),
            requirements=job_requirements,
            retries=retry,
        )
        layer += Node(arguments=job_arguments)
        dag.attach(layer)

    # smpull job
    shm_config = config.timeseries["reporting"]
    if shm_config.get("shm_pull"):
        # update base requirements
        smpull_requirements = requirements.copy()
        smpull_requirements["request_memory"] = 2000

        layer = Layer(
            "idq-smpull",
            universe=universe,
            transfer_files=False,
            log_dir=str(logdir),
            requirements=smpull_requirements,
            retries=retry,
        )
        layer += Node(
            arguments=[
                Argument("shm-directory", os.path.join(shm_config["shmdir"], "*.gwf")),
                Argument("shm-partition", shm_config["shm_partition"]),
            ]
        )
        dag.attach(layer)

    # scald job
    monitor_config = config.monitor
    if monitor_config.get("scald_reporting"):
        scald_requirements = requirements.copy()
        scald_config_path = os.path.abspath(
            os.path.realpath(monitor_config["scald_config_path"])
        )

        layer = Layer(
            name="scald_metric_collector",
            executable="scald",
            universe=universe,
            transfer_files=False,
            log_dir=str(logdir),
            requirements=scald_requirements,
            retries=retry,
        )
        arguments = [
            Argument("command", "aggregate"),
            Option("config", scald_config_path),
            Option(
                "uri",
                f"kafka://{config.tag}@{config.features['timeseries']['url']}",
            ),
            Option("data-type", "timeseries"),
        ]
        # FIXME replace by scraping info from scald schema
        # instead of hardcoding topics
        topics = ["fap", "loglike", "rank", "latency", "ok"]
        arguments.append(
            Option(
                "topic",
                [f"idq.{config.tag}.{topic}" for topic in topics],
            )
        )
        arguments.append(Option("schema", topics))
        layer += Node(arguments=arguments)
        dag.attach(layer)

    # generate dag
    dag_name = f"idq_stream_{config.tag}"
    dag.write_dag(f"{dag_name}.dag", outdir)
    dag.write_script(f"{dag_name}.sh", outdir)
    dag.create_log_dir(logdir)

    return outdir / f"{dag_name}.dag"
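

# Illustrative sketch (assumed config layout, not part of the original module):
# per-job HTCondor overrides are expected under config.condor keyed by the
# lowercase JobType name and are layered onto the shared requirements with the
# dict-union operator, mirroring the logic inside create_stream_dag.
def _example_job_requirements(requirements, condor_overrides, job=JobType.TRAIN):
    # condor_overrides might look like {"train": {"request_memory": 16384}}
    if str(job) in condor_overrides:
        return requirements | condor_overrides[str(job)]
    return requirements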