from collections import defaultdict
import functools
import glob
import itertools
import os
import random
import string
import time as TIME
import timeit
import numpy as np
import gpstime
from lal.utils import CacheEntry
from ligo.segments import segment, segmentlist
from dqsegdb2.query import query_segments
from . import exceptions
# cadence management
DEFAULT_STRIDE = 1.0
DEFAULT_DELAY = 0.0
# the maximum number of iterations in CadenceManager.poll before moving on
DEFAULT_MAX_ITERS = np.inf
# the maximum latency we allow before skipping ahead
DEFAULT_MAX_LATENCY = np.inf
DEFAULT_MAX_TIMESTAMP = np.inf
DEFAULT_MONITORING_CADENCE = 1.0
# I/O ascii conventions
ASCII_COMMENTS = ["#"]
# I/O columns conventions
KW_COLUMNS = [
"start_time",
"stop_time",
"time",
"frequency",
"unnormalized_energy",
"normalized_energy",
"chisqdof",
"significance",
]
KW_TIMECOL = "time"
KW_SIGCOL = "significance"
KW_COL2IND = dict((col, i) for i, col in enumerate(KW_COLUMNS))
KW_TIMEIND = KW_COL2IND[KW_TIMECOL]
KW_TEMPLATE = "%.6f %.6f %.6f %d %.3e %.2f %d %.2f"
SNAX_COLUMNS = ["time", "frequency", "q", "snr", "phase", "duration"]
SNAX_TIMECOL = "time"
SNAX_SIGCOL = "snr"
OMICRON_COLUMNS = [
"time",
"frequency",
"tstart",
"tend",
"fstart",
"fend",
"snr",
"q",
"amplitude",
"phase",
]
OMICRON_TIMECOL = "time"
OMICRON_SIGCOL = "snr"
# segdb conventions
DEFAULT_SEGDB_URL = os.environ.get(
"DEFAULT_SEGMENT_SERVER", "https://segments.ligo.org"
)
def parse_variable_args(args, max_args, default=None):
"""
fill in unspecified variable arguments with a default value.
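
    For example:

    >>> parse_variable_args([1, 2], 4)
    [1, 2, None, None]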
"""
return args + [default for _ in range(max_args - len(args))]
class CadenceManager(object):
"""
manages the cadence of our queries, including delay and timeout logic
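
    A typical (hypothetical) polling loop might look like::

        manager = CadenceManager(stride=1.0, delay=10.0)
        while True:
            for seg in manager.poll():
                process(seg)  # `process` is a placeholder for real work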
"""
# define a few string templates that may be printed repeatedly
_sleep_template = "sleeping for %.3f sec"
def __init__(
self,
timestamp=None,
stride=DEFAULT_STRIDE,
delay=DEFAULT_DELAY,
logger=None,
enforce_integer_strides=True,
offset=0,
max_iters=DEFAULT_MAX_ITERS,
max_latency=DEFAULT_MAX_LATENCY,
max_timestamp=DEFAULT_MAX_TIMESTAMP,
**extra_kwargs
):
# start the timestamp off at an integer number of strides
self._enforce_integer_strides = enforce_integer_strides
self._stride = stride
self._offset = offset
if timestamp is None:
timestamp = gpstime.gpsnow()
# timestamp setter depends on _stride, _enforce_integer_strides, _offset
self.timestamp = timestamp
self._delay = delay
# timestamp tracks the start of the stride, so we need to wait this long
self._wait_duration = stride + delay
self.max_timestamp = max_timestamp
        assert max_iters > 0, "max_iters must be positive"
self.max_iters = max_iters
assert max_latency > self._stride, "max_latency must be larger than stride"
self.max_latency = max_latency
# hold on to this to print sleep statements as necessary
self._logger = logger
# gobble this up in case there's anything useful here
self._extra_kwargs = extra_kwargs
@property
def timestamp(self):
return self._timestamp
@timestamp.setter
def timestamp(self, new_timestamp):
if self._enforce_integer_strides:
new_timestamp = floor_div(new_timestamp, self.stride)
self._timestamp = new_timestamp + self.offset
@property
def stride(self):
return self._stride
@property
def delay(self):
return self._delay
@property
def offset(self):
return self._offset
@property
def max_timestamp(self):
return self._max_timestamp
@max_timestamp.setter
def max_timestamp(self, timestamp):
self._max_timestamp = timestamp
self._end = timestamp + self._delay + self._offset
@property
def segs(self):
return segmentlist([self.seg])
@property
def seg(self):
return segment(self.timestamp, self.timestamp + self.stride)
@property
def past_seg(self):
return segment(self._timestamp - self._stride, self._timestamp)
@property
def timeout(self):
return self._timestamp + self._wait_duration - min(self._end, gpstime.gpsnow())
    def wait(self):
        """
        wait until the current time is past timestamp + stride + delay,
        then update timestamp (increment by one stride)
        """
wait = self.timeout
if wait > 0:
if self._logger is not None:
self._logger.info(self._sleep_template % wait)
TIME.sleep(wait)
# increment timestamp since we've waited
self._timestamp += self._stride
return self.past_seg
    def poll(self):
        """
        generate segments corresponding to strides up until the current time
        This is essentially a multi-stride version of wait
        """
i = 0
while self.timeout < 0:
if gps2latency(self._timestamp) > self.max_latency:
# we're too far behind real time!
gps = gpstime.gpsnow()
raise exceptions.LatencyError(
gps, gps2latency(self._timestamp), floor_div(gps, self._stride)
)
if i < self.max_iters:
self._timestamp += self._stride
i += 1
# we've already bumped timestamp
yield self.past_seg
else:
# too many iterations
return
# wait automatically bumps the timestamp
self.wait()
yield self.past_seg
def generate_unique_id(n=8):
    """generate a random alphanumeric ID of length n"""
alphanum = string.ascii_uppercase + string.digits
return "".join(random.SystemRandom().choice(alphanum) for _ in range(n))
def draw_random_times(segments, rate=0.1):
"""
draw random times poisson distributed within segments
with corresponding rate (in Hz)
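
    For example, drawing times at an average rate of 1 Hz over 100 seconds
    (results are random, so this is illustrative only)::

        segs = segmentlist([segment(0, 100)])
        times = draw_random_times(segs, rate=1.0)  # ~100 times on average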
"""
times = []
for seg in segments:
times += draw_random_times_in_segment(*seg, rate=rate)
return np.array(times, dtype=float).reshape((len(times),))
def draw_random_times_in_segment(start, end, rate=0.1):
"""
draw random times poisson distributed within [start, end]
with corresponding rate (in Hz)
"""
times = []
while start < end:
for dt in draw_poisson_dt(rate, size=int(1 + int(end - start) * rate)):
start += dt
if start > end:
# we're out of this segment
break
times.append(start)
return times
def draw_poisson_dt(rate, size=1):
    # draw exponentially distributed waiting times (the inter-arrival times of
    # a Poisson process with this rate) via inverse-CDF sampling
    return -np.log(1.0 - np.random.rand(size)) / rate
def floor_div(x, n):
"""
Floor a number by removing its remainder
from division by another number n.
>>> floor_div(163, 10)
160
>>> floor_div(158, 10)
150
"""
assert n > 0
if isinstance(x, int):
return (x // n) * n
else:
return (float(x) // n) * n
def time2window(time, duration):
"""
Return the start and end time corresponding to the span rounded down to
the nearest window time.
>>> time2window(12335, 100)
(12300, 12400)
>>> time2window(12300, 10)
(12300, 12310)
"""
return floor_div(time, duration), floor_div(time, duration) + duration
def segs2times(segs, dt):
"""
Given a segment list, returns a list of inclusive ranges of times,
spaced by an interval dt, which spans these segments.
>>> seg1 = segment(12300, 12302)
>>> seg2 = segment(12345, 12349)
>>> segs = segmentlist([seg1, seg2])
>>> segs2times(segs, 1)
[array([12300., 12301.]), array([12345., 12346., 12347., 12348.])]
"""
times = []
for s, e in segs:
times.append(np.arange(float(s), float(e), dt))
return times
def gps2latency(gps_time):
"""
Given a gps time, measures the latency to ms precision relative to now.
"""
return np.round(gpstime.gpsnow() - gps_time, 3)
def segdb2segs(
start, end, union=None, intersect=None, exclude=None, segdb_url=DEFAULT_SEGDB_URL
):
"""
query SegDb through the Python API directly
adapted from a function originally written by Chris Pankow
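
    a hypothetical query (the flag names here are placeholders)::

        segs = segdb2segs(
            1240000000,
            1240010000,
            intersect=["H1:DMT-ANALYSIS_READY:1"],
            exclude=["H1:DCH-BAD_SEGMENT:1"],
        )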
"""
segs = segmentlist([segment(start, end)])
# union common segments we want to keep
if isinstance(union, str):
union = [union]
if union:
union_segs = segmentlist()
for flag in union:
union_segs |= segdb2active(start, end, flag, segdb_url)
segs &= union_segs
# intersect between segments we want to keep
if isinstance(intersect, str):
intersect = [intersect]
if intersect:
for flag in intersect:
segs &= segdb2active(start, end, flag, segdb_url)
# remove the segments we want removed
if isinstance(exclude, str):
exclude = [exclude]
if exclude:
for flag in exclude:
segs -= segdb2active(start, end, flag, segdb_url)
return segs
def segdb2active(start, end, flag, segdb_url=DEFAULT_SEGDB_URL):
"""
return the active segments for a flag
"""
return query_segments(flag, start, end, host=segdb_url, coalesce=True)["active"]
def start_dur2start_end(x, sep="_", type_=int):
    # parse a "<start><sep><duration>" string (e.g. "12300_100") into (start, end)
    start, dur = map(type_, x.split(sep))
    return start, start + dur
def float2sec_ns(x):
    # split float GPS times into integer seconds and integer nanoseconds
    sec = np.array(x).astype(int)
    ns = np.rint((x - sec) * 1e9)
    return sec, ns
def sec_ns2float(sec, ns):
    # recombine integer seconds and nanoseconds into float GPS times
    return sec + 1.0e-9 * ns
def times2segments(times, win):
    # pad each time by +/- win and coalesce the result into a segment list
    return segmentlist([segment(time - win, time + win) for time in times]).coalesce()
def time_in_segment(t, seg):
"""
return True iff time is in segment
>>> seg = segment(12300, 12302)
>>> time_in_segment(12346, seg)
False
"""
# we use this so it is numpy.ndarray friendly (see times_in_segments)
return bool((seg[0] <= t) * (t < seg[1]))
def times_in_segment(times, seg):
"""
like time_in_segment, but deals with an array instead of just a scalar
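
    >>> times_in_segment(np.array([12301.0, 12346.0]), segment(12300, 12302))
    array([ True, False])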
"""
return np.logical_and(seg[0] <= times, times < seg[1])
def time_in_segments(t, segments):
"""
return True iff time is in segments
>>> seg1 = segment(12300, 12302)
>>> seg2 = segment(12345, 12349)
>>> segs = segmentlist([seg1, seg2])
>>> time_in_segments(12346, segs)
True
"""
return np.any([time_in_segment(t, seg) for seg in segments])
def times_in_segments(times, segments):
    """
    returns a boolean array with the same shape as times
representing whether they are within segments
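
    >>> seg1 = segment(12300, 12302)
    >>> seg2 = segment(12345, 12349)
    >>> segs = segmentlist([seg1, seg2])
    >>> times_in_segments(np.array([12301.0, 12303.0, 12346.0]), segs)
    array([ True, False,  True])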
"""
# return np.array([segmentlist([segment(t,t)]).intersects(segments) for t in times])
truth = np.zeros_like(times, dtype=bool)
for seg in segments:
truth = np.logical_or(times_in_segment(times, seg), truth)
return truth
def livetime(segments):
"""
return the livetime spanning the segment list
NOTE: assumes segments are non-overlapping
>>> seg1 = segment(12300, 12302)
>>> seg2 = segment(12345, 12349)
>>> segs = segmentlist([seg1, seg2])
>>> livetime(segs)
6
"""
return np.sum([seg[1] - seg[0] for seg in segments])
def expand_segments(segments, span):
"""
expand segments to a minimum span
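
    e.g., with span=6, segment(10, 12) is protracted to segment(8.0, 14.0)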
"""
expanded_segs = segmentlist([])
for seg in segments:
segspan = seg[1] - seg[0]
if segspan < span:
dspan = span - segspan
expanded_segs.append(seg.protract(dspan / 2))
else:
expanded_segs.append(seg)
expanded_segs.coalesce()
return expanded_segs
def segments_causal_kfold(segments, num_folds, initial_segs=None):
    """
    Split segments into K equal folds, returning a
    list over all possible K splits.
    This is different from a regular K-fold split in that
    successive splits are supersets of the previous iteration.
    Return segments in the form [(k-1) folds, k-th fold].
    initial_segs can also be passed in; otherwise the first split
    will have an empty training fold.
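
    e.g., segmentlist([segment(0, 100)]) with num_folds=2 yields
    [(initial, [0, 50)), (initial + [0, 50), [50, 100))],
    where "initial" denotes initial_segs (empty if not given).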
"""
kfold_segments = split_segments_by_livetime(segments, num_folds)
folds = []
for k in range(num_folds):
current_folds = segmentlist(itertools.chain(*kfold_segments[:k]))
if initial_segs is not None:
current_folds.extend(initial_segs)
current_folds.coalesce()
new_fold = kfold_segments[k]
folds.append((current_folds, new_fold))
return folds
def segments_kfold(segments, num_folds):
"""
Split segments into K equal folds, returning a
list over all possible K splits.
Return segments in the form [(k-1) folds, 1 fold].
"""
return segments_checkerboard(segments, num_folds, 1)
def segments_checkerboard(segments, num_bins, num_segs_per_bin):
    """
    split segments into num_bins*num_segs_per_bin equal folds and
    associate segments within bins in a checkerboard pattern.
    return a list of (complement, bin) segmentlist pairs, one for each bin.
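
    e.g., segmentlist([segment(0, 100)]) with num_bins=2 and
    num_segs_per_bin=2 is cut into four 25 s pieces; bin 0 holds
    [0, 25) and [50, 75), bin 1 holds [25, 50) and [75, 100).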
"""
kfold_segments = split_segments_by_livetime(segments, num_bins * num_segs_per_bin)
folds = [segmentlist([]) for _ in range(num_bins)]
for i, segs in enumerate(kfold_segments):
folds[i % num_bins] += segs
return [((segments - segs), segs) for segs in folds]
def split_segments_by_livetime(segments, num_folds):
"""
Split segments into K equal folds by livetime,
returning a list over all folds.
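
    e.g., split_segments_by_livetime(segmentlist([segment(0, 100)]), 4)
    splits into folds spanning [0, 25), [25, 50), [50, 75), [75, 100).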
"""
total_livetime = livetime(segments)
kfold_segments = [segmentlist([]) for _ in range(num_folds)]
# calculate livetime for each fold, ensuring
# start, end edges fall on integer boundaries
small_fold, remainder = divmod(float(total_livetime), num_folds)
big_fold = small_fold + remainder
kfold_livetime = [big_fold if n == 0 else small_fold for n in range(num_folds)]
# determine folds
fold = 0
for seg in segments:
# add entire segment to current fold if livetime doesn't spill over
current_livetime = livetime(kfold_segments[fold])
if current_livetime + livetime(segmentlist([seg])) <= kfold_livetime[fold]:
kfold_segments[fold] |= segmentlist([seg])
# else, split segment and put spill-over into next fold(s)
else:
diff_livetime = kfold_livetime[fold] - current_livetime
needed_seg = segmentlist([segment(seg[0], seg[0] + diff_livetime)])
kfold_segments[fold] |= needed_seg
# if segment is still too big, keep splitting until it isn't
remainder = segmentlist([segment(seg[0] + diff_livetime, seg[1])])
while livetime(remainder) > kfold_livetime[fold]:
remainder_start = remainder[0][0]
remainder_mid = remainder[0][0] + kfold_livetime[fold]
kfold_segments[fold + 1] |= segmentlist(
[segment(remainder_start, remainder_mid)]
)
remainder = segmentlist([segment(remainder_mid, seg[1])])
fold += 1
# divvy up final piece
if fold < num_folds - 1:
fold += 1
kfold_segments[fold] |= remainder
return kfold_segments
def split_segments_by_stride(start, end, segments, stride):
    """
    Split segments into multiple segments with a maximum
    extent defined by stride.
    Returns a list of segments, one per stride-aligned window.
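
    e.g., with start=0, end=70 and stride=32, segment (0, 70) is split
    across the windows [0, 32), [32, 64), [64, 96) into the pieces
    (0, 32), (32, 64), (64, 70).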
"""
extent = segment(start, end)
splits = [
dsets[0] for dsets in segs2datasets(segments, stride=stride, keytype="segment")
]
return [(split_segs & extent) for split_segs in splits]
def chunker(sequence, chunk_size):
"""
split up a sequence into sub-sequences of a maximum chunk size
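
    >>> list(chunker([1, 2, 3, 4, 5], 2))
    [[1, 2], [3, 4], [5]]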
"""
for i in range(0, len(sequence), chunk_size):
yield sequence[i : i + chunk_size]
def path2cache(rootdir, pathname):
"""
given a rootdir and a glob-compatible pathname with possible shell-style wildcards,
will find all files that match and populate a Cache.
NOTE: this will only work with files that comply with the T050017 file convention.
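
    e.g., path2cache("/data", "H1-KW_TRIGGERS-*.trg") would match files like
    /data/H1-KW_TRIGGERS-1240000000-32.trg (the path here is hypothetical)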
"""
return [
CacheEntry.from_T050017(file_)
for file_ in glob.iglob(os.path.join(rootdir, pathname))
]
def start_end2gps_dirs(start, end, lookback=0):
    """
    given start, end times, return the relevant gps directories to look at
    (the leading 5 digits of the gps time, each spanning 100000 seconds)
    if lookback is given, also look back that many directories in time.
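
    >>> start_end2gps_dirs(1240000000, 1240200000)
    ['12400', '12401']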
"""
start = (start // 100000) * 100000 # round down to nearest 100k seconds
if lookback:
start -= lookback * 100000
gps_dirs = []
while start < end:
gps_dirs.append(str(start)[:5])
start += 100000
return gps_dirs
SEGS2DATASETS_KEY_TEMPLATE = "%d_%d"
def segs2datasets(segs, stride=32, keytype="string"):
    """
    given a segment list, return names for all relevant datasets
    that could be within an hdf5 file, each paired with the relevant
    segments that overlap that dataset.
    the stride gives the span of a single dataset.
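
    e.g., segmentlist([segment(0, 40)]) with stride=32 maps to the
    datasets "0_32" and "32_32", each paired with the segments that
    overlap them.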
"""
pairs = defaultdict(segmentlist)
for seg in segs:
gps_start = floor_div(seg[0], stride)
while gps_start < seg[1]:
if keytype == "string":
key = SEGS2DATASETS_KEY_TEMPLATE % (gps_start, stride)
elif keytype == "segment":
key = segment(gps_start, gps_start + stride)
else:
                raise ValueError("keytype %s not one of (string/segment)" % keytype)
pairs[key].append(seg)
gps_start += stride
# coalesce all segment lists before returning
pairs = {dset: seglist.coalesce() for dset, seglist in pairs.items()}
return pairs.items()
def elapsed_time(func):
"""
measures the elapsed time that a function call makes
usage: add @elapsed_time decorator to function/method you want to time
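
    a sketch (the function name and timing value are illustrative)::

        @elapsed_time
        def fetch():
            ...

        fetch()  # prints something like "'fetch' took 0.001 sec"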
"""
@functools.wraps(func)
def wrapper(*args, **kwargs):
start_time = timeit.default_timer()
result = func(*args, **kwargs)
elapsed = timeit.default_timer() - start_time
print("%r took %.3f sec" % (func.__name__, elapsed))
return result
return wrapper