import os
import h5py
import numpy as np
from ligo.segments import segment, segmentlist
from ... import hookimpl
from ... import utils
from ... import features
from ... import calibration
from ...series import ChannelInfo, Series, SeriesInfo
from . import DiskReporter
class HDF5Reporter(DiskReporter):
"""
a variant of DiskReporter that expects objects to fit
into a tabular data format and writes that to disk as hdf5
"""
_suffix = "hdf5"
def _write(self, path, data, **attrs):
with h5py.File(path, "w") as file_obj:
dataset = file_obj.create_dataset("data", data=data)
for key, value in attrs.items():
if isinstance(value, str):
value = np.string_(value)
dataset.attrs.create(key, value)
@classmethod
def read(cls, path):
with h5py.File(path, "r") as file_obj:
data = file_obj["data"][...]
attrs = dict(file_obj["data"].attrs.items())
return (data, attrs)
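
# a minimal round-trip sketch using the hooks defined above (reporter
# construction is elided; note that string attributes are stored as
# fixed-width bytes, so they read back as bytes under python3):
#
#     data = np.zeros(4, dtype=[("gps", "float"), ("rank", "float")])
#     reporter._write("example.hdf5", data, model_id="abc")
#     data, attrs = HDF5Reporter.read("example.hdf5")  # attrs["model_id"] == b"abc"
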
class HDF5SegmentReporter(HDF5Reporter):
"""
a variant of HDF5 reporter that expects to only deal with segment lists
"""
_dtypes = [
("start_sec", ">i4"),
("start_ns", ">i4"),
("end_sec", ">i4"),
("end_ns", ">i4"),
]
def _write(self, path, segments, **kwargs):
h5segs = []
for s, e in segments:
start_sec, start_ns = utils.float2sec_ns(s)
end_sec, end_ns = utils.float2sec_ns(e)
h5segs.append((start_sec, start_ns, end_sec, end_ns))
start_sec, start_ns = utils.float2sec_ns(self.start)
end_sec, end_ns = utils.float2sec_ns(self.end)
        return super()._write(
            path,
            np.array(h5segs, dtype=self._dtypes),
            start_sec=start_sec,
            start_ns=start_ns,
            end_sec=end_sec,
            end_ns=end_ns,
            **kwargs,
        )
@classmethod
def read(cls, path):
        segs = segmentlist([])
        data, _ = super().read(path)
        for start_sec, start_ns, end_sec, end_ns in data:
            segs.append(
                segment(
                    utils.sec_ns2float(start_sec, start_ns),
                    utils.sec_ns2float(end_sec, end_ns),
                )
            )
        return segs
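
# sketch of the on-disk encoding: each segment boundary is split into integer
# (sec, ns) pairs via utils.float2sec_ns so no precision is lost in transit,
# e.g. (reporter construction elided):
#
#     segs = segmentlist([segment(1187008882.4, 1187008884.5)])
#     reporter._write("segs.hdf5", segs)  # rows of (start_sec, start_ns, end_sec, end_ns)
#     assert HDF5SegmentReporter.read("segs.hdf5") == segs
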
class GPSTimesReporter(HDF5Reporter):
"""
a variant of HDF5 reporter that expects to only deal with lists of gps times
"""
_dtypes = [("gps_sec", ">i4"), ("gps_ns", ">i4")]
def _write(self, path, times, **kwargs):
data = np.array(list(zip(*utils.float2sec_ns(times))), dtype=self._dtypes)
start_sec, start_ns = utils.float2sec_ns(self.start)
end_sec, end_ns = utils.float2sec_ns(self.end)
        return super()._write(
            path,
data,
start_sec=start_sec,
start_ns=start_ns,
end_sec=end_sec,
end_ns=end_ns,
**kwargs,
)
@classmethod
def read(cls, path):
data, attrs = super().read(path)
return (
utils.sec_ns2float(data["gps_sec"], data["gps_ns"]),
utils.sec_ns2float(attrs["start_sec"], attrs["start_ns"]),
utils.sec_ns2float(attrs["end_sec"], attrs["end_ns"]),
)
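
# read() hands back the times along with the [start, end) span recorded in
# the attributes, e.g. (construction elided):
#
#     times, start, end = GPSTimesReporter.read("times.hdf5")
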
DATASET_HDF5_DTYPE = [
("gps", "float"),
("rank", "float"),
("label", "float"),
("hash", "S30"),
]
DATASET_HDF5_NAME = "dataset"
OLD_DATASET_HDF5_NAME = "quiver"
def _dataset2dataset(h5group, dataset):
"""a helper function to allow us to re-use dataset <--> hdf5 logic"""
data = []
for gps, rank, label, hash_ in zip(
dataset.times, dataset.ranks, dataset.labels, dataset.hashes
):
data.append((gps, rank, label, str(hash_)))
    h5group.create_dataset(
        DATASET_HDF5_NAME, data=np.array(data, dtype=DATASET_HDF5_DTYPE)
    )
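
# each row of the resulting "dataset" array is
#     (gps, rank, label, hash)
# with the hash coerced through str() into a fixed-width "S30" string, so a
# missing hash is stored as the sentinel b"None"
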
DATASET_HDF5_SEG_DTYPE = [
("start_sec", ">i4"),
("start_ns", ">i4"),
("end_sec", ">i4"),
("end_ns", ">i4"),
]
DATASET_HDF5_SEG_NAME = "dataset_segments"
OLD_DATASET_HDF5_SEG_NAME = "quiver_segments"
def _segs2dataset(h5group, start, end, segs, seg_name=DATASET_HDF5_SEG_NAME):
"""a helper function to allow us to re-use dataset <--> hdf5 logic"""
h5segs = []
for s, e in segs:
start_sec, start_ns = utils.float2sec_ns(s)
end_sec, end_ns = utils.float2sec_ns(e)
h5segs.append((start_sec, start_ns, end_sec, end_ns))
dataset = h5group.create_dataset(
seg_name, data=np.array(h5segs, dtype=DATASET_HDF5_SEG_DTYPE)
)
# attrs
start_sec, start_ns = utils.float2sec_ns(start)
end_sec, end_ns = utils.float2sec_ns(end)
for key, value in [
("start_sec", start_sec),
("start_ns", start_ns),
("end_sec", end_sec),
("end_ns", end_ns),
]:
dataset.attrs.create(key, value)
def _h5group2dataset(h5group):
"""a helper function to allow us to re-use dataset <--> hdf5 logic"""
# backwards compatibility for older (pre v0.5) datasets
if DATASET_HDF5_NAME in h5group:
dataset_group = DATASET_HDF5_NAME
else:
dataset_group = OLD_DATASET_HDF5_NAME
if DATASET_HDF5_SEG_NAME in h5group:
dataset_seg_group = DATASET_HDF5_SEG_NAME
else:
dataset_seg_group = OLD_DATASET_HDF5_SEG_NAME
attrs = dict(h5group[dataset_seg_group].attrs.items())
start = utils.sec_ns2float(attrs["start_sec"], attrs["start_ns"])
end = utils.sec_ns2float(attrs["end_sec"], attrs["end_ns"])
segs = segmentlist([])
for start_sec, start_ns, end_sec, end_ns in h5group[dataset_seg_group][...]:
segs.append(
segment(
utils.sec_ns2float(start_sec, start_ns),
utils.sec_ns2float(end_sec, end_ns),
)
)
times, ranks, labels, hashes = [], [], [], []
    for row in h5group[dataset_group][...]:  # add in the FeatureVector data
        times.append(row["gps"])
        labels.append(row["label"])
        # rank != rank is only true for NaN, which marks a missing rank
        ranks.append(None if row["rank"] != row["rank"] else row["rank"])
        # hashes are stored as fixed-width bytes, so the str(None) sentinel
        # reads back as b"None" under python3
        hashes.append(None if row["hash"] == b"None" else row["hash"])
return features.Dataset(
times,
ranks=ranks,
labels=labels,
hashes=hashes,
start=start,
end=end,
segs=segs,
)
class DatasetReporter(HDF5Reporter):
"""
store high-level attributes from a dataset
"""
def _nickname2segsname(self, nickname):
return nickname + "_segments"
def _write(self, path, dataset, **kwargs):
with h5py.File(path, "w") as file_obj:
_dataset2dataset(file_obj, dataset)
if dataset.segs is not None:
_segs2dataset(file_obj, dataset.start, dataset.end, dataset.segs)
@classmethod
def read(cls, path):
with h5py.File(path, "r") as file_obj:
dataset = _h5group2dataset(file_obj)
return dataset
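
# a minimal round-trip sketch (construction elided; the segments dataset is
# only written when dataset.segs is not None):
#
#     reporter._write("dataset.hdf5", dataset)
#     same_dataset = DatasetReporter.read("dataset.hdf5")
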
class CalibrationMapReporter(HDF5Reporter):
"""
store high-level information from a calibration map
based on HDF5Reporter but extends this to use a more
complicated structure under the hood
"""
_samples_dtype = [("sample", "float")]
_interp_dtype = [
("x", "float"),
("logpdf", "float"),
("logpdf_m1", "float"),
("logpdf_m2", "float"),
("cdf", "float"),
("cdf_m1", "float"),
("cdf_m2", "float"),
("pdf_alpha", "float"),
("pdf_beta", "float"),
("cdf_alpha", "float"),
("cdf_beta", "float"),
]
_DiscreteCalibrationMap_group = "DiscreteCalibrationMap"
_CalibrationMap_group = "CalibrationMap"
def _write(self, path, calibrationmap, **kwargs):
if isinstance(calibrationmap, calibration.DiscreteCalibrationMap):
return self._write_DiscreteCalibrationMap(path, calibrationmap, **kwargs)
elif isinstance(calibrationmap, calibration.CalibrationMap):
return self._write_CalibrationMap(path, calibrationmap, **kwargs)
else:
raise ValueError(
"calibrationmap must be an instance of either "
"calibration.DiscreteCalibrationMap or calibration.CalibrationMap"
)
def _write_DiscreteCalibrationMap(self, path, calibrationmap, **kwargs):
with h5py.File(path, "w") as file_obj:
group = file_obj.create_group(self._DiscreteCalibrationMap_group)
group.attrs.create("hash", np.string_(calibrationmap.hash))
group.attrs.create("model_id", np.string_(calibrationmap.model_id))
group.attrs.create(
"rate_estimation", np.string_(calibrationmap.rate_estimation)
)
ranks = calibrationmap.ranks
dataset = group.create_dataset(
"loglike_cdf",
data=np.array([calibrationmap._loglike_cdf[rank] for rank in ranks]),
)
dataset.attrs.create("ranks", ranks)
            # _dq is the quantile spacing, so 1/_dq recovers the quantile count
            dataset.attrs.create(
                "num_quantiles", int(round(1.0 / calibrationmap._dq, 0))
            )
# store clean segments if applicable
if calibrationmap._clean_segs is not None:
_segs2dataset(
group,
calibrationmap.start,
calibrationmap.end,
calibrationmap._clean_segs,
seg_name="clean_segments",
)
for name, dataset, kde in [
("gch", calibrationmap._gch_dataset, calibrationmap._gch_kde),
("cln", calibrationmap._cln_dataset, calibrationmap._cln_kde),
]:
g = group.create_group(name)
# store the dataset
_dataset2dataset(g, dataset)
if dataset.segs is not None:
_segs2dataset(g, dataset.start, dataset.end, dataset.segs)
# store the discrete KDEs
dataset = g.create_dataset("observations", data=kde.obs)
def _write_CalibrationMap(self, path, calibrationmap, **kwargs):
with h5py.File(path, "w") as file_obj:
group = file_obj.create_group(self._CalibrationMap_group)
group.attrs.create("hash", np.string_(calibrationmap.hash))
group.attrs.create("model_id", np.string_(calibrationmap.model_id))
group.attrs.create(
"rate_estimation", np.string_(calibrationmap.rate_estimation)
)
# store loglike_cdf interpolation object
dataset = group.create_dataset(
"interp_loglike_cdf", data=calibrationmap._interp_loglike_cdf
)
dataset.attrs.create("num_points", len(calibrationmap._ranks))
dataset.attrs.create(
"num_quantiles", int(round(1.0 / calibrationmap._dq, 0))
)
# store clean segments if applicable
if calibrationmap._clean_segs is not None:
_segs2dataset(
group,
calibrationmap.start,
calibrationmap.end,
calibrationmap._clean_segs,
seg_name="clean_segments",
)
# store individual interpolation objects
for name, dataset, kde in [
("gch", calibrationmap._gch_dataset, calibrationmap._gch_kde),
("cln", calibrationmap._cln_dataset, calibrationmap._cln_kde),
]:
g = group.create_group(name)
# store the datasets
_dataset2dataset(g, dataset)
if dataset.segs is not None:
_segs2dataset(g, dataset.start, dataset.end, dataset.segs)
# store the processed KDEs
data = np.array(
list(
zip(
kde._interp_x,
kde._interp_logpdf,
kde._interp_logpdf_m1[:, 0],
kde._interp_logpdf_m2[:, 0],
kde._interp_cdf,
kde._interp_cdf_m1[:, 0],
kde._interp_cdf_m2[:, 0],
kde._interp_pdf_alpha,
kde._interp_pdf_beta,
kde._interp_cdf_alpha,
kde._interp_cdf_beta,
)
),
dtype=self._interp_dtype,
)
dataset = g.create_dataset("interp", data=data)
dataset.attrs.create("b", kde.b)
dataset.attrs.create("num_points", kde._num_points)
dataset.attrs.create("count", kde._count)
@classmethod
def read(cls, path):
with h5py.File(path, "r") as file_obj:
if cls._DiscreteCalibrationMap_group in file_obj:
return cls._read_DiscreteCalibrationMap(
file_obj[cls._DiscreteCalibrationMap_group]
)
elif cls._CalibrationMap_group in file_obj:
return cls._read_CalibrationMap(file_obj[cls._CalibrationMap_group])
else:
raise ValueError(
"could not find %s or %s in %s"
% (
cls._DiscreteCalibrationMap_group,
cls._CalibrationMap_group,
path,
)
)
    @classmethod
    def _read_DiscreteCalibrationMap(cls, group):
loglike = group["loglike_cdf"]
# pull out datasets
gch = _h5group2dataset(group["gch"])
cln = _h5group2dataset(group["cln"])
# read in clean segments
if "clean_segments" in group:
clean_segs = segmentlist([])
for start_sec, start_ns, end_sec, end_ns in group["clean_segments"][...]:
clean_segs.append(
segment(
utils.sec_ns2float(start_sec, start_ns),
utils.sec_ns2float(end_sec, end_ns),
)
)
else:
clean_segs = None
# extract model ID
# NOTE: older calibration maps don't have the model_id attribute,
# which is needed to determine which model was used in samples
# in this calibration map. this still allows data discoverability
try:
model_id = group.attrs["model_id"].decode("utf-8")
except KeyError:
model_id = None
# set up the calibration map's provenance
dummy_dataset = features.Dataset()
dummy_dataset.start = min(gch.start, cln.start)
dummy_dataset.end = max(gch.end, cln.end)
dummy_dataset.segs = gch.segs | cln.segs
calibrationmap = calibration.DiscreteCalibrationMap(
dummy_dataset,
num_quantiles=loglike.attrs["num_quantiles"],
compute=False,
model_id=model_id,
clean_segs=clean_segs,
rate_estimation=group.attrs["rate_estimation"].decode("utf-8"),
)
calibrationmap.hash = group.attrs["hash"].decode("utf-8")
# fill in datasets
calibrationmap._gch_dataset = gch
calibrationmap._cln_dataset = cln
# fill in observations for each kde, which will
# re-compute everything needed therein (pretty cheap)
calibrationmap._gch_kde.add_observations(group["gch/observations"][...])
calibrationmap._cln_kde.add_observations(group["cln/observations"][...])
# fill in interpolation objects for loglike_quantiles
# just copy the answer straight out of the hdf5 file
        calibrationmap._loglike_cdf = dict(
            zip(loglike.attrs["ranks"], loglike[...])
        )
# compute the rates used for the prior odds and such
calibrationmap.compute_rates()
return calibrationmap
@classmethod
def _read_CalibrationMap(cls, group):
interp_loglike = group["interp_loglike_cdf"]
# pull out datasets
gch = _h5group2dataset(group["gch"])
cln = _h5group2dataset(group["cln"])
# pull out interpolated things
gch_interp = group["gch/interp"]
cln_interp = group["cln/interp"]
# read in clean segments
if "clean_segments" in group:
clean_segs = segmentlist([])
for start_sec, start_ns, end_sec, end_ns in group["clean_segments"][...]:
clean_segs.append(
segment(
utils.sec_ns2float(start_sec, start_ns),
utils.sec_ns2float(end_sec, end_ns),
)
)
else:
clean_segs = None
# extract model ID
# NOTE: older calibration maps don't have the model_id attribute,
# which is needed to determine which model was used in samples
# in this calibration map. this still allows data discoverability
try:
model_id = group.attrs["model_id"].decode("utf-8")
except KeyError:
model_id = None
# set up the calibration map's provenance
start = min(gch.start, cln.start)
end = max(gch.end, cln.end)
segs = gch.segs | cln.segs
dummy_dataset = features.Dataset([], start=start, end=end, segs=segs)
calibrationmap = calibration.CalibrationMap(
dummy_dataset,
num_quantiles=interp_loglike.attrs["num_quantiles"],
num_points=interp_loglike.attrs["num_points"],
gch_num_points=gch_interp.attrs["num_points"],
gch_b=gch_interp.attrs["b"],
cln_num_points=cln_interp.attrs["num_points"],
cln_b=cln_interp.attrs["b"],
compute=False,
clean_segs=clean_segs,
model_id=model_id,
rate_estimation=group.attrs["rate_estimation"].decode("utf-8"),
)
calibrationmap.hash = group.attrs["hash"].decode("utf-8")
# fill in datasets
calibrationmap._gch_dataset = gch
calibrationmap._cln_dataset = cln
# fill in interpolation objects for loglike_quantiles
calibrationmap._interp_loglike_cdf = interp_loglike[...]
# fill in recorded interpolation objects
for kde, dataset, obj in [
(calibrationmap._gch_kde, gch, gch_interp),
(calibrationmap._cln_kde, cln, cln_interp),
]:
# high-level stuff
kde._obs = dataset.ranks
kde._count = obj.attrs["count"]
# interpolation stuff
kde._interp_x[:] = obj["x"]
kde._interp_logpdf[:] = obj["logpdf"]
kde._interp_logpdf_m1[:, 0] = obj["logpdf_m1"]
kde._interp_logpdf_m2[:, 0] = obj["logpdf_m2"]
kde._interp_cdf = obj["cdf"]
kde._interp_cdf_m1[:, 0] = obj["cdf_m1"]
kde._interp_cdf_m2[:, 0] = obj["cdf_m2"]
kde._interp_pdf_alpha[:] = obj["pdf_alpha"]
kde._interp_pdf_beta[:] = obj["pdf_beta"]
kde._interp_cdf_alpha[:] = obj["cdf_alpha"]
kde._interp_cdf_beta[:] = obj["cdf_beta"]
# compute the rates used for the prior odds and such
calibrationmap.compute_rates()
return calibrationmap
HDF5SERIES_REPORTER_DATASET_TEMPLATE = "%d_%d"
class HDF5SeriesReporter(HDF5Reporter):
"""
store series information in hdf5 files
"""
_T0_NAME = "t0"
_DELTAT_NAME = "deltaT"
_MODEL_NAME = "idq_model"
_CALIB_NAME = "idq_calib"
def _write(self, path, all_series, run=0, **kwargs):
if isinstance(all_series, SeriesInfo):
all_series = [all_series]
with h5py.File(path, "w") as file_obj:
group = file_obj.create_group("series")
for series in all_series:
dataset_name = HDF5SERIES_REPORTER_DATASET_TEMPLATE % (
int(series.t0),
int(series.dt),
)
data = np.array(
list(zip(*(series.values()))),
dtype=[(key, "float") for key in series.keys()],
)
dataset = group.create_dataset(dataset_name, data=data)
dataset.attrs.create(self._T0_NAME, series.t0)
dataset.attrs.create(self._DELTAT_NAME, series.dt)
# record the associated model hash
dataset.attrs.create(self._MODEL_NAME, np.string_(series.model_id))
# and the calibration map hash
dataset.attrs.create(
self._CALIB_NAME, np.string_(series.calibration_id)
)
dataset.attrs.create("run", run)
# NOTE: could be fragile
dataset.attrs.create(
"name", np.string_("-".join(os.path.basename(path).split("-")[:-2]))
)
# store channel info
dataset.attrs.create("detector", series.info.detector)
dataset.attrs.create("nickname", series.info.nickname)
dataset.attrs.create("f_low", series.info.f_low)
dataset.attrs.create("f_high", series.info.f_high)
for key in series.keys():
dataset.attrs.create(key + "_unit", np.string_(""))
@classmethod
def read(cls, path):
all_series = []
with h5py.File(path, "r") as file_obj:
group = file_obj["series"]
for key in group.keys():
dataset = group[key]
info = ChannelInfo(
detector=dataset.attrs["detector"],
nickname=dataset.attrs["nickname"],
f_low=dataset.attrs["f_low"],
f_high=dataset.attrs["f_high"],
)
sdict = {}
                for field in dataset.dtype.fields.keys():
metric = info.channel_to_metric_name(field)
sdict[metric] = Series(field, dataset[field])
series = SeriesInfo(
t0=dataset.attrs[cls._T0_NAME],
dt=dataset.attrs[cls._DELTAT_NAME],
info=info,
model_id=str(np.char.decode(dataset.attrs[cls._MODEL_NAME])),
calibration_id=str(np.char.decode(dataset.attrs[cls._CALIB_NAME])),
**sdict,
)
all_series.append(series)
return all_series
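
# series datasets are keyed by HDF5SERIES_REPORTER_DATASET_TEMPLATE, i.e.
# "%d_%d" % (t0, dt), so a stretch starting at GPS 1187008882 with 1-second
# steps lands in "series/1187008882_1", with one named column per series field
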
@hookimpl
def get_reporters():
return {
"hdf5": HDF5Reporter,
"span": GPSTimesReporter,
"span:hdf5": GPSTimesReporter,
"series": HDF5SeriesReporter,
"series:hdf5": HDF5SeriesReporter,
"segment": HDF5SegmentReporter,
"segment:hdf5": HDF5SegmentReporter,
"dataset": DatasetReporter,
"dataset:hdf5": DatasetReporter,
"calib": CalibrationMapReporter,
"calib:hdf5": CalibrationMapReporter,
}
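
# the mapping above is what lets configuration select a reporter by nickname;
# both the bare name and the ":hdf5"-qualified form resolve to the same class,
# e.g. (a hypothetical direct lookup):
#
#     reporters = get_reporters()
#     reporter_class = reporters["segment:hdf5"]  # -> HDF5SegmentReporter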