from concurrent import futures
import logging
import os
import numpy as np
from ligo.segments import segment, segmentlist
from . import condor
from . import configparser
from . import exceptions
from . import factories
from . import names
from . import reports
from . import utils
from .io.reporters.hdf5 import HDF5SegmentReporter
from .io.reporters.pkl import PickleReporter
from .series import ChannelInfo
DEFAULT_BATCH_NUM_BINS = 1
DEFAULT_BATCH_WORKFLOW = "block"
DEFAULT_INITIAL_LOOKBACK = 3600 # one hour
DEFAULT_NUM_SEGS_PER_BIN = 1
logger = logging.getLogger("idq")
def condor_batch(gps_start, gps_end, batchdir, nickname):
"""
run a single batch process using arguments staged to disk by the batch workflow
"""
# FIXME: record progress in a logger somewhere!
# read in the package staged to disk by batch() with workflow="condor"
diskreporter = PickleReporter(batchdir, gps_start, gps_end)
args, kwargs = diskreporter.retrieve(names.nickname2condorname(nickname))
# execute batch
_batch(*args, **kwargs)
def _batch(
gps_start,
gps_end,
segments,
exclude_in_training,
exclude_in_else,
data_id,
config_path,
skip_timeseries=False,
skip_report=False,
):
"""
a helper function for running a single batch
"""
# run train job
logger.info("--- TRAIN ---")
train(
gps_start,
gps_end,
config_path,
segments=segments,
exclude=exclude_in_training,
data_id=data_id,
)
# run evaluate job
logger.info("--- EVALUATE ---")
evaluate(
gps_start,
gps_end,
config_path,
segments=segments,
exclude=exclude_in_else,
data_id=data_id,
)
# run calibrate job
logger.info("--- CALIBRATE ---")
calibrate(gps_start, gps_end, config_path, data_id=data_id)
# run timeseries job
if skip_timeseries:
logger.info("--- TIMESERIES : skipping ---")
else:
logger.info("--- TIMESERIES ---")
timeseries(
gps_start,
gps_end,
config_path,
segments=segments,
exclude=exclude_in_else,
data_id=data_id,
)
# run report job across all bins
if skip_report:
logger.info("--- REPORT : skipping ---")
else:
logger.info("--- REPORT ---")
report(
gps_start,
gps_end,
config_path,
segments=segments,
skip_timeseries=skip_timeseries,
)
def batch(
gps_start,
gps_end,
config_path,
workflow=DEFAULT_BATCH_WORKFLOW,
initial_lookback=DEFAULT_INITIAL_LOOKBACK,
num_bins=DEFAULT_BATCH_NUM_BINS,
num_segs_per_bin=DEFAULT_NUM_SEGS_PER_BIN,
skip_timeseries=False,
skip_report=False,
verbose=False,
quiet=False,
block=False,
causal=False,
):
"""
launch and manage all batch processes.
this should manage round-robin training/evaluation automatically so users
really just have to "point and click"
"""
config_path = os.path.abspath(os.path.realpath(config_path))
config = configparser.path2config(config_path)
# set up logging
tag = config.tag
rootdir = config.rootdir
logger.info("using config : %s" % config_path)
# pull out config that will be used throughout all batch runs
batchdir = names.tag2batchdir(tag, rootdir=rootdir)
exclude_in_train = []
exclude_in_else = []
# use num_bins to partition time into separate stretches
if num_bins > 1: # automatically divide up segments for cross validation
if causal:
logger.info(
"causal round-robin training with %d bins within [%.3f, %.3f)"
% (num_bins, gps_start, gps_end)
)
else:
logger.info(
"round-robin training with %d bins and "
"%d segments per bin within [%.3f, %.3f)"
% (num_bins, num_segs_per_bin, gps_start, gps_end)
)
# we check to make sure that all the segdb decisions are consistent
# between the train, evaluate, timeseries, and report jobs
train_ignore_segdb = config.train.get("ignore_segdb", False)
evaluate_ignore_segdb = config.evaluate.get("ignore_segdb", False)
ignore_segdb = [train_ignore_segdb, evaluate_ignore_segdb]
if not skip_timeseries:
ignore_segdb.append(config.timeseries.get("ignore_segdb", False))
if not skip_report:
ignore_segdb.append(config.report.get("ignore_segdb", False))
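# all(...) != any(...) is True only when the flags are a mix of True and False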
if all(ignore_segdb) != any(ignore_segdb):
raise exceptions.ConfigurationInconsistency(
"we must supply the same ignore_segdb option within "
"all data discovery sections for batch jobs with num_bins>1"
)
# grab segments
if "segments" in config.segments:
seg_file = config.segments["segments"]
seg_ext = os.path.splitext(seg_file)[1][1:]
reporter_factory = factories.ReporterFactory()
segreporter = reporter_factory(
batchdir, gps_start, gps_end, flavor=f"segment:{seg_ext}"
)
loaded_segs = segreporter.read(seg_file)
loaded_segs &= segmentlist([segment(gps_start, gps_end)])
else:
loaded_segs = None
segs = load_record_segments(
gps_start, gps_end, config.segments | config.train, logger
)
if causal:
logger.info("processing initial lookback for causal workflow")
initial_segs = load_record_segments(
gps_start - initial_lookback,
gps_start,
config.segments | config.train,
logger,
segments=loaded_segs,
)
else:
initial_segs = segmentlist([])
livetime = utils.livetime(initial_segs) + utils.livetime(segs)
# split up the data into bins
logger.info("dividing bins equally based on livetime:")
if causal:
folds = utils.segments_causal_kfold(
segs, num_bins, initial_segs=initial_segs
)
gps_start -= initial_lookback # include this in the call to _batch
else:
folds = utils.segments_checkerboard(segs, num_bins, num_segs_per_bin)
# NOTE: we only take the union AFTER we divide up the analysis time into
# folds. this is done to simplify the logic in the following loop, which
# defines segments that should be excluded based on "segs", which
# therefore must include "initial_segs" for causal training
segs |= initial_segs
# assign data into bins
for i, (trainsegs, elsesegs) in enumerate(folds, 1):
# format batch, exclude segments
# we do this backflip to make sure we only pick up the right parts
# for causal batch jobs
# exclude everything that is not in trainsegs
exclude_in_train.append(segs - trainsegs)
# exclude everything that is not in elsesegs
exclude_in_else.append(segs - elsesegs)
logger.info(
" bin %d -- train livetime: %.3f sec, else livetime: %.3f sec"
% (
i,
livetime - utils.livetime(exclude_in_train[-1]),
livetime - utils.livetime(exclude_in_else[-1]),
)
)
else: # num_bins == 1, so train and evaluate on the same stretch
bin_segment = segment(gps_start, gps_end)
segs = segmentlist([bin_segment])
logger.info("training with 1 bin within [%.3f, %.3f)" % bin_segment)
logger.warning(
"You are training and evaluating with a single bin. Be careful! This "
"can produce artificially inflated performance estimates."
)
# record stuff
exclude_in_train.append(segmentlist([]))
exclude_in_else.append(segmentlist([]))
if workflow == "block":
logger.info("processing bins sequentially")
for bin_id, (eit, eie) in enumerate(zip(exclude_in_train, exclude_in_else), 1):
_batch(
gps_start,
gps_end,
segs,
eit,
eie,
f"bin{bin_id}",
config_path,
skip_timeseries=skip_timeseries,
skip_report=skip_report,
)
elif workflow == "fork":
logger.info("processing bins in parallel")
with futures.ProcessPoolExecutor() as executor:
workers = []
for bin_id, (eit, eie) in enumerate(
zip(exclude_in_train, exclude_in_else), 1
):
worker = executor.submit(
_batch,
gps_start,
gps_end,
segs,
eit,
eie,
f"bin{bin_id}",
config_path,
skip_timeseries=skip_timeseries,
skip_report=skip_report,
)
workers.append(worker)
# wait until all bins have completed
futures.wait(workers)
elif workflow == "condor":
logger.info("processing bins in parallel via condor")
diskreporter = PickleReporter(batchdir, gps_start, gps_end)
batchnames = []
kwargs = {
"skip_timeseries": skip_timeseries,
"skip_report": skip_report,
}
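# each bin's (args, kwargs) are pickled to disk so that the condor_batch jobs in the DAG can retrieve and execute them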
for bin_id, (eit, eie) in enumerate(zip(exclude_in_train, exclude_in_else), 1):
# prepare package for idq-condor_batch
data_id = f"bin{bin_id}"
args = (gps_start, gps_end, segs, eit, eie, data_id, config_path)
# write to disk
batchname = names.id2batchname(data_id)
path = diskreporter.report(
names.nickname2condorname(batchname), (args, kwargs)
)
logger.debug("blob written to: " + path)
batchnames.append(batchname)
dag_path = condor.create_batch_dag(
gps_start,
gps_end,
batchdir,
batchnames,
verbose=verbose,
quiet=quiet,
**config.condor,
)
logger.info("dag written to {}".format(dag_path))
logger.info("condor logs located at {}".format(dag_path.parent / "logs"))
exitcode = condor.submit_dag(dag_path, block=block)
if exitcode:
raise RuntimeError("non-zero returncode for dag: {}".format(dag_path))
else:
raise ValueError("workflow=%s not understood" % workflow)
def train(
gps_start,
gps_end,
config_path,
segments=None,
exclude=None,
data_id=None,
preferred=False,
):
"""
run the training process
"""
# read in config
logger.info("using config : %s" % config_path)
config_path = os.path.abspath(os.path.realpath(config_path))
config = configparser.path2config(config_path)
# define factories to generate objects
reporter_factory = factories.ReporterFactory()
classifier_factory = factories.ClassifierFactory()
# extract common parameters
tag = config.tag
rootdir = os.path.abspath(os.path.realpath(config.rootdir))
traindir = names.tag2traindir(tag, rootdir) # used to store results
for directory in [rootdir, traindir]:
os.makedirs(directory, exist_ok=True)
# set up how we record results
trainreporter = reporter_factory(
traindir, gps_start, gps_end, **config.train["reporting"]
)
# other reporters used to record intermediate data products
picklereporter = PickleReporter(traindir, gps_start, gps_end)
segmentreporter = reporter_factory(traindir, gps_start, gps_end, flavor="segment")
gpstimesreporter = reporter_factory(traindir, gps_start, gps_end, flavor="span")
# set a random seed if provided
seed = config.samples.get("random_seed", None)
if seed is not None:
logger.info("setting random seed: %d" % seed)
np.random.seed(seed)
# figure out which classifiers we'll run
logger.info("using classifiers: %s" % ", ".join(config.classifiers))
klassifiers = {} # funny spelling to avoid conflict with module named "classifiers"
for nickname in config.classifiers:
items = config.classifier_map[nickname]
logger.debug(
nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
)
# instantiate the object and add it to the list
klassifiers[nickname] = classifier_factory(
nickname, rootdir=traindir, model_id=data_id, **items
)
# figure out how we'll read data
# grab segments
segs = load_record_segments(
gps_start,
gps_end,
config.segments | config.train,
logger,
data_id,
segments=segments,
exclude=exclude,
reporter=segmentreporter,
)
# grab data
dataloader, time = load_record_dataloader(
gps_start,
gps_end,
segs,
config.features,
logger,
data_id,
reporter=picklereporter,
)
# define truth labels for samples
target_times, random_times, _ = find_record_times(
dataloader,
time,
config.train["random_rate"],
config.samples,
logger,
data_id,
timesreporter=gpstimesreporter,
segmentreporter=segmentreporter,
)
# generate the dataset
logger.info("building dataset")
dataset = factories.DatasetFactory(dataloader).labeled(target_times, random_times)
# record dataset
datasetreporter = reporter_factory(traindir, gps_start, gps_end, flavor="dataset")
path = datasetreporter.report(names.id2nickname("dataset", data_id), dataset)
logger.debug("dataset written to " + path)
# run jobs
workflow = config.train["workflow"]
# run in series
if workflow == "block":
logger.info("training classifiers sequentially")
for nickname, classifier in klassifiers.items():
logger.info("training %s with dataset" % nickname)
model = classifier.train(dataset)
logger.info("reporting model")
path = trainreporter.report(
names.id2nickname(nickname, data_id), model, preferred=preferred
)
logger.debug("model for %s written to %s" % (nickname, path))
# record dataset with ranks
logger.info("generating evaluated dataset")
eval_dataset = classifier.evaluate(dataset)
path = datasetreporter.report(
names.id2nickname(f"{nickname}_eval_dataset", data_id), eval_dataset
)
logger.debug("evaluated dataset for %s written to %s" % (nickname, path))
# run in parallel on the same node
elif workflow == "fork":
logger.info("training classifiers in parallel")
with futures.ProcessPoolExecutor() as executor:
workers = {}
for nickname, classifier in klassifiers.items():
logger.info("training %s with dataset" % nickname)
worker = executor.submit(
_train_and_report,
classifier,
trainreporter,
datasetreporter,
dataset,
data_id=data_id,
preferred=preferred,
)
workers[worker] = nickname
# wait until all classifiers complete
for worker in futures.as_completed(workers):
nickname = workers[worker]
model_path, dataset_path = worker.result()
logger.debug("model for %s written to %s" % (nickname, model_path))
logger.debug(
"evaluated dataset for %s written to %s" % (nickname, dataset_path)
)
else:
raise ValueError("workflow=%s not understood" % (workflow))
def _train_and_report(
classifier, trainreporter, datasetreporter, dataset, data_id=None, preferred=False
):
"""
a helper function for forked workflow via multiprocessing
"""
# train and report the model
model = classifier.train(dataset)
model_path = trainreporter.report(
names.id2nickname(classifier.nickname, data_id),
model,
preferred=preferred,
)
# record dataset with ranks
eval_dataset = classifier.evaluate(dataset)
dataset_path = datasetreporter.report(
names.id2nickname(f"{classifier.nickname}_eval_dataset", data_id), eval_dataset
)
return model_path, dataset_path
def evaluate(
gps_start,
gps_end,
config_path,
segments=None,
exclude=None,
data_id=None,
preferred=False,
):
"""
evaluate the data using classifiers
"""
logger.info("using config : %s" % config_path)
config_path = os.path.abspath(os.path.realpath(config_path))
config = configparser.path2config(config_path)
# define factories to generate objects
reporter_factory = factories.ReporterFactory()
classifier_factory = factories.ClassifierFactory()
# extract common parameters
tag = config.tag
rootdir = os.path.abspath(os.path.realpath(config.rootdir))
evaluatedir = names.tag2evaluatedir(tag, rootdir=rootdir)
traindir = names.tag2traindir(tag, rootdir=rootdir)
for directory in [rootdir, traindir, evaluatedir]:
os.makedirs(directory, exist_ok=True)
# set up how we record results
evaluatereporter = reporter_factory(
evaluatedir, gps_start, gps_end, **config.evaluate["reporting"]
)
trainreporter = reporter_factory(
traindir, gps_start, gps_end, **config.train["reporting"]
)
# other reporters used to record intermediate data products
picklereporter = PickleReporter(evaluatedir, gps_start, gps_end)
segmentreporter = reporter_factory(
evaluatedir, gps_start, gps_end, flavor="segment"
)
gpstimesreporter = reporter_factory(evaluatedir, gps_start, gps_end, flavor="span")
datasetreporter = reporter_factory(
evaluatedir, gps_start, gps_end, flavor="dataset"
)
# set a random seed if provided
seed = config.samples.get("random_seed", None)
if seed is not None:
logger.info("setting random seed: %d" % seed)
np.random.seed(seed)
# figure out which classifiers we'll run
logger.info("using classifiers: %s" % ", ".join(config.classifiers))
klassifiers = {}
for nickname in config.classifiers:
items = config.classifier_map[nickname]
logger.debug(
nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
)
# instantiate classifiers and add to the list
klassifiers[nickname] = classifier_factory(
nickname, rootdir=evaluatedir, model_id=data_id, **items
)
# figure out how we'll read data
# grab segments
segs = load_record_segments(
gps_start,
gps_end,
config.segments | config.evaluate,
logger,
data_id,
segments=segments,
exclude=exclude,
reporter=segmentreporter,
)
# grab data
dataloader, time = load_record_dataloader(
gps_start,
gps_end,
segs,
config.features,
logger,
data_id,
reporter=picklereporter,
)
# generate dataset for evaluation
# define truth labels for samples
target_times, random_times, _ = find_record_times(
dataloader,
time,
config.evaluate["random_rate"],
config.samples,
logger,
data_id,
timesreporter=gpstimesreporter,
segmentreporter=segmentreporter,
)
# generate the dataset
logger.info("building dataset")
dataset = factories.DatasetFactory(dataloader).labeled(target_times, random_times)
# record dataset
path = datasetreporter.report(names.id2nickname("dataset", data_id), dataset)
logger.debug("dataset written to " + path)
# retrieve trained models
for nickname, classifier in klassifiers.items():
logger.info("retrieving model for " + nickname)
classifier.model = trainreporter.retrieve(names.id2nickname(nickname, data_id))
# actually run jobs
workflow = config.evaluate["workflow"]
if workflow == "block":
logger.info("evaluating dataset sequentially")
# evaluate dataset and save probabilities to disk
for nickname, classifier in klassifiers.items():
logger.info("evaluating dataset with %s" % nickname)
dataset = classifier.evaluate(dataset)
logger.info("reporting dataset")
path = evaluatereporter.report(
names.id2nickname(nickname, data_id), dataset, preferred=preferred
)
logger.debug("dataset written to %s" % path)
elif workflow == "fork":
logger.info("evaluating dataset in parallel")
with futures.ProcessPoolExecutor() as executor:
workers = {}
for nickname, classifier in klassifiers.items():
logger.info("evaluating dataset with %s" % nickname)
worker = executor.submit(
_evaluate_and_report,
classifier,
evaluatereporter,
dataset,
data_id=data_id,
preferred=preferred,
)
workers[worker] = nickname
# wait until all evaluations complete
for worker in futures.as_completed(workers):
nickname = workers[worker]
path = worker.result()
logger.debug(
"evaluated dataset for %s written to %s" % (nickname, path)
)
else:
raise ValueError("workflow=%s not understood" % (workflow))
def _evaluate_and_report(classifier, reporter, dataset, data_id=None, preferred=False):
"""
a helper function for forked workflow via multiprocessing
"""
dataset = classifier.evaluate(dataset)
return reporter.report(
names.id2nickname(classifier.nickname, data_id), dataset, preferred=preferred
)
def calibrate(
gps_start,
gps_end,
config_path,
data_id=None,
preferred=False,
use_training_set=True,
):
"""
calibrate the classifier predictions based on historical data
"""
logger.info("using config : %s" % config_path)
config_path = os.path.abspath(os.path.realpath(config_path))
config = configparser.path2config(config_path)
# define factories to generate objects
reporter_factory = factories.ReporterFactory()
classifier_factory = factories.ClassifierFactory()
# extract common parameters
tag = config.tag
rootdir = os.path.abspath(os.path.realpath(config.rootdir))
traindir = names.tag2traindir(tag, rootdir=rootdir)
evaluatedir = names.tag2evaluatedir(tag, rootdir=rootdir)
calibratedir = names.tag2calibratedir(tag, rootdir=rootdir)
for directory in [rootdir, traindir, calibratedir]:
os.makedirs(directory, exist_ok=True)
# set up how we record results
train_datareporter = reporter_factory(
traindir, gps_start, gps_end, flavor="dataset:hdf5"
)
evaluatereporter = reporter_factory(
evaluatedir, gps_start, gps_end, **config.evaluate["reporting"]
)
calibratereporter = reporter_factory(
calibratedir, gps_start, gps_end, **config.calibrate["reporting"]
)
# other reporters used to record intermediate data products
train_cleansegsreporter = reporter_factory(
traindir, gps_start, gps_end, flavor="segment"
)
evaluate_cleansegsreporter = reporter_factory(
evaluatedir, gps_start, gps_end, flavor="segment"
)
# set a random seed if provided
seed = config.samples.get("random_seed", None)
if seed is not None:
logger.info("setting random seed: %d" % seed)
np.random.seed(seed)
# figure out which classifiers we'll run
logger.info("using classifiers: %s" % ", ".join(config.classifiers))
klassifiers = {}
for nickname in config.classifiers:
items = config.classifier_map[nickname]
logger.debug(
nickname + " -> " + " ".join("%s:%s" % (k, v) for (k, v) in items.items())
)
# instantiate the object and add it to the list
klassifiers[nickname] = classifier_factory(
nickname, rootdir=calibratedir, model_id=data_id, **items
)
# extract basic parameters for calibration
calibrate_kwargs = config.calibrate
# actually run jobs
workflow = config.calibrate["workflow"]
if workflow == "block":
logger.info("calibrating classifiers sequentially")
logger.info("retrieving clean segments")
if use_training_set:
clean_segs = train_cleansegsreporter.retrieve(
names.id2nickname("cleanSegments", data_id)
)
clean_segs |= evaluate_cleansegsreporter.retrieve(
names.id2nickname("cleanSegments", data_id)
)
else:
clean_segs = evaluate_cleansegsreporter.retrieve(
names.id2nickname("cleanSegments", data_id)
)
for nickname, classifier in klassifiers.items():
if use_training_set:
logger.info(f"retrieving evaluated datasets for {nickname}")
dataset = train_datareporter.retrieve(
names.id2nickname(f"{nickname}_eval_dataset", data_id)
)
dataset += evaluatereporter.retrieve(
names.id2nickname(nickname, data_id)
)
else:
logger.info(f"retrieving evaluated dataset for {nickname}")
dataset = evaluatereporter.retrieve(
names.id2nickname(nickname, data_id)
)
# evaluate dataset and save probabilities to disk
logger.info("calibrating evaluations with %s" % nickname)
calibmap = classifier.calibrate(
dataset, clean_segs=clean_segs, **calibrate_kwargs
)
logger.info("reporting calibration")
path = calibratereporter.report(
names.id2nickname(nickname, data_id), calibmap, preferred=preferred
)
logger.debug("calibration written to %s" % path)
elif workflow == "fork":
logger.info("calibrating classifiers in parallel")
logger.info("retrieving clean segments")
clean_segs = train_cleansegsreporter.retrieve(
names.id2nickname("cleanSegments", data_id)
)
clean_segs |= evaluate_cleansegsreporter.retrieve(
names.id2nickname("cleanSegments", data_id)
)
with futures.ProcessPoolExecutor() as executor:
workers = {}
for nickname, classifier in klassifiers.items():
logger.info(f"retrieving evaluated datasets for {nickname}")
dataset = train_datareporter.retrieve(
names.id2nickname(f"{nickname}_eval_dataset", data_id)
)
dataset += evaluatereporter.retrieve(
names.id2nickname(nickname, data_id)
)
logger.info("calibrating evaluations with %s" % nickname)
worker = executor.submit(
_calibrate_and_report,
classifier,
calibratereporter,
dataset,
clean_segs=clean_segs,
data_id=data_id,
preferred=preferred,
**calibrate_kwargs,
)
workers[worker] = nickname
# wait until all calibrations complete
for worker in futures.as_completed(workers):
nickname = workers[worker]
path = worker.result()
logger.debug("calibration for %s written to %s" % (nickname, path))
else:
raise ValueError("workflow=%s not understood" % (workflow))
def _calibrate_and_report(
classifier,
reporter,
dataset,
clean_segs=None,
data_id=None,
preferred=False,
**kwargs,
):
"""
a helper function for forked workflow via multiprocessing
"""
calibmap = classifier.calibrate(dataset, clean_segs=clean_segs, **kwargs)
return reporter.report(
names.id2nickname(classifier.nickname, data_id), calibmap, preferred=preferred
)
def timeseries(
gps_start,
gps_end,
config_path,
segments=None,
exclude=None,
data_id=None,
):
"""
generate timeseries of classifier predictions
"""
logger.info("using config : %s" % config_path)
config_path = os.path.abspath(os.path.realpath(config_path))
config = configparser.path2config(config_path)
# define factories to generate objects
reporter_factory = factories.ReporterFactory()
classifier_factory = factories.ClassifierFactory()
# extract common parameters
tag = config.tag
rootdir = os.path.abspath(os.path.realpath(config.rootdir))
timeseriesdir = names.tag2timeseriesdir(tag, rootdir=rootdir)
calibratedir = names.tag2calibratedir(tag, rootdir=rootdir)
traindir = names.tag2traindir(tag, rootdir=rootdir)
for directory in [rootdir, traindir, calibratedir, timeseriesdir]:
os.makedirs(directory, exist_ok=True)
# extract information necessary for channel names in timeseries files
instrument = config.instrument
description = config.timeseries.get("description")
target_bounds = configparser.config2bounds(config.samples["target_bounds"])
frequency = config.features["frequency"]
assert frequency in target_bounds, (
"must specify a frequency range (called %s) within target_bounds" % frequency
)
freq_min, freq_max = target_bounds[frequency]
assert isinstance(freq_min, int) and isinstance(
freq_max, int
), "frequency bounds must be integers!"
set_ok = config.timeseries.get("set_ok")
# set up how we record results
trainreporter = reporter_factory(
traindir, gps_start, gps_end, **config.train["reporting"]
)
calibratereporter = reporter_factory(
calibratedir, gps_start, gps_end, **config.calibrate["reporting"]
)
timeseriesreporter = reporter_factory(
timeseriesdir, gps_start, gps_end, **config.timeseries["reporting"]
)
# other reporters used to record intermediate data products
picklereporter = PickleReporter(timeseriesdir, gps_start, gps_end)
segmentreporter = reporter_factory(
timeseriesdir, gps_start, gps_end, flavor="segment"
)
# set a random seed if provided
seed = config.samples.get("random_seed", None)
if seed is not None:
logger.info("setting random seed: %d" % seed)
np.random.seed(seed)
# figure out which classifiers we'll run
logger.info("using classifiers: %s" % ", ".join(config.classifiers))
klassifiers = {}
for nickname in config.classifiers:
items = config.classifier_map[nickname]
logger.debug(
nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
)
# instantiate classifiers and add to the list
klassifiers[nickname] = classifier_factory(
nickname, rootdir=timeseriesdir, model_id=data_id, **items
)
# figure out how we'll read data
# grab segments
segs = load_record_segments(
gps_start,
gps_end,
config.segments | config.timeseries,
logger,
data_id,
segments=segments,
exclude=exclude,
reporter=segmentreporter,
)
# grab data
dataloader, _ = load_record_dataloader(
gps_start,
gps_end,
segs,
config.features,
logger,
data_id,
reporter=picklereporter,
)
dataset_factory = factories.DatasetFactory(dataloader)
# compute timeseries sampling rate -> dt
srate = config.timeseries["srate"]
logger.info("generating timeseries sampled at %.3f Hz" % srate)
dt = 1.0 / srate
# load in model and calibration map
for nickname, classifier in klassifiers.items():
# set model
logger.info("retrieving %s classifier" % nickname)
classifier.model = trainreporter.retrieve(names.id2nickname(nickname, data_id))
# set calibration map
logger.info("retrieving %s calibration_map" % nickname)
classifier.calibration_map = calibratereporter.retrieve(
names.id2nickname(nickname, data_id)
)
# actually run jobs
workflow = config.timeseries["workflow"]
# split up timeseries generation into strides if specified
max_stride = config.timeseries.get("stride", None)
if max_stride is not None:
strides = sorted(
utils.split_segments_by_stride(gps_start, gps_end, segs, max_stride)
)
else:
strides = [segment(gps_start, gps_end)]
if workflow == "block":
logger.info("generating timeseries sequentially")
for nickname, classifier in klassifiers.items():
logger.info(f"generating timeseries for {nickname}")
for stride in strides:
# update report times
timeseriesreporter.start, timeseriesreporter.end = stride
stride_segs = segmentlist([stride]) & segs
lvtm = utils.livetime(stride_segs)
# create timeseries if stride livetime is nonzero
if lvtm:
logger.info(
"generating timeseries for stride: [%.3f, %.3f)" % stride
)
# generate series
logger.debug("generating timeseries")
info = ChannelInfo(
instrument,
freq_min,
freq_max,
nickname,
description=description,
)
series = classifier.timeseries(
info, dataset_factory, dt=dt, segs=stride_segs, set_ok=set_ok
)
# report
logger.debug("reporting timeseries")
for path in report_timeseries(timeseriesreporter, info, series):
logger.debug("timeseries written to %s" % path)
logger.info(f"generated timeseries for {nickname}")
elif workflow == "fork":
logger.info("generating timeseries in parallel")
with futures.ProcessPoolExecutor() as executor:
workers = {}
for nickname, classifier in klassifiers.items():
info = ChannelInfo(
instrument, freq_min, freq_max, nickname, description=description
)
worker = executor.submit(
_timeseries_and_report,
classifier,
timeseriesreporter,
instrument,
dataset_factory,
dt,
strides,
segs,
info,
set_ok=set_ok,
)
workers[worker] = nickname
# wait until all timeseries are generated
for worker in futures.as_completed(workers):
nickname = workers[worker]
paths = worker.result()
logger.info(f"generated timeseries for {nickname}")
for path in paths:
logger.debug("timeseries for %s written to %s" % (nickname, path))
else:
raise ValueError("workflow=%s not understood" % (workflow))
def report_timeseries(reporter, info, all_series):
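"""
report each series, updating the reporter's span to match that series
"""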
paths = []
for series in all_series:
reporter.start = series.start
reporter.end = series.end
paths.append(reporter.report(info.file_nickname, series))
return paths
def _timeseries_and_report(
classifier,
reporter,
instrument,
dataset_factory,
dt,
strides,
segments,
info,
set_ok=None,
):
"""
a helper function for forked workflow via multiprocessing
"""
all_paths = []
for stride in strides:
# update report times
reporter.start, reporter.end = stride
stride_segs = segmentlist([stride]) & segments
lvtm = utils.livetime(stride_segs)
# create timeseries if stride livetime is nonzero
if lvtm:
series = classifier.timeseries(
info, dataset_factory, dt, stride_segs, set_ok=set_ok
)
paths = report_timeseries(reporter, info, series)
all_paths.extend(paths)
return all_paths
def report(
gps_start,
gps_end,
config_path,
reportdir=None,
zoom_start=None,
zoom_end=None,
segments=None,
**kwargs,
):
"""
generate a report of the classifier predictions in this period.
we allow reportdir to be specified directly here in case you want to make a
summary of a job running in a directory you don't own
"""
config_path = os.path.abspath(os.path.realpath(config_path))
config = configparser.path2config(config_path)
if reportdir is not None:
pass
elif "reportdir" in config.report:
reportdir = config.report["reportdir"]
else:
reportdir = names.tag2reportdir(config.tag, rootdir=config.rootdir)
os.makedirs(reportdir, exist_ok=True)
# grab segments
segs = load_record_segments(
gps_start,
gps_end,
config.segments | config.report,
logger,
segments=segments,
)
# record segments
# do this backflip so segment reporter doesn't create subdirectories...
segmentreporter = HDF5SegmentReporter(reportdir, gps_start, gps_end)
path = os.path.join(
reportdir,
names.start_end2path(
"segments", gps_start, gps_end, suffix=segmentreporter.suffix
),
)
segmentreporter._write(path, segs)
logger.info("segments written to " + path)
# make the report
report_obj = reports.Report(
config_path,
gps_start,
gps_end,
segments=segs,
t0=gps_start,
zoom_start=zoom_start,
zoom_end=zoom_end,
skip_timeseries=kwargs.get("skip_timeseries", False),
)
report_obj.report(config.classifiers, reportdir=reportdir)
def find_record_times(
dataloader,
time,
random_rate,
samples_config,
logger,
data_id=None,
timesreporter=None,
segmentreporter=None,
):
"""
find truth labels (target and random times) for samples, optionally record
to disk
"""
# NOTE: if we switch to continuous weighting of samples instead of discrete
# labels, this whole block will need to be re-written...
target_channel = samples_config["target_channel"]
target_bounds = configparser.config2bounds(samples_config["target_bounds"])
dirty_bounds = configparser.config2bounds(samples_config["dirty_bounds"])
dirty_window = samples_config["dirty_window"]
logger.info("identifying target times")
try:
if samples_config.get("target_times_path"):
target_times = np.genfromtxt(samples_config["target_times_path"])
in_segments = utils.times_in_segments(target_times, dataloader.segs)
target_times = target_times[in_segments]
else:
target_times = dataloader.target_times(time, target_channel, target_bounds)
except exceptions.NoDataError:
logger.error(
"identified 0 target times, this may indicate data discovery issues"
)
raise
else:
logger.info("identified %d target_times" % len(target_times))
# record target times
if timesreporter:
path = timesreporter.report(
names.id2nickname("targetTimes", data_id), target_times
)
logger.debug("target_times written to " + path)
# draw random times
random_times, clean_segs = dataloader.random_times(
time, target_channel, dirty_bounds, dirty_window, random_rate
)
logger.info("identified %d random times" % len(random_times))
# record random times
if timesreporter:
path = timesreporter.report(
names.id2nickname("randomTimes", data_id), random_times
)
logger.debug("random_times written to " + path)
# record clean segments
if segmentreporter:
path = segmentreporter.report(
names.id2nickname("cleanSegments", data_id), clean_segs
)
logger.debug("clean segments written to " + path)
return target_times, random_times, clean_segs
def load_record_dataloader(
start, end, segments, config, logger, data_id=None, reporter=None, group=None
):
"""
load classifier data and optionally write to disk
"""
dataloader_factory = factories.DataLoaderFactory()
# set up options
items = config.copy()
logger.info("using feature backend: %s" % config["flavor"])
logger.debug(
"dataloader -> " + " ".join("%s:%s" % (k, v) for k, v in config.items())
)
time = items.pop("time") # extract the name of the column we use as time
if group:
items = {**items, "group": group}
# set up dataloader
dataloader = dataloader_factory(start, end, segs=segments, **items)
# record dataloader
if reporter:
path = reporter.report(names.id2nickname("classifierData", data_id), dataloader)
logger.debug("dataloader written to " + path)
return dataloader, time
def load_record_segments(
start,
end,
config,
logger,
data_id=None,
segments=None,
exclude=None,
reporter=None,
):
"""
load in segments and optionally write to disk
"""
if not exclude:
exclude = segmentlist([])
if config.get("ignore_segdb", False):
logger.info("ignoring segdb")
segs = segmentlist([segment(start, end)])
elif segments is not None: # segments provided, no need to query segdb
logger.info("segments provided ahead of time, skipping query to segdb")
segs = segmentlist(segments)
else:
segdb_url = config["segdb_url"]
if "replay" in config:
offset = config["replay"]["offset"]
start = start - offset
end = end - offset
logger.info(
f"replay mode enabled, shifting all dqsegdb query times by {offset}s"
)
logger.info(
"querying %s within [%.3f, %.3f) for: union=%s ; intersect=%s ; exclude=%s"
% (
segdb_url,
start,
end,
",".join(config.get("union", [])),
",".join(config.get("intersect", [])),
",".join(config.get("exclude", [])),
)
)
segs = utils.segdb2segs(
start,
end,
union=config.get("union"),
intersect=config.get("intersect"),
exclude=config.get("exclude"),
segdb_url=segdb_url,
)
# remove any times specified on the command line
logger.debug("excluding segments : " + str(exclude))
segs -= exclude
logger.info("retained %.3f sec of livetime" % utils.livetime(segs))
# shift queried segments back to the requested span if replay mode is enabled
# (skipped when segments were provided ahead of time)
if not segments and "replay" in config:
offset = config["replay"]["offset"]
segs.shift(offset)
# record segments
if reporter:
path = reporter.report(names.id2nickname("segments", data_id), segs)
logger.debug("segments written to " + path)
return segs