from collections import deque
import logging
import multiprocessing as mp
import os
import numpy as np
import gpstime
from ligo.scald.io import kafka
from ligo.segments import segment, segmentlist
from . import batch
from . import calibration
from . import classifiers
from . import condor
from . import configparser
from . import factories
from . import features
from . import names
from . import utils
from . import exceptions
from .series import ChannelInfo
# --- streaming workflow defaults ---
DEFAULT_STREAM_WORKFLOW = "fork"
DEFAULT_INITIAL_LOOKBACK = 3600  # one hour of data before gps_start
# not literally +infinity: a GPS time far enough in the future that a
# streaming analysis effectively never reaches it
DEFAULT_GPS_END = 2000000000

# --- sample/stride limits (the last two are taken from the calibration module) ---
DEFAULT_MIN_NEW_SAMPLES = 1
DEFAULT_MAX_SAMPLES = calibration.DEFAULT_MAX_SAMPLES
DEFAULT_MAX_STRIDE = calibration.DEFAULT_MAX_STRIDE

# --- plotting / monitoring defaults ---
DEFAULT_PLOT_STRIDE = 10
DEFAULT_MAX_METRIC_SPAN = 60  # one minute
DEFAULT_MAX_LATENCY_SPAN = 4 * 3600  # four hours

logger = logging.getLogger("idq")
def gps_range(gps_start=None, gps_end=None):
    """
    resolve the GPS span covered by a streaming analysis

    A missing ``gps_start`` falls back to the current GPS time (truncated to
    an integer); a missing ``gps_end`` falls back to ``DEFAULT_GPS_END``.

    Returns the ``(gps_start, gps_end)`` tuple. An AssertionError is raised
    when ``gps_end`` is not strictly after ``gps_start`` (this check is an
    ``assert`` and so is skipped under ``python -O``).
    """
    start = int(gpstime.gpsnow()) if gps_start is None else gps_start
    end = DEFAULT_GPS_END if gps_end is None else gps_end
    assert end > start, "gps_end (%d) must be > gps_start (%d)!" % (end, start)
    return start, end
def stream(
    config_path,
    gps_start=None,
    gps_end=None,
    workflow=DEFAULT_STREAM_WORKFLOW,
    initial_lookback=DEFAULT_INITIAL_LOOKBACK,
    monitoring_cadence=utils.DEFAULT_MONITORING_CADENCE,
    skip_timeseries=False,
    skip_report=False,
    persist=False,
    verbose=False,
    quiet=False,
):
    """
    launch and manage all streaming processes

    Before anything is launched, make sure a preferred model and calibration
    map exist for the (single) configured classifier. They are either
    bootstrapped from the paths given in the config (``train.bootstrap_model``
    and ``calibrate.bootstrap_map``) or produced by batch train/calibrate jobs
    over ``[gps_start - initial_lookback, gps_start]``. Then launch the train,
    evaluate, calibrate, timeseries and report processes.

    Parameters
    ----------
    config_path : str
        path to the configuration file
    gps_start, gps_end : float, optional
        span of the streaming analysis, resolved through ``gps_range``
    workflow : str
        ``"fork"`` (local multiprocessing) or ``"condor"`` (submit a DAG);
        ``"block"`` is rejected
    initial_lookback : float
        seconds of data before ``gps_start`` used by the initial batch jobs;
        also forwarded to the streaming ``train`` process in the fork workflow
    monitoring_cadence : float
        stride of the cadence manager used to poll forked processes when
        ``persist`` is set
    skip_timeseries, skip_report : bool
        do not launch the timeseries / report processes
    persist : bool
        with ``workflow="fork"``, stay in the foreground and monitor the
        forked processes; not implemented for ``workflow="condor"``
    verbose, quiet : bool
        forwarded to the condor DAG

    Raises
    ------
    ValueError
        unknown workflow, ``workflow="block"``, a bootstrap model without a
        bootstrap calibration map, or mismatched bootstrap IDs
    RuntimeError
        a monitored process exits with a non-zero code, or the DAG
        submission returns a non-zero code
    NotImplementedError
        ``persist`` requested together with ``workflow="condor"``
    """
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)
    # start of the span used by the initial batch jobs
    batch_start = gps_start - initial_lookback

    # directory layout
    tag = config.tag
    rootdir = config.rootdir
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    evaluatedir = names.tag2evaluatedir(tag, rootdir=rootdir)
    calibratedir = names.tag2calibratedir(tag, rootdir=rootdir)

    # data product discovery
    reporter_factory = factories.ReporterFactory()
    trainreporter = reporter_factory(
        traindir,
        gps_start,
        gps_end,
        **config.train["reporting"],
    )
    evaluatereporter = reporter_factory(
        evaluatedir,
        gps_start,
        gps_end,
        **config.evaluate["reporting"],
    )
    calibratereporter = reporter_factory(
        calibratedir,
        gps_start,
        gps_end,
        **config.calibrate["reporting"],
    )
    train_datasetreporter = reporter_factory(
        traindir, gps_start, gps_end, flavor="dataset"
    )
    train_cleansegsreporter = reporter_factory(
        traindir, gps_start, gps_end, flavor="segment"
    )
    evaluate_cleansegsreporter = reporter_factory(
        evaluatedir, gps_start, gps_end, flavor="segment"
    )

    assert (
        len(config.classifiers) == 1
    ), "multiple classifiers are not allowed to be run for stream.stream"
    [nickname] = config.classifiers

    new_model = False
    bootstrap = False
    # generate new model if no preferred model exists already
    if not (model := trainreporter.retrieve(nickname, preferred=True)):
        # search for bootstrap model
        if bootstrap_model_path := config.train.get("bootstrap_model"):
            bootstrap = True
            logger.info("bootstrapping analysis with pre-trained model")
            model = trainreporter.read(bootstrap_model_path)
            data_id = model.model_id
        else:
            logger.info(
                "preferred model not available, "
                "launching training job over [%.3f, %.3f]"
                % (batch_start, gps_start)
            )
            try:
                data_id = utils.generate_unique_id()
                batch.train(
                    batch_start,
                    gps_start,
                    config_path,
                    data_id=data_id,
                    preferred=True,
                )
            except Exception:
                logger.error("initial training job failed!")
                raise
            else:
                new_model = True

            # retrieve this model and add it to the generic
            # preferred list for stream data discovery
            model_name = names.id2nickname(nickname, data_id)
            model = trainreporter.retrieve(model_name, preferred=True)
            trainreporter.report(nickname, model, preferred=True)

            # use evaluated dataset as initial dataset for calibration
            # NOTE: update times to point at batch runs
            for reporter in (
                train_datasetreporter,
                evaluatereporter,
                train_cleansegsreporter,
                evaluate_cleansegsreporter,
            ):
                reporter.start = batch_start
                reporter.end = gps_start

            # copy over evaluated dataset
            dataset = train_datasetreporter.retrieve(
                names.id2nickname(f"{nickname}_eval_dataset", data_id),
            )
            evaluatereporter.report(names.id2nickname(nickname, data_id), dataset)

            # copy over clean segments
            clean_segs_name = names.id2nickname("cleanSegments", data_id)
            clean_segs = train_cleansegsreporter.retrieve(clean_segs_name)
            evaluate_cleansegsreporter.report(clean_segs_name, clean_segs)
    else:
        data_id = model.model_id

    # search for bootstrap calibration map if needed, otherwise,
    # generate new calibration map if a new model was trained or
    # an existing calibration map doesn't exist for the identifier
    id_nickname = names.id2nickname(nickname, data_id)
    if bootstrap:
        bootstrap_map_path = config.calibrate.get("bootstrap_map")
        if not bootstrap_map_path:
            raise ValueError(
                "if a bootstrap model is used, we also "
                "require a corresponding calibration map"
            )
        logger.info("bootstrapping analysis with pre-trained calibration")
        calibmap = calibratereporter.read(bootstrap_map_path)
        # verify that model IDs match up
        if data_id != calibmap.model_id:
            raise ValueError(
                "bootstrap model and calibration map require matching IDs; "
                f"model ID: {data_id}, calibration map ID: {calibmap.model_id}"
            )
        # add to generic preferred lists for stream data discovery
        trainreporter.report(nickname, model, preferred=True)
        calibratereporter.report(id_nickname, calibmap, preferred=True)
    elif new_model or not calibratereporter.preferred(id_nickname):
        logger.info(
            "preferred calibration map not available, "
            "launching calibration job over [%.3f, %.3f]"
            % (batch_start, gps_start)
        )
        try:
            batch.calibrate(
                batch_start,
                gps_start,
                config_path,
                data_id=data_id,
                preferred=True,
                use_training_set=False,
            )
        except Exception:
            logger.error("initial calibrate job failed!")
            raise

    # launch processes based on workflow
    if workflow == "block":
        raise ValueError(
            "stream cannot manage processes via workflow=block because it "
            "must launch several processes simultaneously"
        )
    elif workflow == "fork":
        logger.info("launching asynchronous tasks in parallel via multiprocessing")
        args = (config_path,)  # set up common arguments
        common_kwargs = {"gps_start": gps_start, "gps_end": gps_end}
        # (target, process name, extra keyword arguments for that target)
        jobs = [
            # train builds its initial dataset from the lookback span, so keep
            # it consistent with the batch jobs launched above
            (train, "train", {"initial_lookback": initial_lookback}),
            (evaluate, "evaluate", {}),
            (calibrate, "calibrate", {}),
        ]
        # honour the skip flags (the condor workflow already does)
        if not skip_timeseries:
            jobs.append((timeseries, "timeseries", {}))
        if not skip_report:
            jobs.append((report, "report", {}))

        workers = []
        try:
            for job, name, extra_kwargs in jobs:
                logger.info("forking %s" % name)
                proc = mp.Process(
                    target=job,
                    name=name,
                    args=args,
                    kwargs={**common_kwargs, **extra_kwargs},
                )
                proc.start()
                workers.append(proc)

            if persist:
                # stick around and watch forked procs if requested
                num_proc = len(workers)
                # hard coding delay here in case DEFAULT_DELAY is changed...
                manager = utils.CadenceManager(
                    gpstime.gpsnow(), stride=monitoring_cadence, delay=0
                )
                while num_proc:
                    logger.info("monitoring %d processes" % num_proc)
                    # rotate through the workers once, re-queueing live ones
                    for _ in range(num_proc):
                        proc = workers.pop(0)
                        if proc.is_alive():  # process is still running
                            workers.append(proc)
                            continue
                        # the process has finished; join() reaps it
                        proc.join()
                        logger.info(
                            "%s terminated with exitcode=%d"
                            % (proc.name, proc.exitcode)
                        )
                        if proc.exitcode:
                            raise RuntimeError(
                                "%s terminated with exitcode=%d"
                                % (proc.name, proc.exitcode)
                            )
                    num_proc = len(workers)
                    manager.wait()  # wait so we don't spam the OS
        except Exception:
            # tear down every process started so far, then propagate the
            # original exception with its traceback intact
            while workers:
                workers.pop().terminate()
            raise
    elif workflow == "condor":
        logger.info("launching asynchronous tasks in parallel via condor")
        dag_path = condor.create_stream_dag(
            config,
            config_path,
            names.tag2streamdir(tag, rootdir=rootdir),
            initial_lookback=initial_lookback,
            skip_timeseries=skip_timeseries,
            skip_report=skip_report,
            verbose=verbose,
            quiet=quiet,
            **config.condor,
        )
        logger.info("dag written to {}".format(dag_path))
        exitcode = condor.submit_dag(dag_path, block=False)
        if exitcode:
            raise RuntimeError("non-zero returncode for dag {}".format(dag_path))
        if persist:
            raise NotImplementedError(
                "we currently cannot monitor condor for completion of streaming jobs..."
            )
    else:
        raise ValueError("workflow=%s not understood" % workflow)
def train(
    config_path,
    gps_start=None,
    gps_end=None,
    initial_lookback=DEFAULT_INITIAL_LOOKBACK,
):
    """
    run the training process

    An initial dataset is built from ``[gps_start - initial_lookback,
    gps_start)``. New data is then polled stride by stride until ``gps_end``.
    Once at least ``min_new_samples`` target + random times have accumulated,
    the classifier is (re)trained. The training dataset, clean segments,
    evaluated dataset and model are then reported, and the counters are
    reset.

    Parameters
    ----------
    config_path : str
        path to the configuration file
    gps_start, gps_end : float, optional
        span of the streaming analysis, resolved through ``gps_range``
    initial_lookback : float
        seconds of data before ``gps_start`` used for the initial dataset
    """
    # set up gps ranges
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # read in config
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()
    dataloader_factory = factories.DataLoaderFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(config.rootdir)
    traindir = names.tag2traindir(tag, rootdir)  # used to store results
    for directory in [rootdir, traindir]:
        os.makedirs(directory, exist_ok=True)

    target_channel = config.samples["target_channel"]
    target_bounds = configparser.config2bounds(config.samples["target_bounds"])
    dirty_bounds = configparser.config2bounds(config.samples["dirty_bounds"])
    dirty_window = config.samples["dirty_window"]
    random_rate = config.train["random_rate"]

    # set up how we record results
    # the boundaries for these will be re-set within the main loop, hence, set
    # to zero initially
    trainreporter = reporter_factory(
        traindir,
        0,
        0,
        group=names.tag2group(tag, "train"),
        **config.train["reporting"],
    )
    # other reporters used to record intermediate data products
    segmentreporter = reporter_factory(traindir, gps_start, gps_end, flavor="segment")

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    assert (
        len(config.classifiers) == 1
    ), "multiple classifiers are not allowed to be run for stream.train"
    [nickname] = config.classifiers
    classifier_kwargs = config.classifier_map[nickname]
    logger.debug(
        nickname
        + " -> "
        + " ".join("%s:%s" % (k, v) for k, v in classifier_kwargs.items())
    )
    classifier = classifier_factory(
        nickname, rootdir=traindir, generate_id=True, **classifier_kwargs
    )
    is_incremental = isinstance(classifier, classifiers.IncrementalSupervisedClassifier)

    # figure out how we'll read data
    loader_kwargs = config.features
    logger.debug(
        "dataloader -> "
        + " ".join("%s:%s" % (k, v) for k, v in loader_kwargs.items())
    )
    loader_kwargs = configparser.add_missing_kwargs(
        loader_kwargs, group=names.tag2group(tag, "train")
    )
    time = loader_kwargs.pop("time")  # extract the name of the column we use as time
    safe_columns = loader_kwargs.get("safe_columns", loader_kwargs["columns"])
    sitems = config.train["stream"]
    logger.debug(
        "stream_processor -> "
        + " ".join("%s:%s" % (k, v) for k, v in sitems.items())
    )

    # initialize stream processor; named `processor` so it does not shadow
    # the module-level stream() function
    processor = StreamProcessor(
        gps_start,
        gps_end,
        dataloader_factory,
        buffer_kwargs=loader_kwargs,
        **sitems,
    )

    # configure segdb if required
    if config.train.get("ignore_segdb", False):
        logger.info("ignoring segdb")
    else:
        processor.configure_segdb(
            config.segments,
            reporter=segmentreporter,
        )

    # set up initial data query over the lookback window
    lookback_start = gps_start - initial_lookback
    segs = processor.query_segdb(lookback_start, gps_start)
    dataloader = dataloader_factory(
        lookback_start, gps_start, segs=segs, **loader_kwargs
    )
    logger.info("initial data query: [%.3f, %.3f)" % (lookback_start, gps_start))
    data = dataloader.query()

    # get target times and draw from random times
    new_target_times = dataloader.target_times(
        time, target_channel, target_bounds, segs=segs
    )
    logger.info("identified %d target_times" % len(new_target_times))
    new_random_times, new_clean_segs = dataloader.random_times(
        time, target_channel, dirty_bounds, dirty_window, random_rate, segs=segs
    )
    logger.info("identified %d random times" % len(new_random_times))
    accumulated_target_times = len(new_target_times)
    accumulated_random_times = len(new_random_times)
    # NOTE(review): never reset below, so reported clean segments span the
    # whole run rather than just the (flushed) training dataset -- confirm
    accumulated_clean_segs = new_clean_segs

    # possibly trim down columns after identifying times
    # to reduce memory footprint
    data.filter(columns=safe_columns)

    # make a new dataset and add to current dataset
    dataset = factories.DatasetFactory(data).labeled(new_target_times, new_random_times)

    # we'll set the start, end times within the main loop
    datasetreporter = reporter_factory(traindir, 0, 0, flavor="dataset")

    if is_incremental:
        # load in the preferred models so incremental stuff picks up where it left off
        classifier.model = trainreporter.retrieve(classifier.nickname, preferred=True)

    # limits controlling when and on how much data we retrain
    min_new_samples = sitems.get("min_new_samples", DEFAULT_MIN_NEW_SAMPLES)
    max_samples = sitems.get("max_samples", DEFAULT_MAX_SAMPLES)
    max_stride = sitems.get("max_stride", DEFAULT_MAX_STRIDE)

    # start up streaming pipeline
    logger.info("starting streaming training")
    while processor.timestamp < gps_end:
        logger.info("--- train stride: [%.3f, %.3f) ---" % processor.seg)

        # poll for new data
        new_data, segs = processor.poll()
        timestamp = segs[0][0] if len(segs) else processor.timestamp
        lvtm = utils.livetime(segs & new_data.segs) if new_data else 0
        logger.info("acquired %.3f sec of data at %d" % (lvtm, timestamp))

        # if there is new samples, create a new umbrella and add to big umbrella
        if lvtm:
            # get target times and draw from random times
            try:
                new_target_times = new_data.target_times(
                    time, target_channel, target_bounds, segs=segs
                )
            except KeyError:
                logger.warning("target channel missing, moving to next stride")
                # release this stride's buffer like every other stride does
                processor.flush(retain=0)
                continue
            accumulated_target_times += len(new_target_times)
            logger.info(
                "identified %d new target times for a total of %d target times"
                % (len(new_target_times), accumulated_target_times)
            )

            new_random_times, new_clean_segs = new_data.random_times(
                time, target_channel, dirty_bounds, dirty_window, random_rate, segs=segs
            )
            accumulated_random_times += len(new_random_times)
            accumulated_clean_segs |= new_clean_segs
            logger.info(
                "identified %d new random times for a total of %d random times"
                % (len(new_random_times), accumulated_random_times)
            )

            # possibly trim down columns after identifying times
            # to reduce memory footprint
            new_data.filter(columns=safe_columns)

            # make a new dataset and add to big dataset
            new_dataset = factories.DatasetFactory(new_data).labeled(
                new_target_times, new_random_times
            )
            dataset += new_dataset

            # after generating new times and datasets, filter by segments before
            # combining data; this prevents repeated samples from being added in
            # if/when we add padding
            new_data.filter(segs, time=time)
            data += new_data

            n_accumulated = accumulated_target_times + accumulated_random_times
            if n_accumulated < min_new_samples:
                logger.info(
                    "%d accumulated times < %d, waiting to launch new training jobs"
                    % (n_accumulated, min_new_samples)
                )
            else:  # launch a new training job if there is enough data
                logger.info(
                    "%d accumulated times >= %d, launching new training jobs"
                    % (n_accumulated, min_new_samples)
                )

                # create a new dataset from the training data, times, and labels
                dataset = features.Dataset.from_datachunks(
                    data, times=dataset.times, labels=dataset.labels
                )

                # filter dataset by livetime and number of samples
                # also trim underlying data source by livetime
                if data.livetime > max_stride:
                    logger.info(
                        "filtering dataset to reduce the total livetime to %.3f sec"
                        % max_stride
                    )
                if len(dataset.times) > max_samples:
                    logger.info(
                        "filtering dataset to reduce number of target times to <= %d"
                        % max_samples
                    )
                data.flush(max_stride=max_stride, time=time)
                dataset.flush(max_stride=max_stride, max_samples=max_samples)
                logger.info(
                    "launching training job for [%.3f, %.3f) (new [%.3f, %.3f))"
                    % (dataset.start, dataset.end, segs[0][0], segs[-1][1])
                )

                # set timestamp for reporter
                trainreporter.start = dataset.start
                trainreporter.end = dataset.end
                datasetreporter.start = dataset.start
                datasetreporter.end = dataset.end
                if is_incremental:
                    path = datasetreporter.report("newdataset", new_dataset)
                    logger.debug("new_dataset written to " + path)
                else:
                    path = datasetreporter.report("dataset", dataset)
                    logger.debug("dataset written to " + path)

                logger.info("training classifier")
                if is_incremental:
                    # NOTE(review): new_dataset only holds the latest stride, while
                    # these counters span every stride since the last training;
                    # with min_new_samples > 1, earlier strides are never passed
                    # to the incremental train call -- confirm this is intended
                    ntarget = accumulated_target_times
                    nrandom = accumulated_random_times
                    logger.info(
                        "training %s with new_dataset "
                        "(%d target_times, %d random_times)"
                        % (nickname, ntarget, nrandom)
                    )
                    model = classifier.train(new_dataset)
                else:
                    ntarget, nrandom = dataset.vectors2num()
                    logger.info(
                        "training %s with dataset "
                        "(%d target_times, %d random_times)"
                        % (nickname, ntarget, nrandom)
                    )
                    model = classifier.train(dataset)

                # record clean segments
                segmentreporter.start = dataset.start
                segmentreporter.end = dataset.end
                path = segmentreporter.report(
                    names.id2nickname("cleanSegments", model.model_id),
                    accumulated_clean_segs,
                )
                logger.debug("clean segments written to " + path)

                # record dataset with ranks
                logger.info("generating evaluated dataset")
                if is_incremental:
                    eval_dataset = classifier.evaluate(new_dataset)
                else:
                    eval_dataset = classifier.evaluate(dataset)
                path = datasetreporter.report(
                    names.id2nickname(f"{nickname}_eval_dataset", model.model_id),
                    eval_dataset,
                )
                logger.debug(
                    "evaluated dataset for %s written to %s" % (nickname, path)
                )

                # record new model. we do this last so that downstream jobs
                # don't run into issues accessing data products that haven't
                # been generated yet
                trainreporter.start, trainreporter.end = (
                    model.start,
                    model.end,
                )
                logger.info("reporting model")
                path = trainreporter.report(nickname, model, preferred=True)
                logger.debug("model for %s written to %s" % (nickname, path))

                # filter underlying data by bounds after training
                # to reduce memory footprint
                data.filter(bounds=model.selector.bounds, time=time)

                # re-set target and random time counters
                accumulated_target_times = 0
                accumulated_random_times = 0

                # recreate the dataset without any transformations applied
                dataset = features.Dataset.from_datachunks(
                    data, times=dataset.times, labels=dataset.labels
                )
        else:
            logger.info("no identified times, skipping training")

        # repeat at processing cadence
        processor.flush(retain=0)  # FIXME: worry about padding
def evaluate(config_path, gps_start=None, gps_end=None):
    """
    the realtime evaluation routine

    New data is polled stride by stride until ``gps_end``. Each stride picks
    up the preferred model reported by the training process, if there is
    one. For every stride with data, it then:

    - identifies target and random times, keeping at most the last
      ``max_samples`` of each;
    - evaluates them with the current model;
    - reports the evaluated dataset and the stride's clean segments.

    Parameters
    ----------
    config_path : str
        path to the configuration file
    gps_start, gps_end : float, optional
        span of the streaming analysis, resolved through ``gps_range``
    """
    # set up gps ranges
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # read in config
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()
    dataloader_factory = factories.DataLoaderFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(config.rootdir)
    evaluatedir = names.tag2evaluatedir(tag, rootdir=rootdir)
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    for directory in [rootdir, traindir, evaluatedir]:
        os.makedirs(directory, exist_ok=True)

    # get info for building datasets
    target_channel = config.samples["target_channel"]
    target_bounds = configparser.config2bounds(config.samples["target_bounds"])
    dirty_bounds = configparser.config2bounds(config.samples["dirty_bounds"])
    dirty_window = config.samples["dirty_window"]
    random_rate = config.evaluate["random_rate"]

    # set up how we record results
    # the boundaries for these will be re-set within the main loop, hence, set
    # to zero initially
    evaluatereporter = reporter_factory(
        evaluatedir,
        0,
        0,
        group=names.tag2group(tag, "evaluate"),
        **config.evaluate["reporting"],
    )
    trainreporter = reporter_factory(
        traindir,
        0,
        0,
        group=names.tag2group(tag, "evaluate"),
        **config.train["reporting"],
    )
    # other reporters used to record intermediate data products
    segmentreporter = reporter_factory(
        evaluatedir, gps_start, gps_end, flavor="segment"
    )

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    assert (
        len(config.classifiers) == 1
    ), "multiple classifiers are not allowed to be run for stream.evaluate"
    [nickname] = config.classifiers
    classifier_kwargs = config.classifier_map[nickname]
    logger.debug(
        nickname
        + " -> "
        + " ".join("%s:%s" % (k, v) for k, v in classifier_kwargs.items())
    )
    # keyword spelled generate_id, as in stream.train (was misspelled "genetate_id")
    classifier = classifier_factory(
        nickname, rootdir=evaluatedir, generate_id=True, **classifier_kwargs
    )

    # figure out how we'll read data
    loader_kwargs = config.features
    logger.debug(
        "dataloader -> "
        + " ".join("%s:%s" % (k, v) for k, v in loader_kwargs.items())
    )
    loader_kwargs = configparser.add_missing_kwargs(
        loader_kwargs, group=names.tag2group(tag, "evaluate")
    )
    time = loader_kwargs.pop("time")  # extract the name of the column we use as time
    sitems = config.evaluate["stream"]
    logger.debug(
        "stream_processor -> "
        + " ".join("%s:%s" % (k, v) for k, v in sitems.items())
    )

    # initialize stream processor; named `processor` so it does not shadow
    # the module-level stream() function
    processor = StreamProcessor(
        gps_start,
        gps_end,
        dataloader_factory,
        buffer_kwargs=loader_kwargs,
        **sitems,
    )

    # configure segdb if required
    if config.evaluate.get("ignore_segdb", False):
        logger.info("ignoring segdb")
    else:
        processor.configure_segdb(
            config.segments,
            reporter=segmentreporter,
        )

    max_samples = config.evaluate.get("max_samples", DEFAULT_MAX_SAMPLES)
    model_id = None

    # start up streaming pipeline
    logger.info("starting streaming evaluation")
    while processor.timestamp < gps_end:
        logger.info("--- evaluate stride: [%.3f, %.3f) ---" % processor.seg)

        # get the next model available
        # assumes we poll more frequently than models are produced
        # update if there is a new model
        model = trainreporter.retrieve(nickname, preferred=True)
        if model is not None:
            logger.info("updating model for " + nickname)
            classifier.model = model
            model_id = model.model_id

        # poll for new data
        new_data, segs = processor.poll()
        timestamp = segs[0][0] if len(segs) else processor.timestamp
        lvtm = utils.livetime(segs & new_data.segs) if new_data else 0
        logger.info("acquired %.3f sec of data at %d" % (lvtm, timestamp))

        # if there is new samples, create a new umbrella
        if lvtm:
            # NOTE: we only condition on whether there is something new to do
            # and don't care about min_new_samples, max_samples, max_stride we
            # just always evaluate everything
            evaluate_start = segs[0][0]
            evaluate_end = segs[-1][1]

            # get target times and draw from random times
            try:
                target_times = new_data.target_times(
                    time, target_channel, target_bounds, segs=segs
                )
            except KeyError:
                logger.warning("target channel missing, moving to next stride")
                # release this stride's buffer like every other stride does
                processor.flush(retain=0)
                continue
            logger.info("identified %d target times" % len(target_times))
            if len(target_times) > max_samples:
                logger.info("retaining %d target times" % max_samples)
                # keep the last max_samples; unlike [-max_samples:], this is
                # also correct for max_samples == 0
                target_times = target_times[len(target_times) - max_samples :]

            random_times, clean_segs = new_data.random_times(
                time, target_channel, dirty_bounds, dirty_window, random_rate, segs=segs
            )
            logger.info("identified %d random times" % len(random_times))
            if len(random_times) > max_samples:
                logger.info("retaining %d random times" % max_samples)
                random_times = random_times[len(random_times) - max_samples :]

            # evaluate classifiers if there are times in which to do evaluation
            if len(target_times) > 0 or len(random_times) > 0:
                # build a dataset
                dataset = factories.DatasetFactory(new_data).labeled(
                    target_times, random_times
                )

                # update reporter timestamps
                evaluatereporter.start = evaluate_start
                evaluatereporter.end = evaluate_end
                logger.info(
                    "launching evaluation job for [%.3f, %.3f)"
                    % (evaluate_start, evaluate_end)
                )

                # record clean segments
                segmentreporter.start = evaluate_start
                segmentreporter.end = evaluate_end
                path = segmentreporter.report(
                    names.id2nickname("cleanSegments", model_id),
                    clean_segs,
                    preferred=True,
                )
                logger.debug("clean segments written to " + path)

                logger.info("evaluating dataset for %s" % nickname)
                dataset = classifier.evaluate(dataset)

                # manage provenance
                evaluatereporter.start, evaluatereporter.end = (
                    dataset.start,
                    dataset.end,
                )
                logger.info("reporting dataset")
                dataset_name = names.id2nickname(nickname, model_id)
                path = evaluatereporter.report(dataset_name, dataset, preferred=True)
                logger.debug("dataset written to %s" % path)
            else:
                logger.info("no identified times, skipping evaluation")

        # repeat at processing cadence
        processor.flush(retain=0)  # FIXME: worry about padding
def _load_training_products(model, nickname, cleansegsreporter, datasetreporter):
    """
    retrieve the clean segments and evaluated dataset that the training
    process recorded for ``model``

    Both reporters are first pointed at the model's ``[start, end)`` span.
    Returns ``(clean_segs, eval_dataset)``.
    """
    cleansegsreporter.start = model.start
    cleansegsreporter.end = model.end
    clean_segs = cleansegsreporter.retrieve(
        names.id2nickname("cleanSegments", model.model_id)
    )
    datasetreporter.start = model.start
    datasetreporter.end = model.end
    eval_dataset = datasetreporter.retrieve(
        names.id2nickname(f"{nickname}_eval_dataset", model.model_id),
    )
    return clean_segs, eval_dataset


def calibrate(config_path, gps_start=None, gps_end=None):
    """
    calibrate the classifier predictions based on historical data

    Start from the current preferred model. Reuse its preferred calibration
    map if one exists; otherwise calibrate on the evaluated training dataset
    and clean segments recorded by the training process. Then, on every
    cadence stride until ``gps_end``:

    - recalibrate from the training products whenever a new model appears;
    - accumulate newly evaluated datasets and clean segments;
    - once at least ``min_new_samples`` samples have accumulated, update and
      report the calibration map.

    Parameters
    ----------
    config_path : str
        path to the configuration file
    gps_start, gps_end : float, optional
        span of the streaming analysis, resolved through ``gps_range``
    """
    # set up gps ranges
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # read in config
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(config.rootdir)
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    evaluatedir = names.tag2evaluatedir(tag, rootdir=rootdir)
    calibratedir = names.tag2calibratedir(tag, rootdir=rootdir)
    for directory in [rootdir, traindir, evaluatedir, calibratedir]:
        os.makedirs(directory, exist_ok=True)

    # set up how we record results
    # the boundaries for these will be re-set within the main loop, hence, set
    # to zero initially
    trainreporter = reporter_factory(
        traindir,
        0,
        0,
        group=names.tag2group(tag, "calibrate"),
        **config.train["reporting"],
    )
    evaluatereporter = reporter_factory(
        evaluatedir,
        0,
        0,
        group=names.tag2group(tag, "calibrate"),
        **config.evaluate["reporting"],
    )
    calibratereporter = reporter_factory(
        calibratedir,
        0,
        0,
        group=names.tag2group(tag, "calibrate"),
        **config.calibrate["reporting"],
    )
    # other reporters used to record intermediate data products
    cleansegsreporter = reporter_factory(
        evaluatedir, gps_start, gps_end, flavor="segment"
    )
    train_cleansegsreporter = reporter_factory(
        traindir, gps_start, gps_end, flavor="segment"
    )
    train_datasetreporter = reporter_factory(traindir, 0, 0, flavor="dataset")

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    assert (
        len(config.classifiers) == 1
    ), "multiple classifiers are not allowed to be run for stream.calibrate"
    [nickname] = config.classifiers
    items = config.classifier_map[nickname]
    logger.debug(
        nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
    )
    classifier = classifier_factory(nickname, rootdir=calibratedir, **items)

    # extract basic parameters for calibration; work on a copy so that
    # popping min_new_samples does not mutate the loaded config
    calibrate_kwargs = dict(config.calibrate)
    min_new_samples = calibrate_kwargs.pop("min_new_samples", DEFAULT_MIN_NEW_SAMPLES)

    # set up cadence manager
    sitems = config.calibrate["stream"]
    logger.debug(
        "cadence_manager -> " + " ".join("%s:%s" % (k, v) for k, v in sitems.items())
    )
    assert "max_timestamp" not in sitems, (
        "max_timestamp is set by hand to gps_end and "
        "cannot be passed through INI file via evaluate stream"
    )
    manager = utils.CadenceManager(gps_start, max_timestamp=gps_end, **sitems)

    # get the next model available
    model = trainreporter.retrieve(nickname, preferred=True)
    model_id = model.model_id
    data_id = names.id2nickname(nickname, model.model_id)

    # load in existing calibration map if available
    calibmap = calibratereporter.retrieve(data_id, preferred=True)
    if calibmap:
        # NOTE(review): classifier.model is not set on this path, unlike the
        # fresh-calibration path below -- confirm calibrate() doesn't need it
        classifier.calibration_map = calibmap
    else:
        # initial calibration with new model
        initial_clean_segs, train_dataset = _load_training_products(
            model, nickname, train_cleansegsreporter, train_datasetreporter
        )

        # recalibrate with initial training dataset
        logger.info("initial calibration for {} with new model".format(nickname))
        classifier.model = model
        calibmap = classifier.calibrate(
            train_dataset, clean_segs=initial_clean_segs, **calibrate_kwargs
        )
        logger.info("reporting calibration")
        calibratereporter.start = model.start
        calibratereporter.end = model.end
        path = calibratereporter.report(data_id, calibmap, preferred=True)
        logger.debug("calibration written to %s" % path)

    # set up accumulated datasets as needed
    # NOTE: we only need to do this here because everything else uses
    # StreamProcessor to avoid reading in redundant samples, etc
    dataset = features.Dataset(start=manager.timestamp, end=manager.timestamp)

    # set up accumulated clean segments for calibration
    clean_segs = segmentlist([])

    # start up streaming pipeline
    logger.info("starting streaming calibration")
    while manager.timestamp < gps_end:
        logger.info("--- calibrate stride: [%.3f, %.3f) ---" % manager.seg)
        timestamp, _ = manager.wait()  # manage cadence and sleep logic here

        # get the next model available
        # assumes we poll more frequently than models are produced
        # update if there is a new model
        model = trainreporter.retrieve(nickname, preferred=True)

        # if new model available, flush calibration map and
        # generate new calibration with datasets
        if model is not None:
            logger.info("updating model for " + nickname)
            classifier.model = model

            # data identifier
            model_id = model.model_id
            data_id = names.id2nickname(nickname, model.model_id)

            # resetting calibration map
            logger.info("resetting calibration map")
            classifier.calibration_map = None

            # load auxiliary data to trained model
            initial_clean_segs, train_dataset = _load_training_products(
                model, nickname, train_cleansegsreporter, train_datasetreporter
            )

            # recalibrate with initial training dataset
            logger.info("recalibrating calibration map with new model")
            calibmap = classifier.calibrate(
                train_dataset, clean_segs=initial_clean_segs, **calibrate_kwargs
            )
            logger.info("reporting calibration")
            calibratereporter.start = model.start
            calibratereporter.end = model.end
            path = calibratereporter.report(data_id, calibmap, preferred=True)
            logger.debug("calibration written to %s" % path)

            # reset dataset
            # take as my starting point the end of the initial dataset
            start = dataset.end
            dataset = features.Dataset(start=start, end=start)

        logger.info("retrieving clean segments")
        new_clean_segs = cleansegsreporter.retrieve(
            names.id2nickname("cleanSegments", model_id), preferred=True
        )
        if new_clean_segs is None:  # nothing new available
            logger.info("no new clean segments for %s at %d" % (nickname, timestamp))
            continue

        # get the next dataset available
        new_dataset = evaluatereporter.retrieve(data_id, preferred=True)
        if new_dataset is None:  # nothing new available
            logger.info("no new dataset for %s at %d" % (nickname, timestamp))
            continue
        elif (len(new_dataset) == 0) or (new_dataset.end <= dataset.end):
            logger.info("empty dataset for %s at %d" % (nickname, timestamp))
        elif new_dataset.start >= dataset.end:  # represents new data!
            ngch, ncln = new_dataset.vectors2num()
            logger.info(
                "acquired %.3f sec of data for %s at %d (%d glitch, %d clean)"
                % (
                    utils.livetime(new_dataset.segs),
                    nickname,
                    timestamp,
                    ngch,
                    ncln,
                )
            )
            dataset += new_dataset
            clean_segs |= new_clean_segs
        elif new_dataset.end > dataset.end:
            # at least some of the new_dataset is actually new
            trunc = new_dataset.segs & segmentlist([(dataset.end, new_dataset.end)])
            # truncate before counting so the log reflects what is added
            new_dataset.filter(segs=trunc)
            ngch, ncln = new_dataset.vectors2num()
            logger.info(
                "acquired %.3f sec of data for %s at %d (%d glitch, %d clean)"
                % (utils.livetime(trunc), nickname, timestamp, ngch, ncln)
            )
            dataset += new_dataset
            clean_segs |= new_clean_segs

        logger.info("calibrating classifiers")
        if len(dataset) >= min_new_samples:
            # need to spawn a calibration job for this classifier
            calibratereporter.start = dataset.start
            calibratereporter.end = dataset.end

            # limit clean segments to those used by calibration
            calib_clean_segs = clean_segs & dataset.segs

            # evaluate dataset and save probabilities to disk
            ngch, ncln = dataset.vectors2num()
            logger.info("calibrating %s (%d glitch, %d clean)" % (nickname, ngch, ncln))

            # this works incrementally by default
            calibmap = classifier.calibrate(
                dataset, clean_segs=calib_clean_segs, **calibrate_kwargs
            )

            # manage provenance
            calibratereporter.start, calibratereporter.end = (
                calibmap.start,
                calibmap.end,
            )
            logger.info("reporting calibration")
            path = calibratereporter.report(data_id, calibmap, preferred=True)
            logger.info("calibration written to %s" % path)

            # re-set dataset
            # take as my starting point where the old dataset left off
            start = dataset.end
            dataset = features.Dataset(start=start, end=start)
        else:
            logger.info(
                "not enough new data to re-calibrate %s "
                "(%d samples; required at least %d)"
                % (nickname, len(dataset), min_new_samples)
            )
def timeseries(config_path, gps_start=None, gps_end=None):
    """
    the realtime evaluation routine

    Continuously polls for new feature data and keeps the (single) configured
    classifier's model and calibration map in sync with the latest trained and
    calibrated data products. It writes timeseries for every polled stride that
    contains livetime and, when monitoring is enabled, publishes summary
    metrics to kafka.

    Parameters
    ----------
    config_path : str
        path to the configuration file
    gps_start : optional
        GPS start time; defaults to the current GPS time (see gps_range)
    gps_end : optional
        GPS end time; defaults to DEFAULT_GPS_END (see gps_range)
    """
    # set up gps ranges
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # read in config
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # define factories to generate objects
    reporter_factory = factories.ReporterFactory()
    classifier_factory = factories.ClassifierFactory()
    dataloader_factory = factories.DataLoaderFactory()

    # extract common parameters
    tag = config.tag
    rootdir = os.path.abspath(config.rootdir)
    timeseriesdir = names.tag2timeseriesdir(tag, rootdir=rootdir)
    calibratedir = names.tag2calibratedir(tag, rootdir=rootdir)
    traindir = names.tag2traindir(tag, rootdir=rootdir)
    for directory in [rootdir, traindir, calibratedir, timeseriesdir]:
        os.makedirs(directory, exist_ok=True)

    # extract information necessary for channel names in timeseries files
    instrument = config.instrument
    description = config.timeseries.get("description")
    target_bounds = configparser.config2bounds(config.samples["target_bounds"])
    frequency = config.features["frequency"]
    assert frequency in target_bounds, (
        "must specify a frequency range (called %s) within target_bounds" % frequency
    )
    freq_min, freq_max = target_bounds[frequency]
    assert isinstance(freq_min, int) and isinstance(
        freq_max, int
    ), "frequency bounds must be integers!"
    set_ok = config.timeseries.get("set_ok")

    # set up aggregator for metric collection (only used when config.monitor)
    kafka_client = None
    if config.monitor:
        logger.info("setting up metric collection")
        kafka_client = kafka.Client(
            "kafka://{}".format(config.features["timeseries"]["url"])
        )
        # register schema for desired metrics
        # FIXME should we be getting these from scald config?
        kafka_client.subscribe(
            [
                f"idq.{config.tag}.{metric}"
                for metric in ["fap", "loglike", "rank", "latency", "ok"]
            ]
        )

    # set up how we record results
    # the boundaries for these will be re-set within the main loop, hence, set
    # to zero initially
    trainreporter = reporter_factory(
        traindir,
        0,
        0,
        group=names.tag2group(tag, "timeseries"),
        **config.train["reporting"],
    )
    calibratereporter = reporter_factory(
        calibratedir,
        0,
        0,
        group=names.tag2group(tag, "timeseries"),
        **config.calibrate["reporting"],
    )
    timeseriesreporter = reporter_factory(
        timeseriesdir,
        0,
        0,
        group=names.tag2group(tag, "timeseries"),
        **config.timeseries["reporting"],
    )

    # other reporters used to record intermediate data products
    segmentreporter = reporter_factory(
        timeseriesdir, gps_start, gps_end, flavor="segment"
    )

    # set a random seed if provided
    seed = config.samples.get("random_seed", None)
    if seed is not None:
        logger.info("setting random seed: %d" % seed)
        np.random.seed(seed)

    # figure out which classifiers we'll run
    assert (
        len(config.classifiers) == 1
    ), "multiple classifiers are not allowed to be run for stream.timeseries"
    [nickname] = config.classifiers
    items = config.classifier_map[nickname]
    logger.debug(
        nickname + " -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
    )
    classifier = classifier_factory(nickname, rootdir=timeseriesdir, **items)

    # figure out how we'll read data
    if "timeseries" in config.features:
        items = config.features["timeseries"]
    else:
        items = config.features
    logger.debug(
        "dataloader -> " + " ".join("%s:%s" % (k, v) for k, v in items.items())
    )
    items = configparser.add_missing_kwargs(
        items, group=names.tag2group(tag, "timeseries")
    )

    # figure out timeseries sampling rate -> dt
    srate = config.timeseries["srate"]
    logger.info("generating timeseries sampled at %.3f Hz" % srate)
    dt = 1.0 / srate

    # initialize stream processor
    sitems = config.timeseries["stream"]
    logger.debug(
        "stream_processor -> " + " ".join("%s:%s" % (k, v) for k, v in sitems.items())
    )
    stream = StreamProcessor(
        gps_start,
        gps_end,
        dataloader_factory,
        buffer_kwargs=items,
        **sitems,
    )

    # configure segdb if required
    if config.timeseries.get("ignore_segdb", False):
        logger.info("ignoring segdb")
    else:
        stream.configure_segdb(
            config.segments,
            reporter=segmentreporter,
        )

    # cache to sync up data products with the same ID
    new_model = None
    model_id = None
    new_id = False

    # start up streaming pipeline
    logger.info("starting streaming timeseries")
    while stream.timestamp < gps_end:
        logger.info("--- timeseries stride: [%.3f, %.3f) ---" % stream.seg)

        # check for a newly trained model
        # assumes we poll faster than models are produced
        model = trainreporter.retrieve(nickname, preferred=True)
        if model is not None:
            new_id = True
            model_id = model.model_id
            new_model = model

        # check for a calibration map matching the most recent model ID
        # assumes we poll faster than maps are produced
        data_id = names.id2nickname(nickname, model_id)
        calibration_map = calibratereporter.retrieve(data_id, preferred=True)
        if new_id and calibration_map:
            logger.info("updating model and calibration map for " + nickname)
            classifier.model = new_model
            classifier.calibration_map = calibration_map
            new_id = False
        elif calibration_map:
            logger.info("updating calibration map for " + nickname)
            classifier.calibration_map = calibration_map
        elif new_id and not classifier.is_trained:
            # new model available without calibration map, but
            # no previous model exists. need to wait for a new map
            logger.warning(
                "initial model found without calibration map, "
                "updating model and skipping stride"
            )
            classifier.model = new_model
            continue
        elif new_id:
            # no new calibration map yet for new model, see if there
            # is a new calibration map for the existing model and
            # update that instead if needed
            data_id = names.id2nickname(nickname, classifier.model.model_id)
            calibration_map = calibratereporter.retrieve(data_id, preferred=True)
            if calibration_map:
                logger.info("updating calibration map for " + nickname)
                classifier.calibration_map = calibration_map

        if calibration_map:
            # report on calibration map health when a new one is available
            if calibration_map.is_healthy():
                logger.info(
                    "calibration map for id: %s is healthy, time: %.3f",
                    classifier.calibration_map.model_id,
                    stream.seg[0],
                )
            else:
                logger.warning(
                    "calibration map for id: %s is not healthy, time: %.3f",
                    classifier.calibration_map.model_id,
                    stream.seg[0],
                )

        # poll for new data
        # NOTE: we very intentionally do not "fix_segments"
        # keeping them partitioned this way tells us which frames to write
        # so that each frame has the same (predictable) duration: stream.stride
        new_data, segs = stream.poll()
        timestamp = segs[0][0] if len(segs) else stream.timestamp
        lvtm = utils.livetime(segs & new_data.segs) if new_data else 0
        logger.info("acquired %.3f sec of data at %d" % (lvtm, timestamp))

        # handle case where timeseries job just started but doesn't
        # have all necessary data products available
        if classifier.is_trained and not classifier.is_calibrated:
            logger.warning(
                "no calibration map found for initial model yet, skipping stride"
            )
            # drop the data just polled: without this the processor's queue
            # keeps every skipped stride until a calibration map appears, and
            # each later poll re-combines all of it even though only the
            # newest segments are ever turned into timeseries
            stream.flush(retain=0)
            continue

        # create timeseries if there is new data
        if lvtm:
            dataset_factory = factories.DatasetFactory(new_data)
            for seg in segs:
                # process a single segment at a time to guarantee uniform duration
                seglist = segmentlist([seg])

                # update report times
                timeseriesreporter.start, timeseriesreporter.end = seg

                # generate series
                logger.info("generating timeseries")
                info = ChannelInfo(
                    instrument, freq_min, freq_max, nickname, description=description
                )
                series = classifier.timeseries(
                    info,
                    dataset_factory,
                    dt=dt,
                    segs=seglist,
                    set_ok=set_ok,
                )

                # report timeseries
                logger.debug("reporting timeseries")
                for path in batch.report_timeseries(timeseriesreporter, info, series):
                    logger.info("timeseries written to %s" % path)

                # record latency
                time = float(seg[0])
                latency = utils.gps2latency(time)
                logger.info("latency upon writing timeseries: %.3f" % latency)

                # report metrics
                if config.monitor:
                    for s in series:
                        # find location of highest logL point w/in series
                        max_ind = np.argmax(s.loglike.series)
                        columns = {
                            "fap": s.fap.series[max_ind],
                            "loglike": s.loglike.series[max_ind],
                            "rank": s.rank.series[max_ind],
                            "latency": latency,
                            "ok": np.min(s.ok.series),
                        }

                        # store and aggregate metrics
                        logger.debug("reporting metrics")
                        for metric, column in columns.items():
                            kafka_client.write(
                                f"idq.{config.tag}.{metric}",
                                {"time": [time], "data": [column]},
                            )
        else:
            logger.info("no identified times, skipping timeseries")

        # repeat at processing cadence
        stream.flush(retain=0)  # FIXME: worry about padding
def report(config_path, gps_start=None, gps_end=None, reportdir=None):
    """
    Run ``batch.report`` once per stride for the duration of a streaming run.

    Parameters
    ----------
    config_path : str
        path to the configuration file
    gps_start, gps_end : optional
        GPS span of the run; defaults are filled in by gps_range
    reportdir : optional
        forwarded unchanged to ``batch.report``

    Notes
    -----
    ``config.report["lookback"]`` (default 0) extends each report window back
    in time. This allows running averages whose windows overlap between
    consecutive strides.
    """
    gps_start, gps_end = gps_range(gps_start=gps_start, gps_end=gps_end)

    # load the configuration (log the path exactly as it was given)
    logger.info("using config : %s" % config_path)
    config_path = os.path.abspath(os.path.realpath(config_path))
    config = configparser.path2config(config_path)

    # cadence settings
    report_section = config.report
    lookback = report_section.get("lookback", 0)
    cadence_kwargs = report_section["stream"]
    summary = " ".join("%s:%s" % pair for pair in cadence_kwargs.items())
    logger.debug("cadence_manager -> " + summary)
    assert "max_timestamp" not in cadence_kwargs, (
        "max_timestamp is set by hand to gps_end and cannot be passed through config"
    )
    cadence = utils.CadenceManager(gps_start, max_timestamp=gps_end, **cadence_kwargs)

    logger.info("starting streaming reporting")
    while cadence.timestamp < gps_end:
        logger.info("--- report stride: [%.3f, %.3f) ---" % cadence.seg)
        # CadenceManager.wait() handles all of the sleeping between strides
        stride_start, stride_end = cadence.wait()
        batch.report(
            stride_start - lookback, stride_end, config_path, reportdir=reportdir
        )
[docs]class StreamProcessor(object):
"""
handles data processing logic for the streaming pipeline
"""
# define a few string templates for things that will be printed repeated
_poll_template = "querying for triggers within [%.4f, %.4f)"
_retained_template = "retained %.4f sec of livetime"
_ignore_template = "ignoring segdb"
_yesdata_template = "found data at %.4f"
_nodata_template = "no data found within [%.4f, %.4f)"
_incontiguous_template = "gap in data found [%.4f, %.4f). Moving to %.4f"
_latency_template = (
"too far behind realtime at (%.4f sec at %.4f), skipping ahead to %.4f"
)
_newbuffer_template = "creating new buffer for [%.4f, %.4f)"
def __init__(self, start, end, buffer_factory, buffer_kwargs={}, **manager_kwargs):
# this is kinda janky, but it will work...
self._init_segdb()
self._buffer_factory = buffer_factory
# set up cadence manager
assert "max_timestamp" not in manager_kwargs, (
"max_timestamp is set to end by hand, "
"so it cannot be passed via manager_kwargs!"
)
self._manager = utils.CadenceManager(
start, max_timestamp=end, logger=logger, **manager_kwargs
)
# do this next little backflip to make sure all options are passed
# correctly
self._buffer_kwargs = buffer_kwargs
self._buffer_kwargs.update(
self._buffer_factory(start, end, **buffer_kwargs).kwargs
)
self._buffer = None
# set up the buffer store
self._queue = deque()
def _init_segdb(self):
"""
set up all the attributes needed for segdb stuff
these will all be overwritten upon a call to configure_segdb
"""
self._ignore_segdb = True # default behavior
self._segdb_url = None
self._segdb_intersect = None
self._segdb_exclude = None
self._segdb_reporter = None
@property
def ignore_segdb(self):
return self._ignore_segdb
@property
def stride(self):
return self._manager.stride
@property
def delay(self):
return self._manager.delay
@property
def timestamp(self):
return self._manager.timestamp
@property
def seg(self):
return self._manager.seg
@property
def segs(self):
return self._manager.segs
@property
def segdb_url(self):
return self._segdb_url
@property
def segdb_intersect(self):
return self._segdb_intersect
@property
def segdb_exclude(self):
return self._segdb_exclude
@property
def segdb_reporter(self):
return self._segdb_reporter
[docs] def query_segdb(self, start, end):
"""
a helper function that manages segment construction for each buffer
"""
if self.ignore_segdb:
logger.debug(self._ignore_template)
segs = segmentlist([segment(start, end)])
else:
query_start = start - self._segdb_offset
query_end = end - self._segdb_offset
logger.info(self._segdb_template % (query_start, query_end))
segs = utils.segdb2segs(
query_start,
query_end,
union=self._segdb_union,
intersect=self._segdb_intersect,
exclude=self._segdb_exclude,
segdb_url=self._segdb_url,
)
logger.info(self._retained_template % utils.livetime(segs))
if self._segdb_offset:
segs.shift(self._segdb_offset)
if self._segdb_reporter is not None:
self._segdb_reporter.start = start
self._segdb_reporter.end = end
path = self._segdb_reporter.report("segments", segs)
logger.info("segments written to " + path)
return segs
def new_buffer(self, start):
end = start + self.stride
logger.debug(self._newbuffer_template % (start, end))
if self._buffer:
self._buffer_kwargs.update(self._buffer.kwargs)
return self._buffer_factory(
start, end, segs=self.query_segdb(start, end), **self._buffer_kwargs
)
def __len__(self):
return len(self._queue)
def flush(self, retain=0):
# clear out all but the most recently added buffer, which is done
# so new samples can access older data to create feature vectors
while len(self) > retain:
self._queue.popleft()
[docs] def wait(self):
"""
delegates to CadenceManager.wait(), which updates timestamp after sleeping
"""
return self._manager.wait()
[docs] def poll(self, **triggers_kwargs):
"""
the workhorse method manages requests for data and I/O via
ClassifierData objects constructs and returns an umbrella of the
requested ClassifierDatas for this stride
triggers_kwargs is passed to calls to ClassifierData.triggers and is
exposed to help control I/O costs
"""
segs = segmentlist([])
try:
for seg in self._manager.poll():
# manager.poll will gobble up multiple segments if needed
# we wait to create this until after self.wait() because queries
# to segdb
self._buffer = self.new_buffer(seg[0])
# within new_buffer() must be causal
if utils.livetime(self._buffer.segs):
# there is non-trivial livetime in the new buffer
logger.debug(
self._poll_template % (self._buffer.start, self._buffer.end)
)
try: # try to retrieve data
data = self._buffer.query(**triggers_kwargs)
except exceptions.IncontiguousDataError as e:
# span of data doesn't match span of classifier data
logger.warning(
self._incontiguous_template
% (e.gap_start, e.gap_end, e.timestamp)
)
self._manager.timestamp = e.timestamp
except exceptions.NoDataError as e:
# no data found when querying
logger.warning(
self._nodata_template
% (e.timestamp, e.timestamp + e.stride)
)
else:
logger.debug(self._yesdata_template % seg[0])
self._queue.append(data)
segs.append(seg)
except exceptions.LatencyError as e:
logger.warning(
self._latency_template % (e.latency, e.current_timestamp, e.timestamp)
)
self._manager.timestamp = e.timestamp
# return children of umbrella corresponding to new times
# FIXME: worry about padding and edge effects for feature vectors?
return (
features.combine_chunks(list(self._queue)),
segs,
)