Source code for idq.batch

from concurrent import futures
import logging
import os

import numpy as np

from ligo.segments import segment, segmentlist

from . import condor
from . import configparser
from . import exceptions
from . import factories
from . import names
from . import reports
from . import utils
from .io.reporters.hdf5 import HDF5SegmentReporter
from .io.reporters.pkl import PickleReporter
from .series import ChannelInfo


DEFAULT_BATCH_NUM_BINS = 1
DEFAULT_BATCH_WORKFLOW = "block"
DEFAULT_INITIAL_LOOKBACK = 3600  # one hour
DEFAULT_NUM_SEGS_PER_BIN = 1


logger = logging.getLogger("idq")


def condor_batch(gps_start, gps_end, batchdir, nickname):
    """
    run a single batch process (train/evaluate/calibrate/timeseries/report) under condor
    """
    # FIXME: record progress in a logger somewhere!

    # read in the package prepared by idq-train or idq-streaming_train
    diskreporter = PickleReporter(batchdir, gps_start, gps_end)
    args, kwargs = diskreporter.retrieve(names.nickname2condorname(nickname))

    # execute batch
    _batch(*args, **kwargs)
def _batch(
    gps_start,
    gps_end,
    segments,
    exclude_in_training,
    exclude_in_else,
    data_id,
    config_path,
    skip_timeseries=False,
    skip_report=False,
):
    """
    a helper function for running a single batch
    """
    # run train job
    logger.info("--- TRAIN ---")
    train(
        gps_start,
        gps_end,
        config_path,
        segments=segments,
        exclude=exclude_in_training,
        data_id=data_id,
    )

    # run evaluate job
    logger.info("--- EVALUATE ---")
    evaluate(
        gps_start,
        gps_end,
        config_path,
        segments=segments,
        exclude=exclude_in_else,
        data_id=data_id,
    )

    # run calibrate job
    logger.info("--- CALIBRATE ---")
    calibrate(gps_start, gps_end, config_path, data_id=data_id)

    # run timeseries job
    if skip_timeseries:
        logger.info("--- TIMESERIES : skipping ---")
    else:
        logger.info("--- TIMESERIES ---")
        timeseries(
            gps_start,
            gps_end,
            config_path,
            segments=segments,
            exclude=exclude_in_else,
            data_id=data_id,
        )

    # run report job across all bins
    if skip_report:
        logger.info("--- REPORT : skipping ---")
    else:
        logger.info("--- REPORT ---")
        report(
            gps_start,
            gps_end,
            config_path,
            segments=segments,
            skip_timeseries=skip_timeseries,
        )
def batch(
    gps_start,
    gps_end,
    config_path,
    workflow=DEFAULT_BATCH_WORKFLOW,
    initial_lookback=DEFAULT_INITIAL_LOOKBACK,
    num_bins=DEFAULT_BATCH_NUM_BINS,
    num_segs_per_bin=DEFAULT_NUM_SEGS_PER_BIN,
    skip_timeseries=False,
    skip_report=False,
    verbose=False,
    quiet=False,
    block=False,
    causal=False,
):
    """
    launch and manage all batch processes

    should manage round-robin training/evaluation automatically so users
    really just have to "point and click"
    """
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # set up logging
    tag = config.tag
    rootdir = config.rootdir
    logger.info("using config : %s" % config_path)

    # pull out config that will be used throughout all batch runs
    batchdir = names.tag2batchdir(tag, rootdir=rootdir)

    exclude_in_train = []
    exclude_in_else = []

    # use num_bins to partition time into separate stretches
    if num_bins > 1:
        # automatically divide up segments for cross validation
        if causal:
            logger.info(
                "causal round-robin training with %d bins within [%.3f, %.3f)"
                % (num_bins, gps_start, gps_end)
            )
        else:
            logger.info(
                "round-robin training with %d bins and "
                "%d segments per bin within [%.3f, %.3f)"
                % (num_bins, num_segs_per_bin, gps_start, gps_end)
            )

        # we check to make sure that all the segdb decisions are consistent
        # between train, evaluate, and timeseries jobs
        train_ignore_segdb = config.train.get("ignore_segdb", False)
        evaluate_ignore_segdb = config.evaluate.get("ignore_segdb", False)
        ignore_segdb = [train_ignore_segdb, evaluate_ignore_segdb]
        if not skip_timeseries:
            ignore_segdb.append(config.timeseries.get("ignore_segdb", False))
        if not skip_report:
            ignore_segdb.append(config.report.get("ignore_segdb", False))

        if all(ignore_segdb) != any(ignore_segdb):
            raise exceptions.ConfigurationInconsistency(
                "we must supply the same ignore_segdb option within "
                "all data discovery sections for batch jobs with num_bins>1"
            )

        # grab segments
        if "segments" in config.segments:
            seg_file = config.segments["segments"]
            seg_ext = os.path.splitext(seg_file)[1][1:]
            reporter_factory = factories.ReporterFactory()
            segreporter = reporter_factory(
                batchdir, gps_start, gps_end, flavor=f"segment:{seg_ext}"
            )
            loaded_segs = segreporter.read(seg_file)
            loaded_segs &= segmentlist([segment(gps_start, gps_end)])
        else:
            loaded_segs = None

        segs = load_record_segments(
            gps_start, gps_end, config.segments | config.train, logger
        )
        if causal:
            logger.info("processing initial lookback for causal workflow")
            initial_segs = load_record_segments(
                gps_start - initial_lookback,
                gps_start,
                config.segments | config.train,
                logger,
                segments=loaded_segs,
            )
        else:
            initial_segs = segmentlist([])

        livetime = utils.livetime(initial_segs) + utils.livetime(segs)

        # split up the data into bins
        logger.info("dividing bins equally based on livetime:")
        if causal:
            folds = utils.segments_causal_kfold(
                segs, num_bins, initial_segs=initial_segs
            )
            gps_start -= initial_lookback  # include this in the call to _batch
        else:
            folds = utils.segments_checkerboard(segs, num_bins, num_segs_per_bin)

        # NOTE: we only take the union AFTER we divide up the analysis time into
        # folds; this is done to simplify the logic in the following loop, which
        # defines segments that should be excluded based on "segs", which
        # therefore must include "initial_segs" for causal training
        segs |= initial_segs

        # assign data into bins
        for i, (trainsegs, elsesegs) in enumerate(folds, 1):
            # format batch, exclude segments
            # we do this backflip to make sure we only pick up the right parts
            # for causal batch jobs

            # exclude everything that is not in trainsegs
            exclude_in_train.append(segs - trainsegs)

            # exclude everything that is not in elsesegs
            exclude_in_else.append(segs - elsesegs)

            logger.info(
                " bin %d -- train livetime: %.3f sec, else livetime: %.3f sec"
                % (
                    i,
                    livetime - utils.livetime(exclude_in_train[-1]),
                    livetime - utils.livetime(exclude_in_else[-1]),
                )
            )

    else:  # num_bins == 1, so train and evaluate on the same stretch
        bin_segment = segment(gps_start, gps_end)
        segs = segmentlist([bin_segment])
        logger.info("training with 1 bin within [%.3f, %.3f)" % bin_segment)
        logger.warning(
            "You are training and evaluating with a single bin. Be careful! This "
            "can produce artificially inflated performance estimates."
        )

        # record stuff
        exclude_in_train.append(segmentlist([]))
        exclude_in_else.append(segmentlist([]))

    if workflow == "block":
        logger.info("processing bins sequentially")
        for bin_id, (eit, eie) in enumerate(zip(exclude_in_train, exclude_in_else), 1):
            _batch(
                gps_start,
                gps_end,
                segs,
                eit,
                eie,
                f"bin{bin_id}",
                config_path,
                skip_timeseries=skip_timeseries,
                skip_report=skip_report,
            )

    elif workflow == "fork":
        logger.info("processing bins in parallel")
        with futures.ProcessPoolExecutor() as executor:
            workers = []
            for bin_id, (eit, eie) in enumerate(
                zip(exclude_in_train, exclude_in_else), 1
            ):
                worker = executor.submit(
                    _batch,
                    gps_start,
                    gps_end,
                    segs,
                    eit,
                    eie,
                    f"bin{bin_id}",
                    config_path,
                    skip_timeseries=skip_timeseries,
                    skip_report=skip_report,
                )
                workers.append(worker)

            # wait until all bins have completed
            futures.wait(workers)

    elif workflow == "condor":
        logger.info("processing bins in parallel via condor")
        diskreporter = PickleReporter(batchdir, gps_start, gps_end)

        batchnames = []
        kwargs = {
            "skip_timeseries": skip_timeseries,
            "skip_report": skip_report,
        }
        for bin_id, (eit, eie) in enumerate(zip(exclude_in_train, exclude_in_else), 1):
            # prepare package for idq-condor_batch
            data_id = f"bin{bin_id}"
            args = (gps_start, gps_end, segs, eit, eie, data_id, config_path)

            # write to disk
            batchname = names.id2batchname(data_id)
            path = diskreporter.report(
                names.nickname2condorname(batchname), (args, kwargs)
            )
            logger.debug("blob written to: " + path)
            batchnames.append(batchname)

        dag_path = condor.create_batch_dag(
            gps_start,
            gps_end,
            batchdir,
            batchnames,
            verbose=verbose,
            quiet=quiet,
            **config.condor,
        )

        logger.info("dag written to {}".format(dag_path))
        logger.info("condor logs located at {}".format(dag_path.parent / "logs"))

        exitcode = condor.submit_dag(dag_path, block=block)
        if exitcode:
            raise RuntimeError("non-zero returncode for dag: {}".format(dag_path))

    else:
        raise ValueError("workflow=%s not understood" % workflow)
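For reference, a minimal usage sketch of the ``batch`` entry point above; the GPS times, config path, and parameter choices are hypothetical placeholders, only the call signature comes from the function itself.

# minimal usage sketch (hypothetical GPS times and config path)
from idq.batch import batch as run_batch

run_batch(
    1256655618,               # gps_start (placeholder)
    1256742018,               # gps_end (placeholder)
    "/path/to/config",        # config_path (placeholder)
    workflow="block",         # process bins sequentially on this machine
    num_bins=4,               # 4-fold round-robin train/evaluate split
    num_segs_per_bin=1,
)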
def train(
    gps_start,
    gps_end,
    config_path,
    segments=None,
    exclude=None,
    data_id=None,
    preferred=False,
):
    """
    run the training process
    """
    # read in config
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(os.path.realpath(config.rootdir))
    traindir = names.tag2traindir(tag, rootdir)  # used to store results
    for directory in [rootdir, traindir]:
        os.makedirs(directory, exist_ok=True)

    # set up how we record results
    trainreporter = reporter_factory(
        traindir, gps_start, gps_end, **config.train["reporting"]
    )

    # other reporters used to record intermediate data products
    picklereporter = PickleReporter(traindir, gps_start, gps_end)
    segmentreporter = reporter_factory(traindir, gps_start, gps_end, flavor="segment")
    gpstimesreporter = reporter_factory(traindir, gps_start, gps_end, flavor="span")

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    logger.info("using classifiers: %s" % ", ".join(config.classifiers))
    klassifiers = {}  # funny spelling to avoid conflict with module named "classifiers"
    for nickname in config.classifiers:
        items = config.classifier_map[nickname]
        logger.debug(
            nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
        )

        # instantiate the object and add it to the list
        klassifiers[nickname] = classifier_factory(
            nickname, rootdir=traindir, model_id=data_id, **items
        )

    # figure out how we'll read data
    # grab segments
    segs = load_record_segments(
        gps_start,
        gps_end,
        config.segments | config.train,
        logger,
        data_id,
        segments=segments,
        exclude=exclude,
        reporter=segmentreporter,
    )

    # grab data
    dataloader, time = load_record_dataloader(
        gps_start,
        gps_end,
        segs,
        config.features,
        logger,
        data_id,
        reporter=picklereporter,
    )

    # define truth labels for samples
    target_times, random_times, _ = find_record_times(
        dataloader,
        time,
        config.train["random_rate"],
        config.samples,
        logger,
        data_id,
        timesreporter=gpstimesreporter,
        segmentreporter=segmentreporter,
    )

    # generate the dataset
    logger.info("building dataset")
    dataset = factories.DatasetFactory(dataloader).labeled(target_times, random_times)

    # record dataset
    datasetreporter = reporter_factory(traindir, gps_start, gps_end, flavor="dataset")
    path = datasetreporter.report(names.id2nickname("dataset", data_id), dataset)
    logger.debug("dataset written to " + path)

    # run jobs
    workflow = config.train["workflow"]

    # run in series
    if workflow == "block":
        logger.info("training classifiers sequentially")
        for nickname, classifier in klassifiers.items():
            logger.info("training %s with dataset" % nickname)
            model = classifier.train(dataset)

            logger.info("reporting model")
            path = trainreporter.report(
                names.id2nickname(nickname, data_id), model, preferred=preferred
            )
            logger.debug("model for %s written to %s" % (nickname, path))

            # record dataset with ranks
            logger.info("generating evaluated dataset")
            eval_dataset = classifier.evaluate(dataset)
            path = datasetreporter.report(
                names.id2nickname(f"{nickname}_eval_dataset", data_id), eval_dataset
            )
            logger.debug("evaluated dataset for %s written to %s" % (nickname, path))

    # run in parallel on the same node
    elif workflow == "fork":
        logger.info("training classifiers in parallel")
        with futures.ProcessPoolExecutor() as executor:
            workers = {}
            for nickname, classifier in klassifiers.items():
                logger.info("training %s with dataset" % nickname)
                worker = executor.submit(
                    _train_and_report,
                    classifier,
                    trainreporter,
                    datasetreporter,
                    dataset,
                    data_id=data_id,
                    preferred=preferred,
                )
                workers[worker] = nickname

            # wait as all classifiers complete
            for worker in futures.as_completed(workers):
                nickname = workers[worker]
                model_path, dataset_path = worker.result()
                logger.debug("model for %s written to %s" % (nickname, model_path))
                logger.debug(
                    "evaluated dataset for %s written to %s" % (nickname, dataset_path)
                )

    else:
        raise ValueError("workflow=%s not understood" % (workflow))
def _train_and_report(
    classifier, trainreporter, datasetreporter, dataset, data_id=None, preferred=False
):
    """
    a helper function for forked workflow via multiprocessing
    """
    # train and report the model
    model = classifier.train(dataset)
    model_path = trainreporter.report(
        names.id2nickname(classifier.nickname, data_id),
        model,
        preferred=preferred,
    )

    # record dataset with ranks
    eval_dataset = classifier.evaluate(dataset)
    dataset_path = datasetreporter.report(
        names.id2nickname(f"{classifier.nickname}_eval_dataset", data_id), eval_dataset
    )

    return model_path, dataset_path
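The "fork" workflow used throughout this module (train, evaluate, calibrate, timeseries) is the standard ``concurrent.futures`` fan-out/fan-in pattern; a stripped-down, self-contained illustration follows, where ``_work`` and ``run_forked`` are stand-ins for the module's helpers, not part of idq itself.

# stand-alone illustration of the "fork" pattern (stand-in names, not idq code)
from concurrent import futures


def _work(nickname):
    # stand-in for _train_and_report / _evaluate_and_report / _calibrate_and_report
    return "result for %s" % nickname


def run_forked(nicknames):
    with futures.ProcessPoolExecutor() as executor:
        # fan out: one subprocess per classifier nickname
        workers = {executor.submit(_work, name): name for name in nicknames}
        # fan in: collect results as each subprocess finishes
        return {workers[w]: w.result() for w in futures.as_completed(workers)}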
def evaluate(
    gps_start,
    gps_end,
    config_path,
    segments=None,
    exclude=None,
    data_id=None,
    preferred=False,
):
    """
    evaluate the data using classifiers
    """
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(os.path.realpath(config.rootdir))
    evaluatedir = names.tag2evaluatedir(tag, rootdir=rootdir)
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    for directory in [rootdir, traindir, evaluatedir]:
        os.makedirs(directory, exist_ok=True)

    # set up how we record results
    evaluatereporter = reporter_factory(
        evaluatedir, gps_start, gps_end, **config.evaluate["reporting"]
    )
    trainreporter = reporter_factory(
        traindir, gps_start, gps_end, **config.train["reporting"]
    )

    # other reporters used to record intermediate data products
    picklereporter = PickleReporter(evaluatedir, gps_start, gps_end)
    segmentreporter = reporter_factory(
        evaluatedir, gps_start, gps_end, flavor="segment"
    )
    gpstimesreporter = reporter_factory(evaluatedir, gps_start, gps_end, flavor="span")
    datasetreporter = reporter_factory(
        evaluatedir, gps_start, gps_end, flavor="dataset"
    )

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    logger.info("using classifiers: %s" % ", ".join(config.classifiers))
    klassifiers = {}
    for nickname in config.classifiers:
        items = config.classifier_map[nickname]
        logger.debug(
            nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
        )

        # instantiate classifiers and add to the list
        klassifiers[nickname] = classifier_factory(
            nickname, rootdir=evaluatedir, model_id=data_id, **items
        )

    # figure out how we'll read data
    # grab segments
    segs = load_record_segments(
        gps_start,
        gps_end,
        config.segments | config.evaluate,
        logger,
        data_id,
        segments=segments,
        exclude=exclude,
        reporter=segmentreporter,
    )

    # grab data
    dataloader, time = load_record_dataloader(
        gps_start,
        gps_end,
        segs,
        config.features,
        logger,
        data_id,
        reporter=picklereporter,
    )

    # generate dataset for evaluation
    # define truth labels for samples
    target_times, random_times, _ = find_record_times(
        dataloader,
        time,
        config.evaluate["random_rate"],
        config.samples,
        logger,
        data_id,
        timesreporter=gpstimesreporter,
        segmentreporter=segmentreporter,
    )

    # generate the dataset
    logger.info("building dataset")
    dataset = factories.DatasetFactory(dataloader).labeled(target_times, random_times)

    # record dataset
    path = datasetreporter.report(names.id2nickname("dataset", data_id), dataset)
    logger.debug("dataset written to " + path)

    # retrieve trained models
    for nickname, classifier in klassifiers.items():
        logger.info("retrieving model for " + nickname)
        classifier.model = trainreporter.retrieve(names.id2nickname(nickname, data_id))

    # actually run jobs
    workflow = config.evaluate["workflow"]
    if workflow == "block":
        logger.info("evaluating dataset sequentially")

        # evaluate dataset and save probabilities to disk
        for nickname, classifier in klassifiers.items():
            logger.info("evaluating dataset with %s" % nickname)
            dataset = classifier.evaluate(dataset)

            logger.info("reporting dataset")
            path = evaluatereporter.report(
                names.id2nickname(nickname, data_id), dataset, preferred=preferred
            )
            logger.debug("dataset written to %s" % path)

    elif workflow == "fork":
        logger.info("evaluating dataset in parallel")
        with futures.ProcessPoolExecutor() as executor:
            workers = {}
            for nickname, classifier in klassifiers.items():
                logger.info("evaluating dataset with %s" % nickname)
                worker = executor.submit(
                    _evaluate_and_report,
                    classifier,
                    evaluatereporter,
                    dataset,
                    data_id=data_id,
                    preferred=preferred,
                )
                workers[worker] = nickname

            # wait as all evaluations complete
            for worker in futures.as_completed(workers):
                nickname = workers[worker]
                path = worker.result()
                logger.debug(
                    "evaluated dataset for %s written to %s" % (nickname, path)
                )

    else:
        raise ValueError("workflow=%s not understood" % (workflow))
def _evaluate_and_report(classifier, reporter, dataset, data_id=None, preferred=False):
    """
    a helper function for forked workflow via multiprocessing
    """
    dataset = classifier.evaluate(dataset)
    return reporter.report(
        names.id2nickname(classifier.nickname, data_id), dataset, preferred=preferred
    )
def calibrate(
    gps_start,
    gps_end,
    config_path,
    data_id=None,
    preferred=False,
    use_training_set=True,
):
    """
    calibrate the classifier predictions based on historical data
    """
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(os.path.realpath(config.rootdir))
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    evaluatedir = names.tag2evaluatedir(tag, rootdir=rootdir)
    calibratedir = names.tag2calibratedir(tag, rootdir=rootdir)
    for directory in [rootdir, traindir, calibratedir]:
        os.makedirs(directory, exist_ok=True)

    # set up how we record results
    train_datareporter = reporter_factory(
        traindir, gps_start, gps_end, flavor="dataset:hdf5"
    )
    evaluatereporter = reporter_factory(
        evaluatedir, gps_start, gps_end, **config.evaluate["reporting"]
    )
    calibratereporter = reporter_factory(
        calibratedir, gps_start, gps_end, **config.calibrate["reporting"]
    )

    # other reporters used to record intermediate data products
    train_cleansegsreporter = reporter_factory(
        traindir, gps_start, gps_end, flavor="segment"
    )
    evaluate_cleansegsreporter = reporter_factory(
        evaluatedir, gps_start, gps_end, flavor="segment"
    )

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    logger.info("using classifiers: %s" % ", ".join(config.classifiers))
    klassifiers = {}
    for nickname in config.classifiers:
        items = config.classifier_map[nickname]
        logger.debug(
            nickname + " -> " + " ".join("%s:%s" % (k, v) for (k, v) in items.items())
        )

        # instantiate the object and add it to the list
        klassifiers[nickname] = classifier_factory(
            nickname, rootdir=calibratedir, model_id=data_id, **items
        )

    # extract basic parameters for calibration
    calibrate_kwargs = config.calibrate

    # actually run jobs
    workflow = config.calibrate["workflow"]
    if workflow == "block":
        logger.info("calibrating classifiers sequentially")

        logger.info("retrieving clean segments")
        if use_training_set:
            clean_segs = train_cleansegsreporter.retrieve(
                names.id2nickname("cleanSegments", data_id)
            )
            clean_segs |= evaluate_cleansegsreporter.retrieve(
                names.id2nickname("cleanSegments", data_id)
            )
        else:
            clean_segs = evaluate_cleansegsreporter.retrieve(
                names.id2nickname("cleanSegments", data_id)
            )

        for nickname, classifier in klassifiers.items():
            if use_training_set:
                logger.info(f"retrieving evaluated datasets for {nickname}")
                dataset = train_datareporter.retrieve(
                    names.id2nickname(f"{nickname}_eval_dataset", data_id)
                )
                dataset += evaluatereporter.retrieve(
                    names.id2nickname(nickname, data_id)
                )
            else:
                logger.info(f"retrieving evaluated dataset for {nickname}")
                dataset = evaluatereporter.retrieve(
                    names.id2nickname(nickname, data_id)
                )

            # evaluate dataset and save probabilities to disk
            logger.info("calibrating evaluations with %s" % nickname)
            calibmap = classifier.calibrate(
                dataset, clean_segs=clean_segs, **calibrate_kwargs
            )

            logger.info("reporting calibration")
            path = calibratereporter.report(
                names.id2nickname(nickname, data_id), calibmap, preferred=preferred
            )
            logger.debug("calibration written to %s" % path)

    elif workflow == "fork":
        logger.info("calibrating classifiers in parallel")

        logger.info("retrieving clean segments")
        clean_segs = train_cleansegsreporter.retrieve(
            names.id2nickname("cleanSegments", data_id)
        )
        clean_segs |= evaluate_cleansegsreporter.retrieve(
            names.id2nickname("cleanSegments", data_id)
        )

        with futures.ProcessPoolExecutor() as executor:
            workers = {}
            for nickname, classifier in klassifiers.items():
                logger.info(f"retrieving evaluated datasets for {nickname}")
                dataset = train_datareporter.retrieve(
                    names.id2nickname(f"{nickname}_eval_dataset", data_id)
                )
                dataset += evaluatereporter.retrieve(
                    names.id2nickname(nickname, data_id)
                )

                logger.info("calibrating evaluations with %s" % nickname)
                worker = executor.submit(
                    _calibrate_and_report,
                    classifier,
                    calibratereporter,
                    dataset,
                    clean_segs=clean_segs,
                    data_id=data_id,
                    preferred=preferred,
                    **calibrate_kwargs,
                )
                workers[worker] = nickname

            # wait as all calibrations complete
            for worker in futures.as_completed(workers):
                nickname = workers[worker]
                path = worker.result()
                logger.debug("calibration for %s written to %s" % (nickname, path))

    else:
        raise ValueError("workflow=%s not understood" % (workflow))
def _calibrate_and_report(
    classifier,
    reporter,
    dataset,
    clean_segs=None,
    data_id=None,
    preferred=False,
    **kwargs,
):
    """
    a helper function for forked workflow via multiprocessing
    """
    calibmap = classifier.calibrate(dataset, clean_segs=clean_segs, **kwargs)
    return reporter.report(
        names.id2nickname(classifier.nickname, data_id), calibmap, preferred=preferred
    )
def timeseries(
    gps_start,
    gps_end,
    config_path,
    segments=None,
    exclude=None,
    data_id=None,
):
    """
    generate timeseries of classifier predictions
    """
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(os.path.realpath(config.rootdir))
    timeseriesdir = names.tag2timeseriesdir(tag, rootdir=rootdir)
    calibratedir = names.tag2calibratedir(tag, rootdir=rootdir)
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    for directory in [rootdir, traindir, calibratedir, timeseriesdir]:
        os.makedirs(directory, exist_ok=True)

    # extract information necessary for channel names in timeseries files
    instrument = config.instrument
    description = config.timeseries.get("description")
    target_bounds = configparser.config2bounds(config.samples["target_bounds"])
    frequency = config.features["frequency"]
    assert frequency in target_bounds, (
        "must specify a frequency range (called %s) within target_bounds" % frequency
    )
    freq_min, freq_max = target_bounds[frequency]
    assert isinstance(freq_min, int) and isinstance(
        freq_max, int
    ), "frequency bounds must be integers!"
    set_ok = config.timeseries.get("set_ok")

    # set up how we record results
    trainreporter = reporter_factory(
        traindir, gps_start, gps_end, **config.train["reporting"]
    )
    calibratereporter = reporter_factory(
        calibratedir, gps_start, gps_end, **config.calibrate["reporting"]
    )
    timeseriesreporter = reporter_factory(
        timeseriesdir, gps_start, gps_end, **config.timeseries["reporting"]
    )

    # other reporters used to record intermediate data products
    picklereporter = PickleReporter(timeseriesdir, gps_start, gps_end)
    segmentreporter = reporter_factory(
        timeseriesdir, gps_start, gps_end, flavor="segment"
    )

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    logger.info("using classifiers: %s" % ", ".join(config.classifiers))
    klassifiers = {}
    for nickname in config.classifiers:
        items = config.classifier_map[nickname]
        logger.debug(
            nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
        )

        # instantiate classifiers and add to the list
        klassifiers[nickname] = classifier_factory(
            nickname, rootdir=timeseriesdir, model_id=data_id, **items
        )

    # figure out how we'll read data
    # grab segments
    segs = load_record_segments(
        gps_start,
        gps_end,
        config.segments | config.timeseries,
        logger,
        data_id,
        segments=segments,
        exclude=exclude,
        reporter=segmentreporter,
    )

    # grab data
    dataloader, _ = load_record_dataloader(
        gps_start,
        gps_end,
        segs,
        config.features,
        logger,
        data_id,
        reporter=picklereporter,
    )
    dataset_factory = factories.DatasetFactory(dataloader)

    # compute timeseries sampling rate -> dt
    srate = config.timeseries["srate"]
    logger.info("generating timeseries sampled at %.3f Hz" % srate)
    dt = 1.0 / srate

    # load in model and calibration map
    for nickname, classifier in klassifiers.items():
        # set model
        logger.info("retrieving %s classifier" % nickname)
        classifier.model = trainreporter.retrieve(names.id2nickname(nickname, data_id))

        # set calibration map
        logger.info("retrieving %s calibration_map" % nickname)
        classifier.calibration_map = calibratereporter.retrieve(
            names.id2nickname(nickname, data_id)
        )

    # actually run jobs
    workflow = config.timeseries["workflow"]

    # split up timeseries generation into strides if specified
    max_stride = config.timeseries.get("stride", None)
    if max_stride is not None:
        strides = sorted(
            utils.split_segments_by_stride(gps_start, gps_end, segs, max_stride)
        )
    else:
        strides = [segment(gps_start, gps_end)]

    if workflow == "block":
        logger.info("generating timeseries sequentially")
        for nickname, classifier in klassifiers.items():
            logger.info(f"generating timeseries for {nickname}")
            for stride in strides:
                # update report times
                timeseriesreporter.start, timeseriesreporter.end = stride
                stride_segs = segmentlist([stride]) & segs
                lvtm = utils.livetime(stride_segs)

                # create timeseries if stride livetime is nonzero
                if lvtm:
                    logger.info(
                        "generating timeseries for stride: [%.3f, %.3f)" % stride
                    )

                    # generate series
                    logger.debug("generating timeseries")
                    info = ChannelInfo(
                        instrument,
                        freq_min,
                        freq_max,
                        nickname,
                        description=description,
                    )
                    series = classifier.timeseries(
                        info, dataset_factory, dt=dt, segs=stride_segs, set_ok=set_ok
                    )

                    # report
                    logger.debug("reporting timeseries")
                    for path in report_timeseries(timeseriesreporter, info, series):
                        logger.debug("timeseries written to %s" % path)

            logger.info(f"generated timeseries for {nickname}")

    elif workflow == "fork":
        logger.info("generating timeseries in parallel")
        with futures.ProcessPoolExecutor() as executor:
            workers = {}
            for nickname, classifier in klassifiers.items():
                info = ChannelInfo(
                    instrument, freq_min, freq_max, nickname, description=description
                )
                worker = executor.submit(
                    _timeseries_and_report,
                    classifier,
                    timeseriesreporter,
                    instrument,
                    dataset_factory,
                    dt,
                    strides,
                    segs,
                    info,
                    set_ok=set_ok,
                )
                workers[worker] = nickname

            # wait as all timeseries are generated
            for worker in futures.as_completed(workers):
                nickname = workers[worker]
                paths = worker.result()
                logger.info(f"generated timeseries for {nickname}")
                for path in paths:
                    logger.debug("timeseries for %s written to %s" % (nickname, path))

    else:
        raise ValueError("workflow=%s not understood" % (workflow))
def report_timeseries(reporter, info, all_series):
    paths = []
    for series in all_series:
        reporter.start = series.start
        reporter.end = series.end
        paths.append(reporter.report(info.file_nickname, series))
    return paths


def _timeseries_and_report(
    classifier,
    reporter,
    instrument,
    dataset_factory,
    dt,
    strides,
    segments,
    info,
    set_ok=None,
):
    """
    a helper function for forked workflow via multiprocessing
    """
    all_paths = []
    for stride in strides:
        # update report times
        reporter.start, reporter.end = stride
        stride_segs = segmentlist([stride]) & segments
        lvtm = utils.livetime(stride_segs)

        # create timeseries if stride livetime is nonzero
        if lvtm:
            series = classifier.timeseries(
                info, dataset_factory, dt, stride_segs, set_ok=set_ok
            )
            paths = report_timeseries(reporter, info, series)
            all_paths.extend(paths)

    return all_paths
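The stride handling above amounts to clipping the analysis segments to each stride window and skipping windows with no livetime. A toy illustration of that clipping follows; ``toy_strides`` is a hand-rolled stand-in used only for this example, not the project's ``utils.split_segments_by_stride``.

# toy illustration of per-stride clipping (toy_strides is not part of idq)
from ligo.segments import segment, segmentlist


def toy_strides(start, end, step):
    # yield contiguous [t, t+step) windows covering [start, end)
    t = start
    while t < end:
        yield segment(t, min(t + step, end))
        t += step


segs = segmentlist([segment(0, 450), segment(700, 1000)])
for stride in toy_strides(0, 1000, 256):
    stride_segs = segmentlist([stride]) & segs  # clip analysis segments to this stride
    if abs(stride_segs):                        # only process strides with livetime
        print(stride, "->", stride_segs)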
def report(
    gps_start,
    gps_end,
    config_path,
    reportdir=None,
    zoom_start=None,
    zoom_end=None,
    segments=None,
    **kwargs,
):
    """
    generate a report of the classifier predictions in this period

    we allow reportdir to be specified directly here in case you want to make
    a summary of a job running in a directory you don't own
    """
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    if reportdir is not None:
        pass
    elif "reportdir" in config.report:
        reportdir = config.report["reportdir"]
    else:
        reportdir = names.tag2reportdir(config.tag, rootdir=config.rootdir)
    os.makedirs(reportdir, exist_ok=True)

    # grab segments
    segs = load_record_segments(
        gps_start,
        gps_end,
        config.segments | config.report,
        logger,
        segments=segments,
    )

    # record segments
    # do this backflip so segment reporter doesn't create subdirectories...
    segmentreporter = HDF5SegmentReporter(reportdir, gps_start, gps_end)
    path = os.path.join(
        reportdir,
        names.start_end2path(
            "segments", gps_start, gps_end, suffix=segmentreporter.suffix
        ),
    )
    segmentreporter._write(path, segs)
    logger.info("segments written to " + path)

    # make the report
    report_obj = reports.Report(
        config_path,
        gps_start,
        gps_end,
        segments=segs,
        t0=gps_start,
        zoom_start=zoom_start,
        zoom_end=zoom_end,
        skip_timeseries=kwargs.get("skip_timeseries", False),
    )
    report_obj.report(config.classifiers, reportdir=reportdir)
def find_record_times(
    dataloader,
    time,
    random_rate,
    samples_config,
    logger,
    data_id=None,
    timesreporter=None,
    segmentreporter=None,
):
    """
    find truth labels (target and random times) for samples, optionally record to disk
    """
    # NOTE: if we switch to continuous weighting of samples instead of discrete
    # labels, this whole block will need to be re-written...
    target_channel = samples_config["target_channel"]
    target_bounds = configparser.config2bounds(samples_config["target_bounds"])
    dirty_bounds = configparser.config2bounds(samples_config["dirty_bounds"])
    dirty_window = samples_config["dirty_window"]

    logger.info("identifying target times")
    try:
        if samples_config.get("target_times_path"):
            target_times = np.genfromtxt(samples_config["target_times_path"])
            in_segments = utils.times_in_segments(target_times, dataloader.segs)
            target_times = target_times[in_segments]
        else:
            target_times = dataloader.target_times(time, target_channel, target_bounds)
    except exceptions.NoDataError:
        logger.error(
            "identified 0 target times, this may indicate data discovery issues"
        )
        raise
    else:
        logger.info("identified %d target_times" % len(target_times))

    # record target times
    if timesreporter:
        path = timesreporter.report(
            names.id2nickname("targetTimes", data_id), target_times
        )
        logger.debug("target_times written to " + path)

    # draw random times
    random_times, clean_segs = dataloader.random_times(
        time, target_channel, dirty_bounds, dirty_window, random_rate
    )
    logger.info("identified %d random times" % len(random_times))

    # record random times
    if timesreporter:
        path = timesreporter.report(
            names.id2nickname("randomTimes", data_id), random_times
        )
        logger.debug("random_times written to " + path)

    # record clean segments
    if segmentreporter:
        path = segmentreporter.report(
            names.id2nickname("cleanSegments", data_id), clean_segs
        )
        logger.debug("clean segments written to " + path)

    return target_times, random_times, clean_segs
def load_record_dataloader(
    start, end, segments, config, logger, data_id=None, reporter=None, group=None
):
    """
    load classifier data and optionally write to disk
    """
    dataloader_factory = factories.DataLoaderFactory()

    # set up options
    items = config.copy()
    logger.info("using feature backend: %s" % config["flavor"])
    logger.debug(
        "dataloader -> " + " ".join("%s:%s" % (k, v) for k, v in config.items())
    )
    time = items.pop("time")  # extract the name of the column we use as time
    if group:
        items = {**items, "group": group}

    # set up dataloader
    dataloader = dataloader_factory(start, end, segs=segments, **items)

    # record dataloader
    if reporter:
        path = reporter.report(names.id2nickname("classifierData", data_id), dataloader)
        logger.debug("dataloader written to " + path)

    return dataloader, time
def load_record_segments(
    start,
    end,
    config,
    logger,
    data_id=None,
    segments=None,
    exclude=None,
    reporter=None,
):
    """
    load in segments and optionally write to disk
    """
    if not exclude:
        exclude = segmentlist([])

    if config.get("ignore_segdb", False):
        logger.info("ignoring segdb")
        segs = segmentlist([segment(start, end)])
    elif segments is not None:
        # segments provided, no need to query segdb
        logger.info("segments provided ahead of time, skipping query to segdb")
        segs = segmentlist(segments)
    else:
        segdb_url = config["segdb_url"]
        if "replay" in config:
            offset = config["replay"]["offset"]
            start = start - offset
            end = end - offset
            logger.info(
                f"replay mode enabled, shifting all dqsegdb query times by {offset}s"
            )
        logger.info(
            "querying %s within [%.3f, %.3f) for: union=%s ; intersect=%s ; exclude=%s"
            % (
                segdb_url,
                start,
                end,
                ",".join(config.get("union", [])),
                ",".join(config.get("intersect", [])),
                ",".join(config.get("exclude", [])),
            )
        )
        segs = utils.segdb2segs(
            start,
            end,
            union=config.get("union"),
            intersect=config.get("intersect"),
            exclude=config.get("exclude"),
            segdb_url=segdb_url,
        )

    # remove any times specified on the command line
    logger.debug("excluding segments : " + str(exclude))
    segs -= exclude
    logger.info("retained %.3f sec of livetime" % utils.livetime(segs))

    # shift segments if replay mode enabled (only if queried)
    if not segments and "replay" in config:
        offset = config["replay"]["offset"]
        segs.shift(offset)

    # record segments
    if reporter:
        path = reporter.report(names.id2nickname("segments", data_id), segs)
        logger.debug("segments written to " + path)

    return segs
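The exclusion bookkeeping used in ``batch`` and ``load_record_segments`` is plain ``ligo.segments`` arithmetic; the sketch below shows the operations involved with arbitrary example times, independent of any idq machinery.

# small demonstration of the segment arithmetic used above (arbitrary times)
from ligo.segments import segment, segmentlist

analysis = segmentlist([segment(0, 100), segment(150, 300)])
trainsegs = segmentlist([segment(0, 100)])

exclude = analysis - trainsegs                        # everything NOT in trainsegs
clipped = analysis & segmentlist([segment(50, 200)])  # restrict to a window

assert exclude == segmentlist([segment(150, 300)])
assert clipped == segmentlist([segment(50, 100), segment(150, 200)])
assert abs(analysis) == 250                           # total livetime in seconds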