Source code for idq.utils

from collections import defaultdict
import functools
import glob
import itertools
import os
import random
import string
import time as TIME
import timeit

import numpy as np

import gpstime
from lal.utils import CacheEntry
from ligo.segments import segment, segmentlist
from dqsegdb2.query import query_segments

from . import exceptions


# cadence management
DEFAULT_STRIDE = 1.0
DEFAULT_DELAY = 0.0

# the maximum number of iterations in CadenceManager.poll before moving on
DEFAULT_MAX_ITERS = np.inf
# the maximum latency we allow before skipping ahead
DEFAULT_MAX_LATENCY = np.inf
DEFAULT_MAX_TIMESTAMP = np.inf

DEFAULT_MONITORING_CADENCE = 1.0

# I/O ascii conventions
ASCII_COMMENTS = ["#"]

# I/O columns conventions
KW_COLUMNS = [
    "start_time",
    "stop_time",
    "time",
    "frequency",
    "unnormalized_energy",
    "normalized_energy",
    "chisqdof",
    "significance",
]
KW_TIMECOL = "time"
KW_SIGCOL = "significance"
KW_COL2IND = dict((col, i) for i, col in enumerate(KW_COLUMNS))
KW_TIMEIND = KW_COL2IND[KW_TIMECOL]

KW_TEMPLATE = "%.6f %.6f %.6f %d %.3e %.2f %d %.2f"

SNAX_COLUMNS = ["time", "frequency", "q", "snr", "phase", "duration"]
SNAX_TIMECOL = "time"
SNAX_SIGCOL = "snr"

OMICRON_COLUMNS = [
    "time",
    "frequency",
    "tstart",
    "tend",
    "fstart",
    "fend",
    "snr",
    "q",
    "amplitude",
    "phase",
]
OMICRON_TIMECOL = "time"
OMICRON_SIGCOL = "snr"

# segdb conventions
DEFAULT_SEGDB_URL = os.environ.get(
    "DEFAULT_SEGMENT_SERVER", "https://segments.ligo.org"
)


def parse_variable_args(args, max_args, default=None):
    """
    fill in unspecified variable arguments with a default value.
    """
    return args + [default for _ in range(max_args - len(args))]

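# Example (illustrative values): pad a two-element argument list out to four
# slots, filling the missing trailing arguments with the default.
#
#   >>> parse_variable_args(["H1", 16384], 4)
#   ['H1', 16384, None, None]
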
class CadenceManager(object):
    """
    manages the cadence of our queries, including delay and timeout logic
    """

    # define a few string templates that may be printed repeatedly
    _sleep_template = "sleeping for %.3f sec"

    def __init__(
        self,
        timestamp=None,
        stride=DEFAULT_STRIDE,
        delay=DEFAULT_DELAY,
        logger=None,
        enforce_integer_strides=True,
        offset=0,
        max_iters=DEFAULT_MAX_ITERS,
        max_latency=DEFAULT_MAX_LATENCY,
        max_timestamp=DEFAULT_MAX_TIMESTAMP,
        **extra_kwargs
    ):
        # start the timestamp off at an integer number of strides
        self._enforce_integer_strides = enforce_integer_strides
        self._stride = stride
        self._offset = offset
        if timestamp is None:
            timestamp = gpstime.gpsnow()
        # timestamp setter depends on _stride, _enforce_integer_strides, _offset
        self.timestamp = timestamp

        self._delay = delay
        # timestamp tracks the start of the stride, so we need to wait this long
        self._wait_duration = stride + delay

        self.max_timestamp = max_timestamp

        assert max_iters > 0, "max_iters must be positive"
        self.max_iters = max_iters

        assert max_latency > self._stride, "max_latency must be larger than stride"
        self.max_latency = max_latency

        # hold on to this to print sleep statements as necessary
        self._logger = logger

        # gobble this up in case there's anything useful here
        self._extra_kwargs = extra_kwargs

    @property
    def timestamp(self):
        return self._timestamp

    @timestamp.setter
    def timestamp(self, new_timestamp):
        if self._enforce_integer_strides:
            new_timestamp = floor_div(new_timestamp, self.stride)
        self._timestamp = new_timestamp + self.offset

    @property
    def stride(self):
        return self._stride

    @property
    def delay(self):
        return self._delay

    @property
    def offset(self):
        return self._offset

    @property
    def max_timestamp(self):
        return self._max_timestamp

    @max_timestamp.setter
    def max_timestamp(self, timestamp):
        self._max_timestamp = timestamp
        self._end = timestamp + self._delay + self._offset

    @property
    def segs(self):
        return segmentlist([self.seg])

    @property
    def seg(self):
        return segment(self.timestamp, self.timestamp + self.stride)

    @property
    def past_seg(self):
        return segment(self._timestamp - self._stride, self._timestamp)

    @property
    def timeout(self):
        return self._timestamp + self._wait_duration - min(self._end, gpstime.gpsnow())

    def wait(self):
        """
        wait until the current time is after timestamp+delay
        then update timestamp (increment by one stride)
        """
        wait = self.timeout
        if wait > 0:
            if self._logger is not None:
                self._logger.info(self._sleep_template % wait)
            TIME.sleep(wait)
        # increment timestamp since we've waited
        self._timestamp += self._stride
        return self.past_seg

    def poll(self):
        """
        generate segments corresponding to strides up until the current time
        This is essentially a multi-stride version of wait
        """
        i = 0
        while self.timeout < 0:
            if gps2latency(self._timestamp) > self.max_latency:
                # we're too far behind real time!
                gps = gpstime.gpsnow()
                raise exceptions.LatencyError(
                    gps, gps2latency(self._timestamp), floor_div(gps, self._stride)
                )
            if i < self.max_iters:
                self._timestamp += self._stride
                i += 1
                # we've already bumped timestamp
                yield self.past_seg
            else:
                # too many iterations
                return
        # wait automatically bumps the timestamp
        self.wait()
        yield self.past_seg

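# Example (illustrative): drive a real-time loop one stride at a time.
# `process` here is a hypothetical per-stride callback, not part of this
# module; poll() yields any backlogged strides, then sleeps until the next
# stride is available.
#
#   manager = CadenceManager(stride=4.0, delay=8.0, max_latency=64.0)
#   while True:
#       for seg in manager.poll():
#           process(seg)
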
def generate_unique_id(n=8):
    """generate a unique ID of length n"""
    alphanum = string.ascii_uppercase + string.digits
    return "".join(random.SystemRandom().choice(alphanum) for _ in range(n))

def draw_random_times(segments, rate=0.1):
    """
    draw random times poisson distributed within segments
    with corresponding rate (in Hz)
    """
    times = []
    for seg in segments:
        times += draw_random_times_in_segment(*seg, rate=rate)
    return np.array(times, dtype=float).reshape((len(times),))

def draw_random_times_in_segment(start, end, rate=0.1):
    """
    draw random times poisson distributed within [start, end]
    with corresponding rate (in Hz)
    """
    times = []
    while start < end:
        for dt in draw_poisson_dt(rate, size=int(1 + int(end - start) * rate)):
            start += dt
            if start > end:
                # we're out of this segment
                break
            times.append(start)
    return times

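# Example (illustrative): the number of draws is Poisson with mean
# rate * livetime, so this typically returns about 10 times.
#
#   times = draw_random_times(segmentlist([segment(0, 100)]), rate=0.1)
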
def draw_poisson_dt(rate, size=1):
    # waiting times between Poisson events are exponentially distributed
    return -np.log(1.0 - np.random.rand(size)) / rate

def floor_div(x, n):
    """
    Floor a number by removing its remainder
    from division by another number n.

    >>> floor_div(163, 10)
    160

    >>> floor_div(158, 10)
    150
    """
    assert n > 0

    if isinstance(x, int):
        return (x // n) * n
    else:
        return (float(x) // n) * n

def time2window(time, duration):
    """
    Return the start and end time corresponding to the span
    rounded down to the nearest window time.

    >>> time2window(12335, 100)
    (12300, 12400)

    >>> time2window(12300, 10)
    (12300, 12310)
    """
    return floor_div(time, duration), floor_div(time, duration) + duration

def segs2times(segs, dt):
    """
    Given a segment list, returns a list of arrays of times, spaced by
    an interval dt, which span these segments. Segment end times are
    excluded.

    >>> seg1 = segment(12300, 12302)
    >>> seg2 = segment(12345, 12349)
    >>> segs = segmentlist([seg1, seg2])
    >>> segs2times(segs, 1)
    [array([12300., 12301.]), array([12345., 12346., 12347., 12348.])]
    """
    times = []
    for s, e in segs:
        times.append(np.arange(float(s), float(e), dt))
    return times

def gps2latency(gps_time):
    """
    Given a gps time, measures the latency to ms precision relative to now.
    """
    return np.round(gpstime.gpsnow() - gps_time, 3)

def segdb2segs(
    start, end, union=None, intersect=None, exclude=None, segdb_url=DEFAULT_SEGDB_URL
):
    """
    query SegDb through the Python API directly

    adapted from a function originally written by Chris Pankow
    """
    segs = segmentlist([segment(start, end)])

    # union common segments we want to keep
    if isinstance(union, str):
        union = [union]
    if union:
        union_segs = segmentlist()
        for flag in union:
            union_segs |= segdb2active(start, end, flag, segdb_url)
        segs &= union_segs

    # intersect between segments we want to keep
    if isinstance(intersect, str):
        intersect = [intersect]
    if intersect:
        for flag in intersect:
            segs &= segdb2active(start, end, flag, segdb_url)

    # remove the segments we want removed
    if isinstance(exclude, str):
        exclude = [exclude]
    if exclude:
        for flag in exclude:
            segs -= segdb2active(start, end, flag, segdb_url)

    return segs

def segdb2active(start, end, flag, segdb_url=DEFAULT_SEGDB_URL):
    """
    return the active segments for a flag
    """
    return query_segments(flag, start, end, host=segdb_url, coalesce=True)["active"]

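# Example (illustrative; requires network access to the segment database,
# and the flag name below is only an example):
#
#   segs = segdb2segs(
#       1187000000,
#       1187001000,
#       intersect=["H1:DMT-ANALYSIS_READY:1"],
#   )
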
def start_dur2start_end(x, sep="_", type_=int):
    # parse a "START_DURATION" token (e.g. from a filename) into (start, end)
    start, dur = map(type_, x.split(sep))
    return start, start + dur


def float2sec_ns(x):
    # split a float GPS time into integer seconds and nanoseconds
    sec = np.array(x).astype(int)
    ns = np.rint((x - sec) * 1e9)
    return sec, ns


def sec_ns2float(sec, ns):
    # recombine integer seconds and nanoseconds into a float GPS time
    return sec + 1.0e-9 * ns


def times2segments(times, win):
    # build a coalesced segment list of windows of half-width win around times
    return segmentlist([segment(time - win, time + win) for time in times]).coalesce()

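# Example (illustrative): float2sec_ns and sec_ns2float round-trip a GPS time.
#
#   >>> sec, ns = float2sec_ns(1234567890.25)
#   >>> int(sec), int(ns)
#   (1234567890, 250000000)
#   >>> float(sec_ns2float(sec, ns))
#   1234567890.25
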
def time_in_segment(t, seg):
    """
    return True iff time is in segment

    >>> seg = segment(12300, 12302)
    >>> time_in_segment(12346, seg)
    False
    """
    # we use this so it is numpy.ndarray friendly (see times_in_segments)
    return bool((seg[0] <= t) * (t < seg[1]))

def times_in_segment(times, seg):
    """
    like time_in_segment, but deals with an array instead of just a scalar
    """
    return np.logical_and(seg[0] <= times, times < seg[1])

def time_in_segments(t, segments):
    """
    return True iff time is in segments

    >>> seg1 = segment(12300, 12302)
    >>> seg2 = segment(12345, 12349)
    >>> segs = segmentlist([seg1, seg2])
    >>> time_in_segments(12346, segs)
    True
    """
    return np.any([time_in_segment(t, seg) for seg in segments])

def times_in_segments(times, segments):
    """
    returns a boolean array with the same shape as times
    representing whether they are within segments
    """
    # return np.array([segmentlist([segment(t, t)]).intersects(segments) for t in times])
    truth = np.zeros_like(times, dtype=bool)
    for seg in segments:
        truth = np.logical_or(times_in_segment(times, seg), truth)
    return truth

def livetime(segments):
    """
    return the livetime spanning the segment list

    NOTE: assumes segments are non-overlapping

    >>> seg1 = segment(12300, 12302)
    >>> seg2 = segment(12345, 12349)
    >>> segs = segmentlist([seg1, seg2])
    >>> livetime(segs)
    6
    """
    return np.sum([seg[1] - seg[0] for seg in segments])

def expand_segments(segments, span):
    """
    expand segments to a minimum span
    """
    expanded_segs = segmentlist([])
    for seg in segments:
        segspan = seg[1] - seg[0]
        if segspan < span:
            dspan = span - segspan
            expanded_segs.append(seg.protract(dspan / 2))
        else:
            expanded_segs.append(seg)
    expanded_segs.coalesce()
    return expanded_segs

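# Example (illustrative): a 1 s segment padded symmetrically out to 5 s.
#
#   expand_segments(segmentlist([segment(100, 101)]), 5)
#   # -> [segment(98.0, 103.0)]
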
def segments_causal_kfold(segments, num_folds, initial_segs=None):
    """
    Split segments into K equal folds, returning a list over all possible
    K splits. This is different from a regular K-fold split in that
    successive splits are supersets of the previous iteration.
    Return segments in the form [(k-1) folds, k th fold].
    Also can take in initial_segs, otherwise the first fold will have
    an empty training fold.
    """
    kfold_segments = split_segments_by_livetime(segments, num_folds)

    folds = []
    for k in range(num_folds):
        current_folds = segmentlist(itertools.chain(*kfold_segments[:k]))
        if initial_segs is not None:
            current_folds.extend(initial_segs)
        current_folds.coalesce()
        new_fold = kfold_segments[k]
        folds.append((current_folds, new_fold))

    return folds

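# Example (illustrative): two causal folds over a single 10 s segment; the
# first split trains on nothing, the second trains on the first fold.
#
#   segments_causal_kfold(segmentlist([segment(0, 10)]), 2)
#   # -> [([], [segment(0, 5.0)]),
#   #     ([segment(0, 5.0)], [segment(5.0, 10)])]
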
def segments_kfold(segments, num_folds):
    """
    Split segments into K equal folds,
    returning a list over all possible K splits.
    Return segments in the form [(k-1) folds, 1 fold].
    """
    return segments_checkerboard(segments, num_folds, 1)

def segments_checkerboard(segments, num_bins, num_segs_per_bin):
    """
    split segments into num_bins*num_segs_per_bin equal folds and associate
    segments within bins in a checkerboard pattern.
    return a list over segmentlists, one for each bin
    """
    kfold_segments = split_segments_by_livetime(segments, num_bins * num_segs_per_bin)
    folds = [segmentlist([]) for _ in range(num_bins)]
    for i, segs in enumerate(kfold_segments):
        folds[i % num_bins] += segs
    return [((segments - segs), segs) for segs in folds]

def split_segments_by_livetime(segments, num_folds):
    """
    Split segments into K equal folds by livetime,
    returning a list over all folds.
    """
    total_livetime = livetime(segments)
    kfold_segments = [segmentlist([]) for _ in range(num_folds)]

    # calculate livetime for each fold, ensuring
    # start, end edges fall on integer boundaries
    small_fold, remainder = divmod(float(total_livetime), num_folds)
    big_fold = small_fold + remainder
    kfold_livetime = [big_fold if n == 0 else small_fold for n in range(num_folds)]

    # determine folds
    fold = 0
    for seg in segments:
        # add entire segment to current fold if livetime doesn't spill over
        current_livetime = livetime(kfold_segments[fold])
        if current_livetime + livetime(segmentlist([seg])) <= kfold_livetime[fold]:
            kfold_segments[fold] |= segmentlist([seg])
        # else, split segment and put spill-over into next fold(s)
        else:
            diff_livetime = kfold_livetime[fold] - current_livetime
            needed_seg = segmentlist([segment(seg[0], seg[0] + diff_livetime)])
            kfold_segments[fold] |= needed_seg

            # if segment is still too big, keep splitting until it isn't
            remainder = segmentlist([segment(seg[0] + diff_livetime, seg[1])])
            while livetime(remainder) > kfold_livetime[fold]:
                remainder_start = remainder[0][0]
                remainder_mid = remainder[0][0] + kfold_livetime[fold]
                kfold_segments[fold + 1] |= segmentlist(
                    [segment(remainder_start, remainder_mid)]
                )
                remainder = segmentlist([segment(remainder_mid, seg[1])])
                fold += 1

            # divvy up final piece
            if fold < num_folds - 1:
                fold += 1
            kfold_segments[fold] |= remainder

    return kfold_segments

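# Example (illustrative): 4 s of livetime split into two 2 s folds; the
# single input segment is divided at the fold boundary.
#
#   split_segments_by_livetime(segmentlist([segment(0, 4)]), 2)
#   # -> [[segment(0, 2.0)], [segment(2.0, 4)]]
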
def split_segments_by_stride(start, end, segments, stride):
    """
    Split segments into multiple segments
    with a maximum extent defined by stride.
    Returns a list of segment lists.
    """
    extent = segment(start, end)
    splits = [
        dsets[0] for dsets in segs2datasets(segments, stride=stride, keytype="segment")
    ]
    return [(split_segs & extent) for split_segs in splits]

def chunker(sequence, chunk_size):
    """
    split up a sequence into sub-sequences of a maximum chunk size
    """
    for i in range(0, len(sequence), chunk_size):
        yield sequence[i : i + chunk_size]

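# Example (illustrative): a five-element list in chunks of at most two.
#
#   >>> list(chunker([1, 2, 3, 4, 5], 2))
#   [[1, 2], [3, 4], [5]]
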
def path2cache(rootdir, pathname):
    """
    given a rootdir and a glob-compatible pathname with possible
    shell-style wildcards, will find all files that match and
    populate a Cache.
    NOTE: this will only work with files that comply with the T050017
    file convention.
    """
    return [
        CacheEntry.from_T050017(file_)
        for file_ in glob.iglob(os.path.join(rootdir, pathname))
    ]

def start_end2gps_dirs(start, end, lookback=0):
    """
    given start, end times, return the relevant gps directories
    to look at (leading 5 digits containing 100000 seconds)
    if lookback is given, also lookback N directories in time.
    """
    start = (start // 100000) * 100000  # round down to nearest 100k seconds
    if lookback:
        start -= lookback * 100000
    gps_dirs = []
    while start < end:
        gps_dirs.append(str(start)[:5])
        start += 100000
    return gps_dirs

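# Example (illustrative GPS times): a span crossing one 100k-second boundary.
#
#   >>> start_end2gps_dirs(1234500000, 1234700000)
#   ['12345', '12346']
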
SEGS2DATASETS_KEY_TEMPLATE = "%d_%d"
def segs2datasets(segs, stride=32, keytype="string"):
    """
    given a segment list, will give names for all relevant datasets
    that could be within an hdf5 file alongside the relevant segments
    within each dataset in a pair.
    the stride is given for a span of a single dataset.
    """
    pairs = defaultdict(segmentlist)
    for seg in segs:
        gps_start = floor_div(seg[0], stride)
        while gps_start < seg[1]:
            if keytype == "string":
                key = SEGS2DATASETS_KEY_TEMPLATE % (gps_start, stride)
            elif keytype == "segment":
                key = segment(gps_start, gps_start + stride)
            else:
                raise ValueError("keytype %s not one of (string/segment)" % keytype)
            pairs[key].append(seg)
            gps_start += stride

    # coalesce all segment lists before returning
    pairs = {dset: seglist.coalesce() for dset, seglist in pairs.items()}
    return pairs.items()

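# Example (illustrative): segment(0, 40) with stride=32 touches two datasets,
# keyed by "start_stride" strings built from SEGS2DATASETS_KEY_TEMPLATE.
#
#   dict(segs2datasets(segmentlist([segment(0, 40)]), stride=32))
#   # -> {'0_32': [segment(0, 40)], '32_64': [segment(0, 40)]}
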
def elapsed_time(func):
    """
    measures the elapsed time that a function call makes
    usage: add @elapsed_time decorator to function/method you want to time
    """

    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        start_time = timeit.default_timer()
        result = func(*args, **kwargs)
        elapsed = timeit.default_timer() - start_time
        print("%r took %.3f sec" % (func.__name__, elapsed))
        return result

    return wrapper

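# Example (illustrative):
#
#   @elapsed_time
#   def crunch():
#       TIME.sleep(0.5)
#
#   crunch()  # prints something like: 'crunch' took 0.501 sec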