"""idq.io.reporters.hdf5"""

import os

import h5py
import numpy as np

from ligo.segments import segment, segmentlist

from ... import hookimpl
from ... import utils
from ... import features
from ... import calibration
from ...series import ChannelInfo, Series, SeriesInfo
from . import DiskReporter


class HDF5Reporter(DiskReporter):
    """
    a variant of DiskReporter that expects objects to fit into a tabular
    data format and writes that to disk as hdf5
    """

    _suffix = "hdf5"

    def _write(self, path, data, **attrs):
        with h5py.File(path, "w") as file_obj:
            dataset = file_obj.create_dataset("data", data=data)
            for key, value in attrs.items():
                if isinstance(value, str):
                    value = np.string_(value)
                dataset.attrs.create(key, value)

    @classmethod
    def read(cls, path):
        with h5py.File(path, "r") as file_obj:
            data = file_obj["data"][...]
            attrs = dict(file_obj["data"].attrs.items())
        return (data, attrs)
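

# Illustrative sketch, not part of the iDQ API: a file in the flat layout
# HDF5Reporter._write produces is a single "data" dataset plus attributes
# (strings coerced to np.string_), so one can be written by hand with h5py
# and recovered with the classmethod above.  The path, field names, and
# values here are hypothetical.
def _example_hdf5_roundtrip(path="/tmp/example.hdf5"):
    data = np.zeros(4, dtype=[("gps", "float"), ("rank", "float")])
    with h5py.File(path, "w") as file_obj:
        dataset = file_obj.create_dataset("data", data=data)
        dataset.attrs.create("name", np.string_("example"))
    # -> (structured array, {"name": b"example"})
    return HDF5Reporter.read(path)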


class HDF5SegmentReporter(HDF5Reporter):
    """
    a variant of HDF5 reporter that expects to only deal with segment lists
    """

    _dtypes = [
        ("start_sec", ">i4"),
        ("start_ns", ">i4"),
        ("end_sec", ">i4"),
        ("end_ns", ">i4"),
    ]

    def _write(self, path, segments, **kwargs):
        h5segs = []
        for s, e in segments:
            start_sec, start_ns = utils.float2sec_ns(s)
            end_sec, end_ns = utils.float2sec_ns(e)
            h5segs.append((start_sec, start_ns, end_sec, end_ns))
        start_sec, start_ns = utils.float2sec_ns(self.start)
        end_sec, end_ns = utils.float2sec_ns(self.end)
        return HDF5Reporter._write(
            self,
            path,
            h5segs,
            start_sec=start_sec,
            start_ns=start_ns,
            end_sec=end_sec,
            end_ns=end_ns,
            **kwargs,
        )

    @classmethod
    def read(cls, path):
        h5segs = segmentlist([])
        data, _ = super().read(path)
        for start_sec, start_ns, end_sec, end_ns in data:
            h5segs.append(
                segment(
                    utils.sec_ns2float(start_sec, start_ns),
                    utils.sec_ns2float(end_sec, end_ns),
                )
            )
        return h5segs
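

# Illustrative sketch, not part of the iDQ API: segments are serialized as
# integer (start_sec, start_ns, end_sec, end_ns) rows, so a hand-written
# file in that layout reads back as a ligo.segments.segmentlist.  The path
# and GPS values are hypothetical.
def _example_segment_read(path="/tmp/example-segments.hdf5"):
    rows = np.array(
        [(1000000000, 0, 1000000010, 500000000)],  # [1000000000.0, 1000000010.5)
        dtype=HDF5SegmentReporter._dtypes,
    )
    with h5py.File(path, "w") as file_obj:
        file_obj.create_dataset("data", data=rows)
    return HDF5SegmentReporter.read(path)  # -> segmentlist with one segment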


class GPSTimesReporter(HDF5Reporter):
    """
    a variant of HDF5 reporter that expects to only deal with lists of
    gps times
    """

    _dtypes = [("gps_sec", ">i4"), ("gps_ns", ">i4")]

    def _write(self, path, times, **kwargs):
        data = np.array(list(zip(*utils.float2sec_ns(times))), dtype=self._dtypes)
        start_sec, start_ns = utils.float2sec_ns(self.start)
        end_sec, end_ns = utils.float2sec_ns(self.end)
        return HDF5Reporter._write(
            self,
            path,
            data,
            start_sec=start_sec,
            start_ns=start_ns,
            end_sec=end_sec,
            end_ns=end_ns,
            **kwargs,
        )

    @classmethod
    def read(cls, path):
        data, attrs = super().read(path)
        return (
            utils.sec_ns2float(data["gps_sec"], data["gps_ns"]),
            utils.sec_ns2float(attrs["start_sec"], attrs["start_ns"]),
            utils.sec_ns2float(attrs["end_sec"], attrs["end_ns"]),
        )
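

# Illustrative sketch, not part of the iDQ API: GPS times are split into
# integer (sec, ns) pairs, and the reporter's own [start, end) span is kept
# in the dataset attributes, so read() returns a (times, start, end) tuple.
# The path and GPS values are hypothetical.
def _example_gps_times_read(path="/tmp/example-times.hdf5"):
    times = np.array(
        [(1000000001, 0), (1000000002, 250000000)],
        dtype=GPSTimesReporter._dtypes,
    )
    with h5py.File(path, "w") as file_obj:
        dataset = file_obj.create_dataset("data", data=times)
        for key, value in [
            ("start_sec", 1000000000),
            ("start_ns", 0),
            ("end_sec", 1000000010),
            ("end_ns", 0),
        ]:
            dataset.attrs.create(key, value)
    return GPSTimesReporter.read(path)  # -> (times, start, end)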


DATASET_HDF5_DTYPE = [
    ("gps", "float"),
    ("rank", "float"),
    ("label", "float"),
    ("hash", "S30"),
]
DATASET_HDF5_NAME = "dataset"
OLD_DATASET_HDF5_NAME = "quiver"


def _dataset2dataset(h5group, dataset):
    """a helper function to allow us to re-use dataset <--> hdf5 logic"""
    data = []
    for gps, rank, label, hash_ in zip(
        dataset.times, dataset.ranks, dataset.labels, dataset.hashes
    ):
        data.append((gps, rank, label, str(hash_)))
    dataset = h5group.create_dataset(
        DATASET_HDF5_NAME, data=np.array(data, dtype=DATASET_HDF5_DTYPE)
    )


DATASET_HDF5_SEG_DTYPE = [
    ("start_sec", ">i4"),
    ("start_ns", ">i4"),
    ("end_sec", ">i4"),
    ("end_ns", ">i4"),
]
DATASET_HDF5_SEG_NAME = "dataset_segments"
OLD_DATASET_HDF5_SEG_NAME = "quiver_segments"


def _segs2dataset(h5group, start, end, segs, seg_name=DATASET_HDF5_SEG_NAME):
    """a helper function to allow us to re-use dataset <--> hdf5 logic"""
    h5segs = []
    for s, e in segs:
        start_sec, start_ns = utils.float2sec_ns(s)
        end_sec, end_ns = utils.float2sec_ns(e)
        h5segs.append((start_sec, start_ns, end_sec, end_ns))
    dataset = h5group.create_dataset(
        seg_name, data=np.array(h5segs, dtype=DATASET_HDF5_SEG_DTYPE)
    )

    # attrs
    start_sec, start_ns = utils.float2sec_ns(start)
    end_sec, end_ns = utils.float2sec_ns(end)
    for key, value in [
        ("start_sec", start_sec),
        ("start_ns", start_ns),
        ("end_sec", end_sec),
        ("end_ns", end_ns),
    ]:
        dataset.attrs.create(key, value)


def _h5group2dataset(h5group):
    """a helper function to allow us to re-use dataset <--> hdf5 logic"""
    # backwards compatibility for older (pre v0.5) datasets
    if DATASET_HDF5_NAME in h5group:
        dataset_group = DATASET_HDF5_NAME
    else:
        dataset_group = OLD_DATASET_HDF5_NAME
    if DATASET_HDF5_SEG_NAME in h5group:
        dataset_seg_group = DATASET_HDF5_SEG_NAME
    else:
        dataset_seg_group = OLD_DATASET_HDF5_SEG_NAME

    attrs = dict(h5group[dataset_seg_group].attrs.items())
    start = utils.sec_ns2float(attrs["start_sec"], attrs["start_ns"])
    end = utils.sec_ns2float(attrs["end_sec"], attrs["end_ns"])

    segs = segmentlist([])
    for start_sec, start_ns, end_sec, end_ns in h5group[dataset_seg_group][...]:
        segs.append(
            segment(
                utils.sec_ns2float(start_sec, start_ns),
                utils.sec_ns2float(end_sec, end_ns),
            )
        )

    times, ranks, labels, hashes = [], [], [], []
    for row in h5group[dataset_group][...]:
        # add in the FeatureVector data
        times.append(row["gps"])
        labels.append(row["label"])
        ranks.append(None if row["rank"] != row["rank"] else row["rank"])
        hashes.append(None if row["hash"] == "None" else row["hash"])

    return features.Dataset(
        times,
        ranks=ranks,
        labels=labels,
        hashes=hashes,
        start=start,
        end=end,
        segs=segs,
    )
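

# Illustrative sketch, not part of the iDQ API: _segs2dataset stores both the
# segment rows and the overall [start, end) span (as sec/ns attributes) under
# a single dataset.  The path and GPS values are hypothetical.
def _example_segs_layout(path="/tmp/example-dataset-segs.hdf5"):
    segs = segmentlist([segment(1000000000.0, 1000000100.0)])
    with h5py.File(path, "w") as file_obj:
        _segs2dataset(file_obj, 1000000000.0, 1000000100.0, segs)
    with h5py.File(path, "r") as file_obj:
        dataset = file_obj[DATASET_HDF5_SEG_NAME]
        return dataset[...], dict(dataset.attrs.items())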


class DatasetReporter(HDF5Reporter):
    """
    store high-level attributes from a dataset
    """

    def _nickname2segsname(self, nickname):
        return nickname + "_segments"

    def _write(self, path, dataset, **kwargs):
        with h5py.File(path, "w") as file_obj:
            _dataset2dataset(file_obj, dataset)
            if dataset.segs is not None:
                _segs2dataset(file_obj, dataset.start, dataset.end, dataset.segs)

    @classmethod
    def read(cls, path):
        with h5py.File(path, "r") as file_obj:
            dataset = _h5group2dataset(file_obj)
        return dataset
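

# Illustrative sketch, not part of the iDQ API: DatasetReporter persists a
# features.Dataset through the module helpers above; the constructor call
# mirrors the one in _h5group2dataset, and the attribute names (times, ranks,
# labels, hashes, segs) are assumed from their use in this module.  The path
# and values are hypothetical.
def _example_dataset_roundtrip(path="/tmp/example-dataset.hdf5"):
    dataset = features.Dataset(
        [1000000001.0, 1000000002.0],
        ranks=[0.1, 0.9],
        labels=[0.0, 1.0],
        hashes=["a", "b"],
        start=1000000000.0,
        end=1000000010.0,
        segs=segmentlist([segment(1000000000.0, 1000000010.0)]),
    )
    with h5py.File(path, "w") as file_obj:
        _dataset2dataset(file_obj, dataset)
        _segs2dataset(file_obj, dataset.start, dataset.end, dataset.segs)
    return DatasetReporter.read(path)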


class CalibrationMapReporter(HDF5Reporter):
    """
    store high-level information from a calibration map

    based on HDF5Reporter but extends this to use a more complicated
    structure under the hood
    """

    _samples_dtype = [("sample", "float")]
    _interp_dtype = [
        ("x", "float"),
        ("logpdf", "float"),
        ("logpdf_m1", "float"),
        ("logpdf_m2", "float"),
        ("cdf", "float"),
        ("cdf_m1", "float"),
        ("cdf_m2", "float"),
        ("pdf_alpha", "float"),
        ("pdf_beta", "float"),
        ("cdf_alpha", "float"),
        ("cdf_beta", "float"),
    ]
    _DiscreteCalibrationMap_group = "DiscreteCalibrationMap"
    _CalibrationMap_group = "CalibrationMap"

    def _write(self, path, calibrationmap, **kwargs):
        if isinstance(calibrationmap, calibration.DiscreteCalibrationMap):
            return self._write_DiscreteCalibrationMap(path, calibrationmap, **kwargs)
        elif isinstance(calibrationmap, calibration.CalibrationMap):
            return self._write_CalibrationMap(path, calibrationmap, **kwargs)
        else:
            raise ValueError(
                "calibrationmap must be an instance of either "
                "calibration.DiscreteCalibrationMap or calibration.CalibrationMap"
            )

    def _write_DiscreteCalibrationMap(self, path, calibrationmap, **kwargs):
        with h5py.File(path, "w") as file_obj:
            group = file_obj.create_group(self._DiscreteCalibrationMap_group)
            group.attrs.create("hash", np.string_(calibrationmap.hash))
            group.attrs.create("model_id", np.string_(calibrationmap.model_id))
            group.attrs.create(
                "rate_estimation", np.string_(calibrationmap.rate_estimation)
            )

            ranks = calibrationmap.ranks
            dataset = group.create_dataset(
                "loglike_cdf",
                data=np.array([calibrationmap._loglike_cdf[rank] for rank in ranks]),
            )
            dataset.attrs.create("ranks", ranks)
            dataset.attrs.create(
                "num_quantiles", int(round(1.0 / calibrationmap._dq, 0))
            )

            # store clean segments if applicable
            if calibrationmap._clean_segs is not None:
                _segs2dataset(
                    group,
                    calibrationmap.start,
                    calibrationmap.end,
                    calibrationmap._clean_segs,
                    seg_name="clean_segments",
                )

            for name, dataset, kde in [
                ("gch", calibrationmap._gch_dataset, calibrationmap._gch_kde),
                ("cln", calibrationmap._cln_dataset, calibrationmap._cln_kde),
            ]:
                g = group.create_group(name)

                # store the dataset
                _dataset2dataset(g, dataset)
                if dataset.segs is not None:
                    _segs2dataset(g, dataset.start, dataset.end, dataset.segs)

                # store the discrete KDEs
                dataset = g.create_dataset("observations", data=kde.obs)

    def _write_CalibrationMap(self, path, calibrationmap, **kwargs):
        with h5py.File(path, "w") as file_obj:
            group = file_obj.create_group(self._CalibrationMap_group)
            group.attrs.create("hash", np.string_(calibrationmap.hash))
            group.attrs.create("model_id", np.string_(calibrationmap.model_id))
            group.attrs.create(
                "rate_estimation", np.string_(calibrationmap.rate_estimation)
            )

            # store loglike_cdf interpolation object
            dataset = group.create_dataset(
                "interp_loglike_cdf", data=calibrationmap._interp_loglike_cdf
            )
            dataset.attrs.create("num_points", len(calibrationmap._ranks))
            dataset.attrs.create(
                "num_quantiles", int(round(1.0 / calibrationmap._dq, 0))
            )

            # store clean segments if applicable
            if calibrationmap._clean_segs is not None:
                _segs2dataset(
                    group,
                    calibrationmap.start,
                    calibrationmap.end,
                    calibrationmap._clean_segs,
                    seg_name="clean_segments",
                )

            # store individual interpolation objects
            for name, dataset, kde in [
                ("gch", calibrationmap._gch_dataset, calibrationmap._gch_kde),
                ("cln", calibrationmap._cln_dataset, calibrationmap._cln_kde),
            ]:
                g = group.create_group(name)

                # store the datasets
                _dataset2dataset(g, dataset)
                if dataset.segs is not None:
                    _segs2dataset(g, dataset.start, dataset.end, dataset.segs)

                # store the processed KDEs
                data = np.array(
                    list(
                        zip(
                            kde._interp_x,
                            kde._interp_logpdf,
                            kde._interp_logpdf_m1[:, 0],
                            kde._interp_logpdf_m2[:, 0],
                            kde._interp_cdf,
                            kde._interp_cdf_m1[:, 0],
                            kde._interp_cdf_m2[:, 0],
                            kde._interp_pdf_alpha,
                            kde._interp_pdf_beta,
                            kde._interp_cdf_alpha,
                            kde._interp_cdf_beta,
                        )
                    ),
                    dtype=self._interp_dtype,
                )
                dataset = g.create_dataset("interp", data=data)
                dataset.attrs.create("b", kde.b)
                dataset.attrs.create("num_points", kde._num_points)
                dataset.attrs.create("count", kde._count)

    @classmethod
    def read(cls, path):
        with h5py.File(path, "r") as file_obj:
            if cls._DiscreteCalibrationMap_group in file_obj:
                return cls._read_DiscreteCalibrationMap(
                    file_obj[cls._DiscreteCalibrationMap_group]
                )
            elif cls._CalibrationMap_group in file_obj:
                return cls._read_CalibrationMap(file_obj[cls._CalibrationMap_group])
            else:
                raise ValueError(
                    "could not find %s or %s in %s"
                    % (
                        cls._DiscreteCalibrationMap_group,
                        cls._CalibrationMap_group,
                        path,
                    )
                )

    @classmethod
    def _read_DiscreteCalibrationMap(cls, group):
        loglike = group["loglike_cdf"]

        # pull out datasets
        gch = _h5group2dataset(group["gch"])
        cln = _h5group2dataset(group["cln"])

        # read in clean segments
        if "clean_segments" in group:
            clean_segs = segmentlist([])
            for start_sec, start_ns, end_sec, end_ns in group["clean_segments"][...]:
                clean_segs.append(
                    segment(
                        utils.sec_ns2float(start_sec, start_ns),
                        utils.sec_ns2float(end_sec, end_ns),
                    )
                )
        else:
            clean_segs = None

        # extract model ID
        # NOTE: older calibration maps don't have the model_id attribute,
        # which is needed to determine which model was used in samples
        # in this calibration map. this still allows data discoverability
        try:
            model_id = group.attrs["model_id"].decode("utf-8")
        except KeyError:
            model_id = None

        # set up the calibration map's provenance
        dummy_dataset = features.Dataset()
        dummy_dataset.start = min(gch.start, cln.start)
        dummy_dataset.end = max(gch.end, cln.end)
        dummy_dataset.segs = gch.segs | cln.segs

        calibrationmap = calibration.DiscreteCalibrationMap(
            dummy_dataset,
            num_quantiles=loglike.attrs["num_quantiles"],
            compute=False,
            model_id=model_id,
            clean_segs=clean_segs,
            rate_estimation=group.attrs["rate_estimation"].decode("utf-8"),
        )
        calibrationmap.hash = group.attrs["hash"].decode("utf-8")

        # fill in datasets
        calibrationmap._gch_dataset = gch
        calibrationmap._cln_dataset = cln

        # fill in observations for each kde, which will
        # re-compute everything needed therein (pretty cheap)
        calibrationmap._gch_kde.add_observations(group["gch/observations"][...])
        calibrationmap._cln_kde.add_observations(group["cln/observations"][...])

        # fill in interpolation objects for loglike_quantiles
        # just copy the answer straight out of the hdf5 file
        calibrationmap._loglike_cdf = dict(
            (rank, thing)
            for rank, thing in zip(loglike.attrs["ranks"], loglike[...])
        )

        # compute the rates used for the prior odds and such
        calibrationmap.compute_rates()

        return calibrationmap

    @classmethod
    def _read_CalibrationMap(cls, group):
        interp_loglike = group["interp_loglike_cdf"]

        # pull out datasets
        gch = _h5group2dataset(group["gch"])
        cln = _h5group2dataset(group["cln"])

        # pull out interpolated things
        gch_interp = group["gch/interp"]
        cln_interp = group["cln/interp"]

        # read in clean segments
        if "clean_segments" in group:
            clean_segs = segmentlist([])
            for start_sec, start_ns, end_sec, end_ns in group["clean_segments"][...]:
                clean_segs.append(
                    segment(
                        utils.sec_ns2float(start_sec, start_ns),
                        utils.sec_ns2float(end_sec, end_ns),
                    )
                )
        else:
            clean_segs = None

        # extract model ID
        # NOTE: older calibration maps don't have the model_id attribute,
        # which is needed to determine which model was used in samples
        # in this calibration map. this still allows data discoverability
        try:
            model_id = group.attrs["model_id"].decode("utf-8")
        except KeyError:
            model_id = None

        # set up the calibration map's provenance
        start = min(gch.start, cln.start)
        end = max(gch.end, cln.end)
        segs = gch.segs | cln.segs
        dummy_dataset = features.Dataset([], start=start, end=end, segs=segs)

        calibrationmap = calibration.CalibrationMap(
            dummy_dataset,
            num_quantiles=interp_loglike.attrs["num_quantiles"],
            num_points=interp_loglike.attrs["num_points"],
            gch_num_points=gch_interp.attrs["num_points"],
            gch_b=gch_interp.attrs["b"],
            cln_num_points=cln_interp.attrs["num_points"],
            cln_b=cln_interp.attrs["b"],
            compute=False,
            clean_segs=clean_segs,
            model_id=model_id,
            rate_estimation=group.attrs["rate_estimation"].decode("utf-8"),
        )
        calibrationmap.hash = group.attrs["hash"].decode("utf-8")

        # fill in datasets
        calibrationmap._gch_dataset = gch
        calibrationmap._cln_dataset = cln

        # fill in interpolation objects for loglike_quantiles
        calibrationmap._interp_loglike_cdf = interp_loglike[...]

        # fill in recorded interpolation objects
        for kde, dataset, obj in [
            (calibrationmap._gch_kde, gch, gch_interp),
            (calibrationmap._cln_kde, cln, cln_interp),
        ]:
            # high-level stuff
            kde._obs = dataset.ranks
            kde._count = obj.attrs["count"]

            # interpolation stuff
            kde._interp_x[:] = obj["x"]
            kde._interp_logpdf[:] = obj["logpdf"]
            kde._interp_logpdf_m1[:, 0] = obj["logpdf_m1"]
            kde._interp_logpdf_m2[:, 0] = obj["logpdf_m2"]
            kde._interp_cdf = obj["cdf"]
            kde._interp_cdf_m1[:, 0] = obj["cdf_m1"]
            kde._interp_cdf_m2[:, 0] = obj["cdf_m2"]
            kde._interp_pdf_alpha[:] = obj["pdf_alpha"]
            kde._interp_pdf_beta[:] = obj["pdf_beta"]
            kde._interp_cdf_alpha[:] = obj["cdf_alpha"]
            kde._interp_cdf_beta[:] = obj["cdf_beta"]

        # compute the rates used for the prior odds and such
        calibrationmap.compute_rates()

        return calibrationmap
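

# Illustrative sketch, not part of the iDQ API: read() dispatches on which
# top-level group name is present in the file, so callers use the same entry
# point for both map flavors.  The path is hypothetical.
def _example_calibration_read(path="/tmp/example-calib.hdf5"):
    # returns either a calibration.DiscreteCalibrationMap or a
    # calibration.CalibrationMap, depending on the stored group
    return CalibrationMapReporter.read(path)

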
HDF5SERIES_REPORTER_DATASET_TEMPLATE = "%d_%d"


class HDF5SeriesReporter(HDF5Reporter):
    """
    store series information in hdf5 files
    """

    _T0_NAME = "t0"
    _DELTAT_NAME = "deltaT"
    _MODEL_NAME = "idq_model"
    _CALIB_NAME = "idq_calib"

    def _write(self, path, all_series, run=0, **kwargs):
        if isinstance(all_series, SeriesInfo):
            all_series = [all_series]
        with h5py.File(path, "w") as file_obj:
            group = file_obj.create_group("series")
            for series in all_series:
                dataset_name = HDF5SERIES_REPORTER_DATASET_TEMPLATE % (
                    int(series.t0),
                    int(series.dt),
                )
                data = np.array(
                    list(zip(*(series.values()))),
                    dtype=[(key, "float") for key in series.keys()],
                )
                dataset = group.create_dataset(dataset_name, data=data)
                dataset.attrs.create(self._T0_NAME, series.t0)
                dataset.attrs.create(self._DELTAT_NAME, series.dt)

                # record the associated model hash
                dataset.attrs.create(self._MODEL_NAME, np.string_(series.model_id))

                # and the calibration map hash
                dataset.attrs.create(
                    self._CALIB_NAME, np.string_(series.calibration_id)
                )
                dataset.attrs.create("run", run)

                # NOTE: could be fragile
                dataset.attrs.create(
                    "name",
                    np.string_("-".join(os.path.basename(path).split("-")[:-2])),
                )

                # store channel info
                dataset.attrs.create("detector", series.info.detector)
                dataset.attrs.create("nickname", series.info.nickname)
                dataset.attrs.create("f_low", series.info.f_low)
                dataset.attrs.create("f_high", series.info.f_high)
                for key in series.keys():
                    dataset.attrs.create(key + "_unit", np.string_(""))

    @classmethod
    def read(cls, path):
        all_series = []
        with h5py.File(path, "r") as file_obj:
            group = file_obj["series"]
            for key in group.keys():
                dataset = group[key]
                info = ChannelInfo(
                    detector=dataset.attrs["detector"],
                    nickname=dataset.attrs["nickname"],
                    f_low=dataset.attrs["f_low"],
                    f_high=dataset.attrs["f_high"],
                )
                sdict = {}
                for field in dataset[...].dtype.fields.keys():
                    metric = info.channel_to_metric_name(field)
                    sdict[metric] = Series(field, dataset[field])
                series = SeriesInfo(
                    t0=dataset.attrs[cls._T0_NAME],
                    dt=dataset.attrs[cls._DELTAT_NAME],
                    info=info,
                    model_id=str(np.char.decode(dataset.attrs[cls._MODEL_NAME])),
                    calibration_id=str(
                        np.char.decode(dataset.attrs[cls._CALIB_NAME])
                    ),
                    **sdict,
                )
                all_series.append(series)
        return all_series
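

# Illustrative sketch, not part of the iDQ API: each "series/<t0>_<dt>"
# dataset is one structured array of per-metric columns, and read() rebuilds
# one SeriesInfo per dataset.  The path is hypothetical; the attributes
# accessed below are the same ones _write records.
def _example_print_series(path="/tmp/H1-idq-series-1000000000-100.hdf5"):
    for series in HDF5SeriesReporter.read(path):
        print(series.t0, series.dt, series.info.detector, series.info.nickname)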


@hookimpl
def get_reporters():
    return {
        "hdf5": HDF5Reporter,
        "span": GPSTimesReporter,
        "span:hdf5": GPSTimesReporter,
        "series": HDF5SeriesReporter,
        "series:hdf5": HDF5SeriesReporter,
        "segment": HDF5SegmentReporter,
        "segment:hdf5": HDF5SegmentReporter,
        "dataset": DatasetReporter,
        "dataset:hdf5": DatasetReporter,
        "calib": CalibrationMapReporter,
        "calib:hdf5": CalibrationMapReporter,
    }
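

# Illustrative sketch, not part of the iDQ API: iDQ's plugin machinery calls
# this hook and resolves reporter classes by name from the returned mapping,
# so both the short and ":hdf5"-qualified names select the same class.
if __name__ == "__main__":
    reporter_cls = get_reporters()["segment:hdf5"]
    print(reporter_cls)  # -> HDF5SegmentReporter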