Source code for idq.stream

from collections import deque
import logging
import multiprocessing as mp
import os

import numpy as np

import gpstime
from ligo.scald.io import kafka
from ligo.segments import segment, segmentlist

from . import batch
from . import calibration
from . import classifiers
from . import condor
from . import configparser
from . import factories
from . import features
from . import names
from . import utils
from . import exceptions
from .series import ChannelInfo


# workflow used by stream() when the caller does not request one
DEFAULT_STREAM_WORKFLOW = "fork"
# seconds of data before gps_start used for the initial train/calibrate jobs
DEFAULT_INITIAL_LOOKBACK = 3600  # one hour

# set to be +infty
# (gps_range() falls back to this value when no gps_end is given)
DEFAULT_GPS_END = 2000000000

# fallbacks used when the config does not set min_new_samples / max_samples /
# max_stride for a streaming job
DEFAULT_MIN_NEW_SAMPLES = 1
DEFAULT_MAX_SAMPLES = calibration.DEFAULT_MAX_SAMPLES
DEFAULT_MAX_STRIDE = calibration.DEFAULT_MAX_STRIDE

# NOTE(review): the three values below are not referenced in the part of the
# module visible here; presumably used by reporting/monitoring code -- confirm
DEFAULT_PLOT_STRIDE = 10
DEFAULT_MAX_METRIC_SPAN = 60  # one minute
DEFAULT_MAX_LATENCY_SPAN = 4 * 3600  # four hours

# shared logger; all streaming jobs in this module log under the "idq" name
logger = logging.getLogger("idq")


def gps_range(gps_start=None, gps_end=None):
    """
    figure out the GPS range for a streaming analysis

    :param gps_start: start of the analysis; when None, the current GPS time
        (truncated to an integer) is used
    :param gps_end: end of the analysis; when None, DEFAULT_GPS_END is used
    :returns: the tuple ``(gps_start, gps_end)``
    :raises AssertionError: if gps_end is not strictly greater than gps_start
    """
    if gps_start is None:
        gps_start = int(gpstime.gpsnow())
    if gps_end is None:
        gps_end = DEFAULT_GPS_END

    # raised explicitly instead of via an `assert` statement so the check is
    # not stripped when python runs with -O; the exception type and message
    # are kept so existing callers see no difference
    if not (gps_end > gps_start):
        raise AssertionError(
            "gps_end (%d) must be > gps_start (%d)!" % (gps_end, gps_start)
        )

    return gps_start, gps_end
def stream(
    config_path,
    gps_start=None,
    gps_end=None,
    workflow=DEFAULT_STREAM_WORKFLOW,
    initial_lookback=DEFAULT_INITIAL_LOOKBACK,
    monitoring_cadence=utils.DEFAULT_MONITORING_CADENCE,
    skip_timeseries=False,
    skip_report=False,
    persist=False,
    verbose=False,
    quiet=False,
):
    """
    launch and manage all streaming processes

    Before anything is launched, this makes sure a preferred model and a
    preferred calibration map exist for the (single) configured classifier:
    either by bootstrapping from pre-trained products named in the config, or
    by running batch train/calibrate jobs over
    ``[gps_start - initial_lookback, gps_start]``. The streaming jobs are then
    started according to ``workflow``.

    :param config_path: path to the config file (resolved to an absolute path)
    :param gps_start: analysis start, see :func:`gps_range` for the default
    :param gps_end: analysis end, see :func:`gps_range` for the default
    :param workflow: ``"fork"`` (one multiprocessing.Process per job) or
        ``"condor"`` (write and submit a DAG); ``"block"`` is rejected
    :param initial_lookback: seconds before gps_start covered by the initial
        batch training/calibration jobs
    :param monitoring_cadence: stride of the cadence manager used to poll
        forked processes when ``persist`` is set
    :param skip_timeseries: do not launch the timeseries job
    :param skip_report: do not launch the report job
    :param persist: with ``"fork"``, stay alive and monitor the children;
        not supported with ``"condor"``
    :param verbose: forwarded to the condor DAG creation only
    :param quiet: forwarded to the condor DAG creation only
    :raises ValueError: unknown/unsupported workflow, or inconsistent
        bootstrap model / calibration map
    :raises RuntimeError: a monitored child exited with a non-zero exit code,
        or DAG submission returned a non-zero code
    :raises NotImplementedError: ``persist`` requested with ``"condor"``
    """
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # set up logging
    tag = config.tag
    rootdir = config.rootdir
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    evaluatedir = names.tag2evaluatedir(tag, rootdir=rootdir)
    calibratedir = names.tag2calibratedir(tag, rootdir=rootdir)

    # data product discovery
    reporter_factory = factories.ReporterFactory()
    trainreporter = reporter_factory(
        traindir,
        gps_start,
        gps_end,
        **config.train["reporting"],
    )
    evaluatereporter = reporter_factory(
        evaluatedir,
        gps_start,
        gps_end,
        **config.evaluate["reporting"],
    )
    calibratereporter = reporter_factory(
        calibratedir,
        gps_start,
        gps_end,
        **config.calibrate["reporting"],
    )
    train_datasetreporter = reporter_factory(
        traindir, gps_start, gps_end, flavor="dataset"
    )
    train_cleansegsreporter = reporter_factory(
        traindir, gps_start, gps_end, flavor="segment"
    )
    evaluate_cleansegsreporter = reporter_factory(
        evaluatedir, gps_start, gps_end, flavor="segment"
    )

    assert (
        len(config.classifiers) == 1
    ), "multiple classifiers are not allowed to be run for stream.stream"
    [nickname] = config.classifiers

    new_model = False
    bootstrap = False

    # generate new model if no preferred model exists already
    if not (model := trainreporter.retrieve(nickname, preferred=True)):
        # search for bootstrap model
        if bootstrap_model_path := config.train.get("bootstrap_model"):
            bootstrap = True
            logger.info("bootstrapping analysis with pre-trained model")
            model = trainreporter.read(bootstrap_model_path)
            data_id = model.model_id
        else:
            logger.info(
                "preferred model not available, "
                "launching training job over [%.3f, %.3f]"
                % (gps_start - initial_lookback, gps_start)
            )
            try:
                data_id = utils.generate_unique_id()
                batch.train(
                    gps_start - initial_lookback,
                    gps_start,
                    config_path,
                    data_id=data_id,
                    preferred=True,
                )
            except Exception:
                logger.error("initial training job failed!")
                raise
            else:
                new_model = True

            # retrieve this model and add it to the generic
            # preferred list for stream data discovery
            model_name = names.id2nickname(nickname, data_id)
            model = trainreporter.retrieve(model_name, preferred=True)
            trainreporter.report(nickname, model, preferred=True)

            # use evaluated dataset as initial dataset for calibration
            # NOTE: update times to point at batch runs
            train_datasetreporter.start = gps_start - initial_lookback
            train_datasetreporter.end = gps_start
            evaluatereporter.start = gps_start - initial_lookback
            evaluatereporter.end = gps_start
            train_cleansegsreporter.start = gps_start - initial_lookback
            train_cleansegsreporter.end = gps_start
            evaluate_cleansegsreporter.start = gps_start - initial_lookback
            evaluate_cleansegsreporter.end = gps_start

            # copy over evaluated dataset
            dataset = train_datasetreporter.retrieve(
                names.id2nickname(f"{nickname}_eval_dataset", data_id),
            )
            evaluatereporter.report(names.id2nickname(nickname, data_id), dataset)

            # copy over clean segments
            clean_segs = train_cleansegsreporter.retrieve(
                names.id2nickname("cleanSegments", data_id)
            )
            evaluate_cleansegsreporter.report(
                names.id2nickname("cleanSegments", data_id), clean_segs
            )
    else:
        data_id = model.model_id

    # search for bootstrap calibration map if needed, otherwise,
    # generate new calibration map if a new model was trained or
    # an existing calibration map doesn't exist for the identifier
    id_nickname = names.id2nickname(nickname, data_id)
    if bootstrap:
        bootstrap_map_path = config.calibrate.get("bootstrap_map")
        if not bootstrap_map_path:
            raise ValueError(
                "if a bootstrap model is used, we also "
                "require a corresponding calibration map"
            )
        logger.info("bootstrapping analysis with pre-trained calibration")
        calibmap = calibratereporter.read(bootstrap_map_path)

        # verify that model IDs match up
        if data_id != calibmap.model_id:
            raise ValueError(
                "bootstrap model and calibration map require matching IDs; "
                f"model ID: {data_id}, calibration map ID: {calibmap.model_id}"
            )

        # add to generic preferred lists for stream data discovery
        trainreporter.report(nickname, model, preferred=True)
        calibratereporter.report(
            names.id2nickname(nickname, data_id), calibmap, preferred=True
        )

    elif new_model or not calibratereporter.preferred(id_nickname):
        logger.info(
            "preferred calibration map not available, "
            "launching calibration job over [%.3f, %.3f]"
            % (gps_start - initial_lookback, gps_start)
        )
        try:
            batch.calibrate(
                gps_start - initial_lookback,
                gps_start,
                config_path,
                data_id=data_id,
                preferred=True,
                use_training_set=False,
            )
        except Exception:
            logger.error("initial calibrate job failed!")
            raise

    # for processes based on workflow
    if workflow == "block":
        raise ValueError(
            "stream cannot manage processes via workflow=block because it "
            "must launch several processes simultaneously"
        )

    elif workflow == "fork":
        logger.info("launching asynchronous tasks in parallel via multiprocessing")

        args = (config_path,)
        # set up common arguments
        # NOTE(review): initial_lookback is not forwarded to the forked train
        # job (it is forwarded for condor) -- confirm this is intended
        kwargs = {"gps_start": gps_start, "gps_end": gps_end}
        workers = []
        try:
            jobs = [
                (train, "train"),
                (evaluate, "evaluate"),
                (calibrate, "calibrate"),
            ]
            # honor skip_timeseries here as well; it was previously only
            # respected by the condor workflow
            if not skip_timeseries:
                jobs.append((timeseries, "timeseries"))
            if not skip_report:
                jobs.append((report, "report"))

            for job, name in jobs:
                logger.info("forking %s" % name)
                proc = mp.Process(target=job, name=name, args=args, kwargs=kwargs)
                proc.start()
                workers.append(proc)

            if persist:  # stick around and watch forked procs if requested
                num_proc = len(workers)
                # hard coding delay here in case DEFAULT_DELAY is changed...
                manager = utils.CadenceManager(
                    gpstime.gpsnow(), stride=monitoring_cadence, delay=0
                )
                while num_proc:
                    logger.info("monitoring %d processes" % num_proc)
                    # cycle once through the queue: live processes go back to
                    # the end, finished ones are reaped and dropped
                    for _ in range(num_proc):
                        proc = workers.pop(0)
                        if proc.is_alive():  # process is still running
                            workers.append(proc)
                        else:
                            # make sure we send SIGKILL, which this should do
                            proc.join()
                            logger.info(
                                "%s terminated with exitcode=%d"
                                % (proc.name, proc.exitcode)
                            )
                            if proc.exitcode:
                                raise RuntimeError(
                                    "%s terminated with exitcode=%d"
                                    % (proc.name, proc.exitcode)
                                )
                    num_proc = len(workers)
                    manager.wait()  # wait so we don't spam the OS

        except Exception:
            # tear down every child that is still tracked before propagating
            while workers:
                proc = workers.pop()
                proc.terminate()
            raise

    elif workflow == "condor":
        logger.info("launching asynchronous tasks in parallel via condor")
        dag_path = condor.create_stream_dag(
            config,
            config_path,
            names.tag2streamdir(tag, rootdir=rootdir),
            initial_lookback=initial_lookback,
            skip_timeseries=skip_timeseries,
            skip_report=skip_report,
            verbose=verbose,
            quiet=quiet,
            **config.condor,
        )
        logger.info("dag written to {}".format(dag_path))
        exitcode = condor.submit_dag(dag_path, block=False)
        if exitcode:
            raise RuntimeError("non-zero returncode for dag {}".format(dag_path))

        if persist:
            raise NotImplementedError(
                "we currently cannot monitor condor for completion of streaming jobs..."
            )

    else:
        raise ValueError("workflow=%s not understood" % workflow)
def train(
    config_path,
    gps_start=None,
    gps_end=None,
    initial_lookback=DEFAULT_INITIAL_LOOKBACK,
):
    """
    run the training process

    Builds an initial labeled dataset from the ``initial_lookback`` seconds of
    data before ``gps_start``, then loops until the stream timestamp reaches
    ``gps_end``: each stride polls the stream for new data, extends the
    accumulated dataset with newly identified target/random times and, once
    at least ``min_new_samples`` times have accumulated, trains the (single)
    configured classifier and reports the dataset, clean segments, evaluated
    dataset and finally the model.

    :param config_path: path to the config file (resolved to an absolute path)
    :param gps_start: analysis start, see :func:`gps_range` for the default
    :param gps_end: analysis end, see :func:`gps_range` for the default
    :param initial_lookback: seconds before gps_start covered by the initial
        data query
    """
    # set up gps ranges
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # read in config
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()
    dataloader_factory = factories.DataLoaderFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(config.rootdir)
    traindir = names.tag2traindir(tag, rootdir)  # used to store results
    for directory in [rootdir, traindir]:
        os.makedirs(directory, exist_ok=True)

    target_channel = config.samples["target_channel"]
    target_bounds = configparser.config2bounds(config.samples["target_bounds"])
    dirty_bounds = configparser.config2bounds(config.samples["dirty_bounds"])
    dirty_window = config.samples["dirty_window"]
    random_rate = config.train["random_rate"]

    # set up how we record results
    # the boundaries for these will be re-set within the main loop, hence, set
    # to zero initially
    trainreporter = reporter_factory(
        traindir,
        0,
        0,
        group=names.tag2group(tag, "train"),
        **config.train["reporting"],
    )

    # other reporters used to record intermediate data products
    segmentreporter = reporter_factory(traindir, gps_start, gps_end, flavor="segment")

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    assert (
        len(config.classifiers) == 1
    ), "multiple classifiers are not allowed to be run for stream.train"
    [nickname] = config.classifiers

    items = config.classifier_map[nickname]
    logger.debug(
        nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
    )
    classifier = classifier_factory(
        nickname, rootdir=traindir, generate_id=True, **items
    )
    # incremental classifiers are trained/evaluated on the newest dataset only,
    # all others on the full accumulated dataset (see the branches below)
    is_incremental = isinstance(classifier, classifiers.IncrementalSupervisedClassifier)

    # figure out how we'll read data
    items = config.features
    logger.debug(
        "dataloader -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
    )
    items = configparser.add_missing_kwargs(items, group=names.tag2group(tag, "train"))
    time = items.pop("time")  # extract the name of the column we use as time
    safe_columns = items.get("safe_columns", items["columns"])

    sitems = config.train["stream"]
    logger.debug(
        "stream_processor -> " + " ".join("%s:%s" % (k, v) for k, v in sitems.items())
    )

    # initialize stream object
    stream = StreamProcessor(
        gps_start,
        gps_end,
        dataloader_factory,
        buffer_kwargs=items,
        **sitems,
    )

    # configure segdb if required
    if config.train.get("ignore_segdb", False):
        logger.info("ignoring segdb")
    else:
        stream.configure_segdb(
            config.segments,
            reporter=segmentreporter,
        )

    # set up initial data query
    segs = stream.query_segdb(gps_start - initial_lookback, gps_start)
    dataloader = dataloader_factory(
        gps_start - initial_lookback, gps_start, segs=segs, **items
    )
    logger.info(
        "initial data query: [%.3f, %.3f)" % (gps_start - initial_lookback, gps_start)
    )
    data = dataloader.query()

    # get target times and draw from random times
    new_target_times = dataloader.target_times(
        time, target_channel, target_bounds, segs=segs
    )
    logger.info("identified %d target_times" % len(new_target_times))
    new_random_times, new_clean_segs = dataloader.random_times(
        time, target_channel, dirty_bounds, dirty_window, random_rate, segs=segs
    )
    logger.info("identified %d random times" % len(new_random_times))

    # counters of times seen since the last training job; the times found in
    # the initial lookback count towards the first job
    accumulated_target_times = len(new_target_times)
    accumulated_random_times = len(new_random_times)
    accumulated_clean_segs = new_clean_segs

    # possibly trim down columns after identifying times
    # to reduce memory footprint
    data.filter(columns=safe_columns)

    # make a new dataset and add to current dataset
    dataset = factories.DatasetFactory(data).labeled(new_target_times, new_random_times)

    # we'll set the start, end times within the main loop
    datasetreporter = reporter_factory(traindir, 0, 0, flavor="dataset")

    if is_incremental:
        # load in the preferred models so incremental stuff picks up where it left off
        classifier.model = trainreporter.retrieve(classifier.nickname, preferred=True)

    # actually run jobs
    min_new_samples = config.train["stream"].get(
        "min_new_samples", DEFAULT_MIN_NEW_SAMPLES
    )
    max_samples = config.train["stream"].get("max_samples", DEFAULT_MAX_SAMPLES)
    max_stride = config.train["stream"].get("max_stride", DEFAULT_MAX_STRIDE)

    # start up streaming pipeline
    logger.info("starting streaming training")
    while stream.timestamp < gps_end:
        logger.info("--- train stride: [%.3f, %.3f) ---" % stream.seg)

        # poll for new data
        new_data, segs = stream.poll()
        timestamp = segs[0][0] if len(segs) else stream.timestamp
        lvtm = utils.livetime(segs & new_data.segs) if new_data else 0
        logger.info("acquired %.3f sec of data at %d" % (lvtm, timestamp))

        # if there is new samples, create a new umbrella and add to big umbrella
        if lvtm:
            # get target times and draw from random times
            try:
                new_target_times = new_data.target_times(
                    time, target_channel, target_bounds, segs=segs
                )
            except KeyError:
                # NOTE(review): this `continue` also skips the stream.flush()
                # at the bottom of the loop -- confirm that is intended
                logger.warning("target channel missing, moving to next stride")
                continue
            else:
                accumulated_target_times += len(new_target_times)
                logger.info(
                    "identified %d new target times for a total of %d target times"
                    % (len(new_target_times), accumulated_target_times)
                )

            new_random_times, new_clean_segs = new_data.random_times(
                time, target_channel, dirty_bounds, dirty_window, random_rate, segs=segs
            )
            accumulated_random_times += len(new_random_times)
            accumulated_clean_segs |= new_clean_segs
            logger.info(
                "identified %d new random times for a total of %d random times"
                % (len(new_random_times), accumulated_random_times)
            )

            # possibly trim down columns after identifying times
            # to reduce memory footprint
            new_data.filter(columns=safe_columns)

            # make a new dataset and add to big dataset
            new_dataset = factories.DatasetFactory(new_data).labeled(
                new_target_times, new_random_times
            )
            dataset += new_dataset

            # after generating new times and datasets, filter by segments before
            # combining data this prevents repeated samples from being added in
            # if/when we add padding
            new_data.filter(segs, time=time)
            data += new_data

            # launch a new training job if there is enough data
            if accumulated_target_times + accumulated_random_times < min_new_samples:
                logger.info(
                    "%d accumulated times < %d, waiting to launch new training jobs"
                    % (
                        accumulated_target_times + accumulated_random_times,
                        min_new_samples,
                    )
                )

            else:  # launch a new training job if there is enough data
                logger.info(
                    "%d accumulated times >= %d, launching new training jobs"
                    % (
                        accumulated_target_times + accumulated_random_times,
                        min_new_samples,
                    )
                )

                # create a new dataset from the training data, times, and labels
                dataset = features.Dataset.from_datachunks(
                    data, times=dataset.times, labels=dataset.labels
                )

                # filter dataset by livetime and number of samples
                # also trim underlying data source by livetime
                # (the two checks below only log; the flush calls always run)
                if data.livetime > max_stride:
                    logger.info(
                        "filtering dataset to reduce the total livetime to %.3f sec"
                        % max_stride
                    )
                if len(dataset.times) > max_samples:
                    logger.info(
                        "filtering dataset to reduce number of target times to <= %d"
                        % max_samples
                    )
                data.flush(max_stride=max_stride, time=time)
                dataset.flush(max_stride=max_stride, max_samples=max_samples)

                logger.info(
                    "launching training job for [%.3f, %.3f) (new [%.3f, %.3f))"
                    % (dataset.start, dataset.end, segs[0][0], segs[-1][1])
                )

                # set timestamp for reporter
                trainreporter.start = dataset.start
                trainreporter.end = dataset.end
                datasetreporter.start = dataset.start
                datasetreporter.end = dataset.end

                if is_incremental:
                    path = datasetreporter.report("newdataset", new_dataset)
                    logger.debug("new_dataset written to " + path)
                else:
                    path = datasetreporter.report("dataset", dataset)
                    logger.debug("dataset written to " + path)

                logger.info("training classifier")
                if is_incremental:
                    ntarget = accumulated_target_times
                    nrandom = accumulated_random_times
                    logger.info(
                        "training %s with new_dataset "
                        "(%d target_times, %d random_times)"
                        % (nickname, ntarget, nrandom)
                    )
                    model = classifier.train(new_dataset)
                else:
                    ntarget, nrandom = dataset.vectors2num()
                    logger.info(
                        "training %s with dataset "
                        "(%d target_times, %d random_times)"
                        % (nickname, ntarget, nrandom)
                    )
                    model = classifier.train(dataset)

                # record clean segments
                # NOTE(review): accumulated_clean_segs is never reset, unlike
                # the target/random counters below -- confirm this is intended
                segmentreporter.start = dataset.start
                segmentreporter.end = dataset.end
                path = segmentreporter.report(
                    names.id2nickname("cleanSegments", model.model_id),
                    accumulated_clean_segs,
                )
                logger.debug("clean segments written to " + path)

                # record dataset with ranks
                logger.info("generating evaluated dataset")
                if is_incremental:
                    eval_dataset = classifier.evaluate(new_dataset)
                else:
                    eval_dataset = classifier.evaluate(dataset)
                path = datasetreporter.report(
                    names.id2nickname(f"{nickname}_eval_dataset", model.model_id),
                    eval_dataset,
                )
                logger.debug(
                    "evaluated dataset for %s written to %s" % (nickname, path)
                )

                # record new model. we do this last so that downstream jobs
                # don't run into issues accessing data products that haven't
                # been generated yet
                trainreporter.start, trainreporter.end = (
                    model.start,
                    model.end,
                )
                logger.info("reporting model")
                path = trainreporter.report(nickname, model, preferred=True)
                logger.debug("model for %s written to %s" % (nickname, path))

                # filter underlying data by bounds after training
                # to reduce memory footprint
                data.filter(bounds=model.selector.bounds, time=time)

                # re-set target and random time counters
                accumulated_target_times = 0
                accumulated_random_times = 0

                # recreate the dataset without any transformations applied
                dataset = features.Dataset.from_datachunks(
                    data, times=dataset.times, labels=dataset.labels
                )

        else:
            logger.info("no identified times, skipping training")

        # repeat at processing cadence
        stream.flush(retain=0)  # FIXME: worry about padding
def evaluate(config_path, gps_start=None, gps_end=None):
    """
    the realtime evaluation routine

    Loops until the stream timestamp reaches ``gps_end``. Each stride picks up
    the latest preferred model (if a new one is available), polls the stream
    for new data, labels target/random times within that data, evaluates the
    resulting dataset with the (single) configured classifier and reports both
    the clean segments and the evaluated dataset as preferred products.

    :param config_path: path to the config file (resolved to an absolute path)
    :param gps_start: analysis start, see :func:`gps_range` for the default
    :param gps_end: analysis end, see :func:`gps_range` for the default
    """
    # set up gps ranges
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # read in config
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()
    dataloader_factory = factories.DataLoaderFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(config.rootdir)
    evaluatedir = names.tag2evaluatedir(tag, rootdir=rootdir)
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    for directory in [rootdir, traindir, evaluatedir]:
        os.makedirs(directory, exist_ok=True)

    # get info for building datasets
    target_channel = config.samples["target_channel"]
    target_bounds = configparser.config2bounds(config.samples["target_bounds"])
    dirty_bounds = configparser.config2bounds(config.samples["dirty_bounds"])
    dirty_window = config.samples["dirty_window"]
    random_rate = config.evaluate["random_rate"]

    # set up how we record results
    # the boundaries for these will be re-set within the main loop, hence, set
    # to zero initially
    evaluatereporter = reporter_factory(
        evaluatedir,
        0,
        0,
        group=names.tag2group(tag, "evaluate"),
        **config.evaluate["reporting"],
    )
    trainreporter = reporter_factory(
        traindir,
        0,
        0,
        group=names.tag2group(tag, "evaluate"),
        **config.train["reporting"],
    )

    # other reporters used to record intermediate data products
    segmentreporter = reporter_factory(
        evaluatedir, gps_start, gps_end, flavor="segment"
    )

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    assert (
        len(config.classifiers) == 1
    ), "multiple classifiers are not allowed to be run for stream.evaluate"
    [nickname] = config.classifiers

    items = config.classifier_map[nickname]
    logger.debug(
        nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
    )
    # the keyword was previously misspelled ("genetate_id"), so the flag was
    # passed on as an unknown option instead of the one spelled as in train()
    classifier = classifier_factory(
        nickname, rootdir=evaluatedir, generate_id=True, **items
    )

    # figure out how we'll read data
    items = config.features
    logger.debug(
        "dataloader -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
    )
    items = configparser.add_missing_kwargs(
        items, group=names.tag2group(tag, "evaluate")
    )
    time = items.pop("time")  # extract the name of the column we use as time

    sitems = config.evaluate["stream"]
    logger.debug(
        "stream_processor -> " + " ".join("%s:%s" % (k, v) for k, v in sitems.items())
    )

    # initialize stream object
    stream = StreamProcessor(
        gps_start,
        gps_end,
        dataloader_factory,
        buffer_kwargs=items,
        **sitems,
    )

    # configure segdb if required
    if config.evaluate.get("ignore_segdb", False):
        logger.info("ignoring segdb")
    else:
        stream.configure_segdb(
            config.segments,
            reporter=segmentreporter,
        )

    # actually run jobs
    max_samples = config.evaluate.get("max_samples", DEFAULT_MAX_SAMPLES)
    # identifier of the model currently loaded; stays None until the first
    # model has been retrieved
    model_id = None

    # start up streaming pipeline
    logger.info("starting streaming evaluation")
    while stream.timestamp < gps_end:
        logger.info("--- evaluate stride: [%.3f, %.3f) ---" % stream.seg)

        # retrieve trained models
        # get the next model available
        model = trainreporter.retrieve(nickname, preferred=True)

        # assumes we poll more frequently than models are produced
        # update if there is a new model
        if model is not None:
            logger.info("updating model for " + nickname)
            classifier.model = model
            model_id = model.model_id

        # poll for new data
        new_data, segs = stream.poll()
        timestamp = segs[0][0] if len(segs) else stream.timestamp
        lvtm = utils.livetime(segs & new_data.segs) if new_data else 0
        logger.info("acquired %.3f sec of data at %d" % (lvtm, timestamp))

        # if there is new samples, create a new umbrella
        if lvtm:
            # NOTE: we only condition on whether there is something new to do
            # and don't care about min_new_samples, max_samples, max_stride we
            # just always evaluate everything
            evaluate_start = segs[0][0]
            evaluate_end = segs[-1][1]

            # get target times and draw from random times
            try:
                target_times = new_data.target_times(
                    time, target_channel, target_bounds, segs=segs
                )
            except KeyError:
                # NOTE(review): this `continue` also skips the stream.flush()
                # at the bottom of the loop -- confirm that is intended
                logger.warning("target channel missing, moving to next stride")
                continue
            else:
                logger.info("identified %d target times" % len(target_times))

            # keep only the most recent max_samples times
            if len(target_times) > max_samples:
                logger.info("retaining %d target times" % max_samples)
                target_times = target_times[-max_samples:]

            random_times, clean_segs = new_data.random_times(
                time, target_channel, dirty_bounds, dirty_window, random_rate, segs=segs
            )
            logger.info("identified %d random times" % len(random_times))
            if len(random_times) > max_samples:
                logger.info("retaining %d random times" % max_samples)
                random_times = random_times[-max_samples:]

            # evaluate classifiers if there are times in which to do evaluation
            if len(target_times) > 0 or len(random_times) > 0:
                # build a dataset
                dataset = factories.DatasetFactory(new_data).labeled(
                    target_times, random_times
                )

                # update reporter timestamps
                evaluatereporter.start = evaluate_start
                evaluatereporter.end = evaluate_end

                logger.info(
                    "launching evaluation job for [%.3f, %.3f)"
                    % (evaluate_start, evaluate_end)
                )

                # record clean segments
                segmentreporter.start = evaluate_start
                segmentreporter.end = evaluate_end
                path = segmentreporter.report(
                    names.id2nickname("cleanSegments", model_id),
                    clean_segs,
                    preferred=True,
                )
                logger.debug("clean segments written to " + path)

                logger.info("evaluating dataset for %s" % nickname)
                dataset = classifier.evaluate(dataset)

                # manage provenance
                evaluatereporter.start, evaluatereporter.end = (
                    dataset.start,
                    dataset.end,
                )

                logger.info("reporting dataset")
                dataset_name = names.id2nickname(nickname, model_id)
                path = evaluatereporter.report(dataset_name, dataset, preferred=True)
                logger.debug("dataset written to %s" % path)

            else:
                logger.info("no identified times, skipping evaluation")

        # repeat at processing cadence
        stream.flush(retain=0)  # FIXME: worry about padding
def calibrate(config_path, gps_start=None, gps_end=None):
    """
    calibrate the classifier predictions based on historical data

    Loads (or, if none is preferred yet, computes from the evaluated training
    dataset) a calibration map for the current preferred model, then loops at
    the configured cadence until ``gps_end``: whenever a new model appears the
    calibration map is reset and recomputed from that model's training
    products; newly evaluated datasets and clean segments are accumulated and,
    once at least ``min_new_samples`` samples are available, the classifier is
    re-calibrated and the new map is reported as preferred.

    :param config_path: path to the config file (resolved to an absolute path)
    :param gps_start: analysis start, see :func:`gps_range` for the default
    :param gps_end: analysis end, see :func:`gps_range` for the default
    """
    # set up gps ranges
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # read in config
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(config.rootdir)
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    evaluatedir = names.tag2evaluatedir(tag, rootdir=rootdir)
    calibratedir = names.tag2calibratedir(tag, rootdir=rootdir)
    for directory in [rootdir, traindir, evaluatedir, calibratedir]:
        os.makedirs(directory, exist_ok=True)

    # set up how we record results
    # the boundaries for these will be re-set within the main loop, hence, set
    # to zero initially
    trainreporter = reporter_factory(
        traindir,
        0,
        0,
        group=names.tag2group(tag, "calibrate"),
        **config.train["reporting"],
    )
    evaluatereporter = reporter_factory(
        evaluatedir,
        0,
        0,
        group=names.tag2group(tag, "calibrate"),
        **config.evaluate["reporting"],
    )
    calibratereporter = reporter_factory(
        calibratedir,
        0,
        0,
        group=names.tag2group(tag, "calibrate"),
        **config.calibrate["reporting"],
    )

    # other reporters used to record intermediate data products
    cleansegsreporter = reporter_factory(
        evaluatedir, gps_start, gps_end, flavor="segment"
    )
    train_cleansegsreporter = reporter_factory(
        traindir, gps_start, gps_end, flavor="segment"
    )
    train_datasetreporter = reporter_factory(traindir, 0, 0, flavor="dataset")

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    assert (
        len(config.classifiers) == 1
    ), "multiple classifiers are not allowed to be run for stream.calibrate"
    [nickname] = config.classifiers

    items = config.classifier_map[nickname]
    logger.debug(
        nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
    )
    classifier = classifier_factory(nickname, rootdir=calibratedir, **items)

    # extract basic parameters for calibration
    # NOTE(review): calibrate_kwargs is config.calibrate itself, so the pop
    # below may mutate the config in place, and the remaining entries (e.g.
    # "stream", "reporting") are passed on to classifier.calibrate -- confirm
    calibrate_kwargs = config.calibrate
    min_new_samples = calibrate_kwargs.pop("min_new_samples", DEFAULT_MIN_NEW_SAMPLES)

    # set up cadence manager
    sitems = config.calibrate["stream"]
    logger.debug(
        "cadence_manager -> " + " ".join("%s:%s" % (k, v) for k, v in sitems.items())
    )
    # NOTE(review): the message says "evaluate stream" although this checks the
    # calibrate stream options -- looks like a copy/paste leftover, confirm
    assert "max_timestamp" not in sitems, (
        "max_timestamp is set by hand to gps_end and "
        "cannot be passed through INI file via evaluate stream"
    )
    manager = utils.CadenceManager(gps_start, max_timestamp=gps_end, **sitems)

    # get the next model available
    # NOTE(review): assumes a preferred model already exists; a None result
    # would fail on the attribute access below
    model = trainreporter.retrieve(nickname, preferred=True)
    model_id = model.model_id
    data_id = names.id2nickname(nickname, model.model_id)

    # load in existing calibration map if available
    calibmap = calibratereporter.retrieve(data_id, preferred=True)
    if calibmap:
        classifier.calibration_map = calibmap
    else:  # initial calibration with new model
        train_cleansegsreporter.start = model.start
        train_cleansegsreporter.end = model.end
        initial_clean_segs = train_cleansegsreporter.retrieve(
            names.id2nickname("cleanSegments", model.model_id)
        )
        train_datasetreporter.start = model.start
        train_datasetreporter.end = model.end
        train_dataset = train_datasetreporter.retrieve(
            names.id2nickname(f"{nickname}_eval_dataset", model.model_id),
        )

        # recalibrate with initial training dataset
        logger.info("initial calibration for {} with new model".format(nickname))
        classifier.model = model
        calibmap = classifier.calibrate(
            train_dataset, clean_segs=initial_clean_segs, **calibrate_kwargs
        )

        logger.info("reporting calibration")
        calibratereporter.start = model.start
        calibratereporter.end = model.end
        path = calibratereporter.report(data_id, calibmap, preferred=True)
        logger.debug("calibration written to %s" % path)

    # set up accumulated datasets as needed
    # NOTE: we only need to do this here because everything else uses
    # StreamProcessor to avoid reading in redundant samples, etc
    dataset = features.Dataset(start=manager.timestamp, end=manager.timestamp)

    # set up accumulated clean segments for calibration
    clean_segs = segmentlist([])

    # start up streaming pipeline
    logger.info("starting streaming calibration")
    while manager.timestamp < gps_end:
        logger.info("--- calibrate stride: [%.3f, %.3f) ---" % manager.seg)
        timestamp, _ = manager.wait()  # manage cadence and sleep logic here

        # get the next model available
        # assumes we poll more frequently than models are produced
        # update if there is a new model
        model = trainreporter.retrieve(nickname, preferred=True)

        # if new model available, flush calibration map and
        # generate new calibration with datasets
        if model is not None:
            logger.info("updating model for " + nickname)
            classifier.model = model

            # data identifier
            model_id = model.model_id
            data_id = names.id2nickname(nickname, model.model_id)

            # resetting calibration map
            logger.info("resetting calibration map")
            classifier.calibration_map = None

            # load auxiliary data to trained model
            train_cleansegsreporter.start = model.start
            train_cleansegsreporter.end = model.end
            initial_clean_segs = train_cleansegsreporter.retrieve(
                names.id2nickname("cleanSegments", model.model_id)
            )
            train_datasetreporter.start = model.start
            train_datasetreporter.end = model.end
            train_dataset = train_datasetreporter.retrieve(
                names.id2nickname(f"{nickname}_eval_dataset", model.model_id),
            )

            # recalibrate with initial training dataset
            logger.info("recalibrating calibration map with new model")
            calibmap = classifier.calibrate(
                train_dataset, clean_segs=initial_clean_segs, **calibrate_kwargs
            )

            logger.info("reporting calibration")
            calibratereporter.start = model.start
            calibratereporter.end = model.end
            path = calibratereporter.report(data_id, calibmap, preferred=True)
            logger.debug("calibration written to %s" % path)

            # reset dataset
            # take as my starting point the end of the initial dataset
            start = dataset.end
            dataset = features.Dataset(start=start, end=start)

        logger.info("retrieving clean segments")
        new_clean_segs = cleansegsreporter.retrieve(
            names.id2nickname("cleanSegments", model_id), preferred=True
        )
        if new_clean_segs is None:  # nothing new available
            logger.info("no new clean segments for %s at %d" % (nickname, timestamp))
            continue

        # get the next dataset available
        new_dataset = evaluatereporter.retrieve(data_id, preferred=True)
        if new_dataset is None:  # nothing new available
            logger.info("no new dataset for %s at %d" % (nickname, timestamp))
            continue

        # an empty/stale dataset is only logged: execution still falls through
        # to the calibration check below with whatever has accumulated so far
        elif (len(new_dataset) == 0) or (new_dataset.end <= dataset.end):
            logger.info("empty dataset for %s at %d" % (nickname, timestamp))

        elif new_dataset.start >= dataset.end:  # represents new data!
            ngch, ncln = new_dataset.vectors2num()
            logger.info(
                "acquired %.3f sec of data for %s at %d (%d glitch, %d clean)"
                % (
                    utils.livetime(new_dataset.segs),
                    nickname,
                    timestamp,
                    ngch,
                    ncln,
                )
            )
            dataset += new_dataset
            clean_segs |= new_clean_segs

        elif new_dataset.end > dataset.end:
            # at least some of the new_dataset is actually new
            trunc = new_dataset.segs & segmentlist([(dataset.end, new_dataset.end)])
            # the counts logged here are taken before the dataset is truncated
            ngch, ncln = new_dataset.vectors2num()
            logger.info(
                "acquired %.3f sec of data for %s at %d (%d glitch, %d clean)"
                % (utils.livetime(trunc), nickname, timestamp, ngch, ncln)
            )
            new_dataset.filter(segs=trunc)
            dataset += new_dataset
            clean_segs |= new_clean_segs

        logger.info("calibrating classifiers")
        if len(dataset) >= min_new_samples:
            # need to spawn a calibration job for this classifier
            calibratereporter.start = dataset.start
            calibratereporter.end = dataset.end

            # limit clean segments to those used by calibration
            calib_clean_segs = clean_segs & dataset.segs

            # evaluate dataset and save probabilities to disk
            ngch, ncln = dataset.vectors2num()
            logger.info("calibrating %s (%d glitch, %d clean)" % (nickname, ngch, ncln))

            # this works incrementally by default
            calibmap = classifier.calibrate(
                dataset, clean_segs=calib_clean_segs, **calibrate_kwargs
            )

            # manage provenance
            calibratereporter.start, calibratereporter.end = (
                calibmap.start,
                calibmap.end,
            )

            logger.info("reporting calibration")
            path = calibratereporter.report(data_id, calibmap, preferred=True)
            logger.info("calibration written to %s" % path)

            # re-set dataset
            # take as my starting point where the old dataset left off
            start = dataset.end
            dataset = features.Dataset(start=start, end=start)

        else:
            logger.info(
                "not enough new data to re-calibrate %s "
                "(%d samples; required at least %d)"
                % (nickname, len(dataset), min_new_samples)
            )
def timeseries(config_path, gps_start=None, gps_end=None):
    """
    the realtime evaluation routine

    Repeatedly looks up the preferred trained model and calibration map for
    the single configured classifier, polls a StreamProcessor for new data,
    and writes one timeseries per polled segment until the stream timestamp
    reaches gps_end. When ``config.monitor`` is set, per-series summary
    metrics (fap, loglike, rank, latency, ok) are also written to kafka.

    Raises AssertionError if the configured frequency is missing from
    target_bounds, if the frequency bounds are not integers, or if more
    than one classifier is configured.
    """

    def _kv(mapping):
        # render a mapping as space-separated "key:value" pairs for debug logs
        return " ".join("%s:%s" % (k, v) for k, v in mapping.items())

    # resolve the GPS window for this analysis
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # load the configuration
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # factories used to build reporters, classifiers and data loaders
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()
    dataloader_factory = factories.DataLoaderFactory()

    # directory layout shared by the data products touched here
    tag = config.tag
    rootdir = os.path.abspath(config.rootdir)
    timeseriesdir = names.tag2timeseriesdir(tag, rootdir=rootdir)
    calibratedir = names.tag2calibratedir(tag, rootdir=rootdir)
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    for dirname in (rootdir, traindir, calibratedir, timeseriesdir):
        os.makedirs(dirname, exist_ok=True)

    # everything needed to name the channels in the timeseries files
    instrument = config.instrument
    description = config.timeseries.get("description")
    target_bounds = configparser.config2bounds(config.samples["target_bounds"])
    frequency = config.features["frequency"]
    assert frequency in target_bounds, (
        "must specify a frequency range (called %s) within target_bounds" % frequency
    )
    freq_min, freq_max = target_bounds[frequency]
    assert all(
        isinstance(bound, int) for bound in (freq_min, freq_max)
    ), "frequency bounds must be integers!"
    set_ok = config.timeseries.get("set_ok")

    # set up aggregator for metric collection
    if config.monitor:
        logger.info("setting up metric collection")
        kafka_url = "kafka://{}".format(config.features["timeseries"]["url"])
        kafka_client = kafka.Client(kafka_url)

        # register schema for desired metrics
        # FIXME should we be getting these from scald config?
        metric_names = ["fap", "loglike", "rank", "latency", "ok"]
        kafka_client.subscribe(
            [f"idq.{config.tag}.{metric}" for metric in metric_names]
        )

    # reporters for the main data products; their boundaries are re-set
    # within the main loop, hence the zeros here
    group = names.tag2group(tag, "timeseries")
    trainreporter = reporter_factory(
        traindir, 0, 0, group=group, **config.train["reporting"]
    )
    calibratereporter = reporter_factory(
        calibratedir, 0, 0, group=group, **config.calibrate["reporting"]
    )
    timeseriesreporter = reporter_factory(
        timeseriesdir, 0, 0, group=group, **config.timeseries["reporting"]
    )

    # reporter used to record intermediate segment products
    segmentreporter = reporter_factory(
        timeseriesdir, gps_start, gps_end, flavor="segment"
    )

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # exactly one classifier may be run by this routine
    assert (
        len(config.classifiers) == 1
    ), "multiple classifiers are not allowed to be run for stream.timeseries"
    [nickname] = config.classifiers
    classifier_items = config.classifier_map[nickname]
    logger.debug(nickname + " -> " + _kv(classifier_items))
    classifier = classifier_factory(
        nickname, rootdir=timeseriesdir, **classifier_items
    )

    # figure out how we'll read data: prefer a dedicated "timeseries"
    # section of the features config when one exists
    loader_items = (
        config.features["timeseries"]
        if "timeseries" in config.features
        else config.features
    )
    logger.debug("dataloader -> " + _kv(loader_items))
    loader_items = configparser.add_missing_kwargs(
        loader_items, group=names.tag2group(tag, "timeseries")
    )

    # figure out timeseries sampling rate -> dt
    srate = config.timeseries["srate"]
    logger.info("generating timeseries sampled at %.3f Hz" % srate)
    dt = 1.0 / srate

    # initialize stream processor
    stream_items = config.timeseries["stream"]
    logger.debug("stream_processor -> " + _kv(stream_items))
    stream = StreamProcessor(
        gps_start,
        gps_end,
        dataloader_factory,
        buffer_kwargs=loader_items,
        **stream_items,
    )

    # configure segdb if required
    if config.timeseries.get("ignore_segdb", False):
        logger.info("ignoring segdb")
    else:
        stream.configure_segdb(config.segments, reporter=segmentreporter)

    # cache to sync up data products with the same ID: a newly found model
    # is held here until a calibration map with a matching ID shows up
    pending_model = None
    pending_model_id = None
    model_pending = False

    # start up streaming pipeline
    logger.info("starting streaming timeseries")
    while stream.timestamp < gps_end:
        logger.info("--- timeseries stride: [%.3f, %.3f) ---" % stream.seg)

        # look for a newly trained model
        # assumes we poll faster than models are produced
        model = trainreporter.retrieve(nickname, preferred=True)
        if model is not None:
            model_pending = True
            pending_model_id = model.model_id
            pending_model = model

        # look for a calibration map matching the most recent model ID
        # assumes we poll faster than maps are produced
        data_id = names.id2nickname(nickname, pending_model_id)
        calibration_map = calibratereporter.retrieve(data_id, preferred=True)

        if calibration_map:
            if model_pending:
                # model and map are in sync, swap both in together
                logger.info("updating model and calibration map for " + nickname)
                classifier.model = pending_model
                classifier.calibration_map = calibration_map
                model_pending = False
            else:
                logger.info("updating calibration map for " + nickname)
                classifier.calibration_map = calibration_map
        elif model_pending:
            if not classifier.is_trained:
                # new model available without calibration map, but
                # no previous model exists. need to wait for a new map
                logger.warning(
                    "initial model found without calibration map, "
                    "updating model and skipping stride"
                )
                classifier.model = pending_model
                continue

            # no new calibration map yet for new model, see if there
            # is a new calibration map for the existing model and
            # update that instead if needed
            data_id = names.id2nickname(nickname, classifier.model.model_id)
            calibration_map = calibratereporter.retrieve(data_id, preferred=True)
            if calibration_map:
                logger.info("updating calibration map for " + nickname)
                classifier.calibration_map = calibration_map

        if calibration_map:
            # report on calibration map health when a new one is available
            if calibration_map.is_healthy():
                log_health = logger.info
                health_msg = "calibration map for id: %s is healthy, time: %.3f"
            else:
                log_health = logger.warning
                health_msg = "calibration map for id: %s is not healthy, time: %.3f"
            log_health(
                health_msg,
                classifier.calibration_map.model_id,
                stream.seg[0],
            )

        # poll for new data
        # NOTE: we very intentionally do not "fix_segments"
        # keeping them partitioned this way tells us which frames to write
        # so that each frame has the same (predictable) duration: stream.stride
        new_data, segs = stream.poll()
        if len(segs):
            timestamp = segs[0][0]
        else:
            timestamp = stream.timestamp
        if new_data:
            lvtm = utils.livetime(segs & new_data.segs)
        else:
            lvtm = 0
        logger.info("acquired %.3f sec of data at %d" % (lvtm, timestamp))

        # handle case where timeseries job just started but doesn't
        # have all necessary data products available
        if classifier.is_trained and not classifier.is_calibrated:
            logger.warning(
                "no calibration map found for initial model yet, skipping stride"
            )
            continue

        # create timeseries if there is new data
        if lvtm:
            dataset_factory = factories.DatasetFactory(new_data)

            for seg in segs:
                # process a single segment at a time to guarantee uniform duration
                single_seg = segmentlist([seg])

                # update report times
                timeseriesreporter.start, timeseriesreporter.end = seg

                # generate series
                logger.info("generating timeseries")
                info = ChannelInfo(
                    instrument, freq_min, freq_max, nickname, description=description
                )
                series = classifier.timeseries(
                    info,
                    dataset_factory,
                    dt=dt,
                    segs=single_seg,
                    set_ok=set_ok,
                )

                # report timeseries
                logger.debug("reporting timeseries")
                written = batch.report_timeseries(timeseriesreporter, info, series)
                for path in written:
                    logger.info("timeseries written to %s" % path)

                # record latency
                time = float(seg[0])
                latency = utils.gps2latency(time)
                logger.info("latency upon writing timeseries: %.3f" % latency)

                # report metrics
                if config.monitor:
                    for channel_series in series:
                        # summarize each series at its highest logL sample
                        peak = np.argmax(channel_series.loglike.series)
                        columns = {
                            "fap": channel_series.fap.series[peak],
                            "loglike": channel_series.loglike.series[peak],
                            "rank": channel_series.rank.series[peak],
                            "latency": latency,
                            "ok": np.min(channel_series.ok.series),
                        }

                        # store and aggregate metrics
                        logger.debug("reporting metrics")
                        for metric, column in columns.items():
                            topic = f"idq.{config.tag}.{metric}"
                            payload = {"time": [time], "data": [column]}
                            kafka_client.write(topic, payload)

        else:
            logger.info("no identified times, skipping timeseries")

        # repeat at processing cadence
        stream.flush(retain=0)  # FIXME: worry about padding
def report(config_path, gps_start=None, gps_end=None, reportdir=None):
    """
    run batch.report jobs periodically

    Each stride covers the (start, end) returned by the cadence manager,
    with the start pushed back by the optional "lookback" from the report
    config so that users can generate a running average with overlap.

    Raises AssertionError if "max_timestamp" appears in the report stream
    config, because it is always set to gps_end here.
    """
    # resolve the GPS window for this analysis
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # load the configuration
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # a non-zero lookback makes consecutive reports overlap
    lookback = config.report.get("lookback", 0)

    # cadence manager options come straight from the config
    cadence_kwargs = config.report["stream"]
    rendered = " ".join("%s:%s" % (k, v) for k, v in cadence_kwargs.items())
    logger.debug("cadence_manager -> " + rendered)
    assert (
        "max_timestamp" not in cadence_kwargs
    ), "max_timestamp is set by hand to gps_end and cannot be passed through config"
    manager = utils.CadenceManager(
        gps_start, max_timestamp=gps_end, **cadence_kwargs
    )

    # actually loop
    logger.info("starting streaming reporting")
    while manager.timestamp < gps_end:
        logger.info("--- report stride: [%.3f, %.3f) ---" % manager.seg)

        # cadence and sleep logic are delegated to the manager
        stride_start, stride_end = manager.wait()

        batch.report(
            stride_start - lookback,
            stride_end,
            config_path,
            reportdir=reportdir,
        )
class StreamProcessor(object):
    """
    handles data processing logic for the streaming pipeline

    Wraps a utils.CadenceManager (which owns the timestamp/stride
    bookkeeping) together with a buffer factory. Each call to poll() builds
    one buffer per segment handed out by the manager, queries it for data,
    and stores successfully retrieved data in an internal queue until
    flush() is called.

    Segment-database queries are disabled until configure_segdb() is called.
    """

    # define a few string templates for things that will be printed repeatedly
    _poll_template = "querying for triggers within [%.4f, %.4f)"
    _retained_template = "retained %.4f sec of livetime"
    _ignore_template = "ignoring segdb"
    _yesdata_template = "found data at %.4f"
    _nodata_template = "no data found within [%.4f, %.4f)"
    _incontiguous_template = "gap in data found [%.4f, %.4f). Moving to %.4f"
    _latency_template = (
        "too far behind realtime at (%.4f sec at %.4f), skipping ahead to %.4f"
    )
    _newbuffer_template = "creating new buffer for [%.4f, %.4f)"

    def __init__(
        self, start, end, buffer_factory, buffer_kwargs=None, **manager_kwargs
    ):
        """
        start, end      : boundaries handed to the cadence manager (end is
                          passed as its max_timestamp)
        buffer_factory  : callable invoked as
                          buffer_factory(start, end, **kwargs); the returned
                          object must expose a ``kwargs`` attribute
        buffer_kwargs   : optional mapping of keyword arguments for
                          buffer_factory; it is copied and never modified
        manager_kwargs  : forwarded to utils.CadenceManager; must not
                          contain "max_timestamp" (AssertionError otherwise)
        """
        # this is kinda janky, but it will work...
        self._init_segdb()

        self._buffer_factory = buffer_factory

        # set up cadence manager
        assert "max_timestamp" not in manager_kwargs, (
            "max_timestamp is set to end by hand, "
            "so it cannot be passed via manager_kwargs!"
        )
        self._manager = utils.CadenceManager(
            start, max_timestamp=end, logger=logger, **manager_kwargs
        )

        # work on a private copy: the kwargs are updated in place below and
        # again in new_buffer(), which must not leak into the caller's
        # mapping or into a default shared between instances
        kwargs = dict(buffer_kwargs) if buffer_kwargs is not None else {}

        # do this next little backflip to make sure all options are passed
        # correctly: build a throw-away buffer and adopt the kwargs it reports
        kwargs.update(self._buffer_factory(start, end, **kwargs).kwargs)
        self._buffer_kwargs = kwargs
        self._buffer = None

        # set up the buffer store
        self._queue = deque()

    def _init_segdb(self):
        """
        set up all the attributes needed for segdb stuff
        these will all be overwritten upon a call to configure_segdb
        """
        self._ignore_segdb = True  # default behavior
        self._segdb_url = None
        self._segdb_union = None
        self._segdb_intersect = None
        self._segdb_exclude = None
        self._segdb_reporter = None
        # no replay shift and no query template until configured; defined
        # here so query_segdb never trips over a missing attribute
        self._segdb_offset = 0
        self._segdb_template = None

    @property
    def ignore_segdb(self):
        return self._ignore_segdb

    @property
    def stride(self):
        return self._manager.stride

    @property
    def delay(self):
        return self._manager.delay

    @property
    def timestamp(self):
        return self._manager.timestamp

    @property
    def seg(self):
        return self._manager.seg

    @property
    def segs(self):
        return self._manager.segs

    @property
    def segdb_url(self):
        return self._segdb_url

    @property
    def segdb_union(self):
        return self._segdb_union

    @property
    def segdb_intersect(self):
        return self._segdb_intersect

    @property
    def segdb_exclude(self):
        return self._segdb_exclude

    @property
    def segdb_reporter(self):
        return self._segdb_reporter

    def configure_segdb(self, seg_config, reporter=None):
        """
        we have to configure_segdb in order to not ignore_segdb,
        and we assume that one would only configure_segdb if you wanted to
        use it. thus, we manage the boolean flag controlling whether to use
        segdb based on whether this method was called

        seg_config must provide "segdb_url" and may provide "union",
        "intersect", "exclude" (each defaulting to an empty list) and
        "replay" (a mapping with an "offset" by which query times are
        shifted). If a buffer already exists it is re-filtered with the
        newly configured segments.
        """
        self._ignore_segdb = False
        self._segdb_url = seg_config["segdb_url"]
        self._segdb_union = seg_config.get("union", [])
        self._segdb_intersect = seg_config.get("intersect", [])
        self._segdb_exclude = seg_config.get("exclude", [])
        self._segdb_reporter = reporter

        if "replay" in seg_config:
            self._segdb_offset = seg_config["replay"]["offset"]
            logger.info(
                "replay mode enabled, shifting all dqsegdb "
                f"query times by {self._segdb_offset}s"
            )
        else:
            self._segdb_offset = 0

        # pre-fill everything except the query span, which is substituted
        # for the remaining "[%.3f, %.3f)" at query time
        self._segdb_template = (
            "querying %s within %s for: union=%s ; intersect=%s ; exclude=%s"
            % (
                self._segdb_url,
                "[%.3f, %.3f)",
                ",".join(self._segdb_union),
                ",".join(self._segdb_intersect),
                ",".join(self._segdb_exclude),
            )
        )

        # update the existing buffer
        if self._buffer is not None:
            self._buffer.filter(
                self.query_segdb(self._buffer.start, self._buffer.end)
            )

    def query_segdb(self, start, end):
        """
        a helper function that manages segment construction for each buffer

        Returns a segmentlist covering [start, end): the whole span when
        segdb is ignored, otherwise the result of utils.segdb2segs queried
        at times shifted back by the replay offset and then shifted forward
        again. When a reporter was configured, the segments are also
        written out through it.
        """
        if self.ignore_segdb:
            logger.debug(self._ignore_template)
            segs = segmentlist([segment(start, end)])

        else:
            query_start = start - self._segdb_offset
            query_end = end - self._segdb_offset
            logger.info(self._segdb_template % (query_start, query_end))
            segs = utils.segdb2segs(
                query_start,
                query_end,
                union=self._segdb_union,
                intersect=self._segdb_intersect,
                exclude=self._segdb_exclude,
                segdb_url=self._segdb_url,
            )
            logger.info(self._retained_template % utils.livetime(segs))

        # undo the replay shift so the segments line up with [start, end)
        if self._segdb_offset:
            segs.shift(self._segdb_offset)

        if self._segdb_reporter is not None:
            self._segdb_reporter.start = start
            self._segdb_reporter.end = end
            path = self._segdb_reporter.report("segments", segs)
            logger.info("segments written to " + path)

        return segs

    def new_buffer(self, start):
        """
        build the buffer for [start, start + stride), restricted to the
        segments returned by query_segdb for that span
        """
        end = start + self.stride
        logger.debug(self._newbuffer_template % (start, end))

        # carry forward whatever kwargs the previous buffer reports
        if self._buffer:
            self._buffer_kwargs.update(self._buffer.kwargs)

        return self._buffer_factory(
            start, end, segs=self.query_segdb(start, end), **self._buffer_kwargs
        )

    def __len__(self):
        # number of data chunks currently held in the queue
        return len(self._queue)

    def flush(self, retain=0):
        """
        drop the oldest queued data until at most ``retain`` chunks remain

        With the default retain=0 the queue is emptied; a positive value
        keeps that many of the most recently added chunks so new samples
        can access older data to create feature vectors. Negative values
        are treated as 0.
        """
        retain = max(retain, 0)  # never try to pop from an empty queue
        while len(self) > retain:
            self._queue.popleft()

    def wait(self):
        """
        delegates to CadenceManager.wait(), which updates timestamp after
        sleeping
        """
        return self._manager.wait()

    def poll(self, **triggers_kwargs):
        """
        the workhorse method
        manages requests for data and I/O via ClassifierData objects
        constructs and returns an umbrella of the requested ClassifierDatas
        for this stride

        triggers_kwargs is passed to calls to the buffer's query() and is
        exposed to help control I/O costs

        Returns a tuple (data, segs): the result of features.combine_chunks
        over everything currently queued, and a segmentlist of the segments
        for which data was retrieved during this call.
        """
        segs = segmentlist([])

        try:
            for seg in self._manager.poll():
                # manager.poll will gobble up multiple segments if needed

                # we wait to create this until after self.wait() because
                # queries to segdb within new_buffer() must be causal
                self._buffer = self.new_buffer(seg[0])

                if utils.livetime(self._buffer.segs):
                    # there is non-trivial livetime in the new buffer
                    logger.debug(
                        self._poll_template % (self._buffer.start, self._buffer.end)
                    )
                    try:
                        # try to retrieve data
                        data = self._buffer.query(**triggers_kwargs)

                    except exceptions.IncontiguousDataError as e:
                        # span of data doesn't match span of classifier data;
                        # jump the manager to the timestamp the error reports
                        logger.warning(
                            self._incontiguous_template
                            % (e.gap_start, e.gap_end, e.timestamp)
                        )
                        self._manager.timestamp = e.timestamp

                    except exceptions.NoDataError as e:
                        # no data found when querying
                        logger.warning(
                            self._nodata_template
                            % (e.timestamp, e.timestamp + e.stride)
                        )

                    else:
                        logger.debug(self._yesdata_template % seg[0])
                        self._queue.append(data)
                        segs.append(seg)

        except exceptions.LatencyError as e:
            # fell too far behind: skip ahead to the timestamp the error reports
            logger.warning(
                self._latency_template % (e.latency, e.current_timestamp, e.timestamp)
            )
            self._manager.timestamp = e.timestamp

        # return children of umbrella corresponding to new times
        # FIXME: worry about padding and edge effects for feature vectors?
        return (
            features.combine_chunks(list(self._queue)),
            segs,
        )