Source code for idq.io.reporters.gwf

import os

import numpy as np

import gpstime
from LDAStools import frameCPP as frcpp

from ... import hookimpl
from ... import utils
from ...series import ChannelInfo, Series, SeriesInfo
from . import DiskReporter


class GWFSeriesReporter(DiskReporter):
    """
    store series information in gwf files

    NOTE: GWFSeriesReporter currently only supports a single contiguous segment
    and will raise an exception if it finds more than one. This is most likely
    the most common use case (e.g.: streaming timeseries ignoring segdb), so
    it's probably fine
    """

    _suffix = "gwf"

    # names of the FrHistory entries that carry the provenance information
    _MODEL_NAME = "IDQ_MODEL"
    _CALIB_NAME = "IDQ_CALIB"

    def _write(self, path, series, run=0, **kwargs):
        """
        write a single contiguous series to ``path`` as one frame

        Every ``(channel, data)`` pair from ``series.items()`` is stored as an
        8-byte-real FrVect wrapped in its own FrProcData. ``series.model_id``
        and ``series.calibration_id`` are recorded as FrHistory entries named
        ``_MODEL_NAME`` and ``_CALIB_NAME``.

        ``series`` may be the series object itself or a list holding exactly
        one such object. ``run`` is passed to ``FrameH.SetRun``. Extra keyword
        arguments are accepted and ignored.

        Raises AssertionError if ``series`` is a list that does not contain
        exactly one element.
        """
        if isinstance(series, list):
            # NOTE: raised explicitly rather than through an ``assert`` statement so
            # the check is not stripped under "python -O", which would silently
            # write only the first segment. Exception type and message are unchanged.
            if len(series) != 1:
                raise AssertionError(
                    "GWFSeriesReporter only supports a single contiguous segment of data"
                )
            series = series[0]

        nsamples = len(series)
        duration = series.dt * nsamples

        # make the frame
        frame = frcpp.FrameH()
        # NOTE: SetName could be fragile
        # the name is the basename of path with its last two "-"-separated
        # fields dropped
        frame.SetName("-".join(os.path.basename(path).split("-")[:-2]))
        frame.SetGTime(
            frcpp.GPSTime(*(int(part) for part in utils.float2sec_ns(series.t0)))
        )
        frame.SetDt(duration)
        frame.SetRun(run)

        # add provenance information
        current_gps = int(gpstime.gpsnow())
        for name, comment in (
            (self._MODEL_NAME, series.model_id),
            (self._CALIB_NAME, series.calibration_id),
        ):
            frame.AppendFrHistory(frcpp.FrHistory(name, current_gps, comment))

        # add data
        for channel, data in series.items():
            # make a vector
            frvect = frcpp.FrVect(
                channel,
                frcpp.FrVect.FR_VECT_8R,
                1,  # number of dimensions
                frcpp.Dimension(  # the dimension object
                    nsamples,  # number of elements
                    series.dt,  # spacing
                    "s",  # units
                    0,  # start time offset relative to the header (set above)
                ),
                "",  # dimensionless unit, cause we have probabilities and such
            )

            # actually populate the object
            frvect.GetDataArray()[:] = data

            # wrap that vector in a ProcData
            frdata = frcpp.FrProcData(
                channel,
                "",  # no comment
                frcpp.FrProcData.TIME_SERIES,  # ID as time-series
                frcpp.FrProcData.UNKNOWN_SUB_TYPE,  # empty sub-type (fseries)
                0,  # offset of first sample relative to frame start
                duration,  # duration of data
                0.0,  # heterodyne frequency
                0.0,  # phase of heterodyne
                0.0,  # frequency range
                0.0,  # resolution bandwidth
            )

            # add the vector to this ProcData
            frdata.AppendData(frvect)

            # add ProcData to FrameH
            frame.AppendFrProcData(frdata)

        # write
        frame.Write(frcpp.OFrameFStream(path))

    @classmethod
    def read(cls, path):
        """
        read the gwf file at ``path`` back into a list holding one SeriesInfo

        Proc, ADC and Sim channels listed in the table of contents are all
        read. Each becomes a ``Series`` keyed by the metric name derived from
        its channel name. The channel info and sample spacing are taken from
        the first channel encountered, and the start time from the frame
        header.

        Raises KeyError if the frame lacks an FrHistory named ``_MODEL_NAME``
        or ``_CALIB_NAME``.
        """
        stream = frcpp.IFrameFStream(path)
        frame = stream.ReadNextFrame()  # assume a single frame
        toc = stream.GetTOC()

        seriesdict = {}
        info = None
        deltaT = None

        # extract data
        for channels, fr_data in [
            (toc.GetProc(), stream.ReadFrProcData),
            (toc.GetADC(), stream.ReadFrAdcData),
            (toc.GetSim(), stream.ReadFrSimData),
        ]:
            for channel in channels:
                # magic number: one frame per stream...
                container = fr_data(0, channel)
                # magic number: one FrVect wrapper per channel
                vect = container.data[0]
                dim = vect.GetDim(0)
                raw = vect.GetDataArray()

                # NOTE: these should be the same for all channels, so we only
                # read them once
                if info is None:
                    info = ChannelInfo.from_channel(channel)
                    deltaT = dim.dx

                # copy the samples into a float array of our own
                values = np.empty_like(raw, dtype=float)
                values[:] = raw[:]

                metric = info.channel_to_metric_name(channel)
                seriesdict[metric] = Series(channel, values)

        # assume there is at least one channel
        # NOTE(review): with no channels, info and deltaT are still None when
        # handed to SeriesInfo below -- confirm how SeriesInfo copes with that
        t0 = utils.sec_ns2float(*frame.GetGTime())

        # extract histories
        histories = {
            history.GetName(): history.GetComment() for history in frame.RefHistory()
        }
        for name in (cls._MODEL_NAME, cls._CALIB_NAME):
            if name not in histories:
                raise KeyError(
                    "could not find FrHistory named=%s in %s!" % (name, path)
                )

        seriesinfo = SeriesInfo(
            t0,
            deltaT,
            info=info,
            model_id=histories[cls._MODEL_NAME],
            calibration_id=histories[cls._CALIB_NAME],
            **seriesdict,
        )

        # we only know how to handle a single contiguous stretch of data at the moment
        return [seriesinfo]
@hookimpl
def get_reporters():
    """
    hook implementation mapping reporter names to the reporter classes
    defined in this module
    """
    reporters = {"series:gwf": GWFSeriesReporter}
    return reporters