"""idq.io.reporters — reporters that handle serialization I/O for idq."""

import glob
import os
import shutil
import time

import pluggy

from ligo.segments import segment, segmentlist

from ... import names
from ... import utils


# default maximum age (seconds) a file may sit untouched in a shared-memory
# directory before DiskReporter._cleanup deletes it
DEFAULT_SHMDIR_AGE = 600


class Reporter(object):
    """Parent class defining the I/O API for models and calibration maps.

    A Reporter knows about file systems and directory structures; the
    specifics of where things will be stored are fixed at creation time.

    Instantiation requires knowledge of where things will be written
    (``rootdir``) as well as the time range spanned (``start``, ``end``).

    NOTE: this class only defines the API. Subclasses must provide the
    actual functionality for :meth:`report`, :meth:`retrieve`,
    :meth:`read`, and :meth:`glob`.
    """

    # keyword arguments that subclasses may declare as mandatory;
    # checked at instantiation
    _required_kwargs = []

    def __init__(self, rootdir, start, end, **kwargs):
        self._rootdir = rootdir
        self._start = start
        self._end = end
        # verify every keyword the subclass declared as required was supplied
        for required in self._required_kwargs:
            assert required in kwargs, "kwarg=%s required" % required
        self.kwargs = kwargs

    @property
    def rootdir(self):
        return self._rootdir

    @rootdir.setter
    def rootdir(self, rootdir):
        self._rootdir = rootdir

    @property
    def start(self):
        return self._start

    @start.setter
    def start(self, start):
        self._start = start

    @property
    def end(self):
        return self._end

    @end.setter
    def end(self, end):
        self._end = end

    def preferred(self, nickname):
        """Return the preferred path for ``nickname``; the base class has none."""
        return None

    def report(self, nickname, obj, preferred=False):
        """Write a serialized version of ``obj`` to a path determined by kwargs.

        Subclasses must implement this.
        """
        raise NotImplementedError

    def retrieve(self, nickname, preferred=False):
        """Inverse of :meth:`report`; return the object that was serialized.

        If ``preferred``, look up the most recent object, which does not
        necessarily correspond to where this instance would write —
        supported for convenient retrieval. Subclasses must implement this.
        """
        raise NotImplementedError

    @classmethod
    def read(cls, path):
        """Return the object stored at ``path``. Subclasses must implement this."""
        raise NotImplementedError

    def glob(self, nickname, start, end):
        """Return a generator over all existing data for ``nickname`` within [start, end).

        NOTE: this should work without modifying the object!
        Subclasses must implement this.
        """
        raise NotImplementedError
class DiskReporter(Reporter):
    """A sub-class of Reporter that reads/writes plain-text files on the local disk.

    Defines a directory structure that it expects (GPS-keyed directories
    under ``rootdir``, via ``idq.names``) and works within that paradigm.
    """

    # we'll always write plaintxt for this, so suffix doesn't really matter
    _suffix = "txt"
    # filename template for the hidden per-nickname "preferred" cache in rootdir
    _cache_template = ".%s-preferred.cache"

    def __init__(self, *args, **kwargs):
        """Delegate to Reporter.__init__ and then make sure the needed directories exist."""
        Reporter.__init__(self, *args, **kwargs)
        # define this upon instantiation so each instance has it set independently
        self._cache_mtime = dict()
        if "shmdir" in self.kwargs:
            shmdir = self.kwargs["shmdir"]
            os.makedirs(shmdir, exist_ok=True)
        # set temp directory: explicit kwarg wins, then $TMPDIR, then /tmp
        if "tmpdir" in self.kwargs:
            self._tmpdir = self.kwargs["tmpdir"]
        elif "TMPDIR" in os.environ:
            self._tmpdir = os.environ["TMPDIR"]
        else:
            self._tmpdir = "/tmp"
        # make temp directory if it doesn't exist
        os.makedirs(self._tmpdir, exist_ok=True)

    @property
    def tmpdir(self):
        return self._tmpdir

    @property
    def suffix(self):
        return self._suffix

    def directory(self, start, end):
        """Return the GPS-keyed directory for ``start``, creating it whenever called.

        NOTE: ``end`` is unused here; the directory name keys off ``start`` only.
        """
        directory = names.start2dirMOD1e5(start, basename="START", rootdir=self.rootdir)
        os.makedirs(directory, exist_ok=True)
        return directory

    def path(self, nickname, start, end):
        """Standardize file naming conventions via delegation to idq.names."""
        return os.path.join(
            self.directory(start, end),
            names.start_end2path(nickname, start, end, suffix=self.suffix),
        )

    def _cachepath(self, nickname):
        # hidden cache file living directly under rootdir
        return os.path.join(
            self.rootdir,
            self._cache_template % (nickname),
        )

    def new_preferred(self, nickname):
        """Determine whether there is a new preferred path by checking the cache's mtime.

        Note: this updates ``self._cache_mtime``, which is used to determine
        whether the cache has been touched since we last asked.
        """
        cachepath = self._cachepath(nickname)
        if os.path.exists(cachepath):  # path must exist for us to have a preferred
            mtime = os.path.getmtime(cachepath)
            # check this against the stored mtime
            if mtime != self._cache_mtime.get(nickname, None):
                self._cache_mtime[nickname] = mtime  # update the stored mtime
                return True
        return False

    def update_preferred(self, nickname, path):
        """Append ``path`` to the preferred cachefile for ``nickname``.

        NOTE: this is *not* automatically called within report.
        Users must update the cache themselves.
        """
        cachepath = self._cachepath(nickname)
        with open(cachepath, "a") as file_obj:
            file_obj.write("\n" + path)

    def preferred(self, nickname):
        """Return the path that redirects to the preferred file, or None.

        Does this by referencing a cachepath that's automatically defined
        relative to rootdir; returns None when the cache is unchanged or absent.
        """
        if not self.new_preferred(nickname):
            return None
        else:
            # we can rely on this existing because new_preferred was True
            return self._tail(self._cachepath(nickname))

    def _tail(self, path):
        """Retrieve the last 'preferred' file from the cache.

        see https://stackoverflow.com/a/54278929
        """
        with open(path, "rb") as f:
            try:  # catch OSError in case of a one line file
                # seek backwards from EOF until the previous newline is found
                f.seek(-2, os.SEEK_END)
                while f.read(1) != b"\n":
                    f.seek(-2, os.SEEK_CUR)
            except OSError:
                f.seek(0)
            return f.readline().strip().decode()

    def report(self, nickname, obj, preferred=False, **kwargs):
        """Write the string ``obj`` into a plaintxt file; return the final path.

        Writes to a temp file first, then moves it into place so the final
        write appears atomic to readers.
        """
        path = self.path(nickname, self.start, self.end)
        basename = os.path.basename(path)
        if self.kwargs.get("shmdir_only", False):
            # keep the final product in tmpdir rather than the rootdir tree
            path = os.path.join(self.tmpdir, basename)
        tmppath = os.path.join(self.tmpdir, "." + basename + ".tmp")
        # save to temp directory, and copy/move over as requested
        self._write(tmppath, obj, nickname=nickname, **kwargs)
        self._manage_shmdir(
            tmppath,
            basename,
            age=self.kwargs.get("shmdir_age", DEFAULT_SHMDIR_AGE),
            persist=self.kwargs.get("shmdir_persist", False),
        )
        shutil.move(tmppath, path)
        if preferred:
            self.update_preferred(nickname, path)
        return path

    def _write(self, path, string, **kwargs):
        # plain-text dump; extra kwargs are accepted but unused here
        with open(path, "w") as file_obj:
            file_obj.write(string)

    def retrieve(self, nickname, preferred=False):
        """Read back the string written by report; returns None when no preferred exists."""
        if preferred:
            path = self.preferred(nickname)
        else:
            path = self.path(nickname, self.start, self.end)
        if path is not None:
            return self.read(path)

    @classmethod
    def read(cls, path):
        """Return the full contents of ``path`` as a string."""
        with open(path, "r") as file_obj:
            string = file_obj.read()
        return string

    def _manage_shmdir(self, path, basename, age=DEFAULT_SHMDIR_AGE, persist=False):
        """Mirror ``path`` into the shared-memory directory (if configured) and prune old files."""
        if "shmdir" in self.kwargs:
            # assume this directory already exists, see __init__
            shmdir = self.kwargs["shmdir"]
            # copy to a temporary file and then move to make this atomic
            tmppath = os.path.join(shmdir, "." + os.path.basename(path) + ".tmp")
            shutil.copyfile(path, tmppath)
            shutil.move(tmppath, os.path.join(shmdir, basename))
            # get rid of old things in shmdir
            if not persist:
                self._cleanup(shmdir, age=age)

    def _cleanup(self, directory, age=DEFAULT_SHMDIR_AGE):
        """Delete files in ``directory`` that are older than ``age`` (seconds)."""
        for file_ in os.listdir(directory):
            filepath = os.path.join(directory, file_)
            try:
                if time.time() - os.path.getmtime(filepath) > age:
                    # file hasn't been modified in a while
                    os.remove(filepath)
            except OSError:
                # file was already deleted, can happen with parallel workflows
                pass

    def _glob_paths(self, nickname, start, end, data_id=None, all_dirs=False):
        """A generator for the set of paths needed by glob.

        NOTE: if there are overlapping ranges, all will be returned. They
        are sorted by start time and ties are broken by durations (shorter
        durations first).

        ALSO NOTE: this returns all data with any overlap with the
        requested range and does NOT truncate or extend it to fit the
        specified range.
        """
        bounds = []
        span = segmentlist([segment(start, end)])
        if all_dirs:
            gps_dirs = sorted(glob.glob(os.path.join(self.rootdir, "*")))
        else:
            gps_dirs = [
                os.path.join(self.rootdir, f"START-{gps_dir}")
                for gps_dir in utils.start_end2gps_dirs(start, end, lookback=1)
            ]
        for directory in gps_dirs:
            if not os.path.isdir(directory):
                continue
            try:
                # top level directory
                S, E = names.dirMOD1e52start_end(directory)
                if S > end:  # we've run off the end
                    break
                for file_ in sorted(
                    glob.glob(
                        os.path.join(
                            directory,
                            names.glob2path(f"{nickname}*", suffix=self.suffix),
                        )
                    )
                ):
                    try:
                        s, e = names.path2start_end(file_, suffix=self.suffix)
                        if s > end:  # we've run off the end
                            continue
                        # we already checked (s<end), so we don't check it again
                        if utils.livetime(segmentlist([segment(s, e)]) & span):  # some overlap
                            bounds.append((s, e))
                    except ValueError:
                        pass
            except ValueError:  # bad formatting
                pass
        # remove duplicates and sort; two stable sorts give start-time order
        # with ties broken by end time (shorter durations first)
        bounds = list(set(bounds))
        bounds.sort(key=lambda _: _[1])  # sort by end time first
        bounds.sort(key=lambda _: _[0])  # then sort by start time
        for start, end in bounds:
            if data_id is None:
                paths = [self.path(nickname, start, end)]
            else:
                paths = glob.glob(self.path(f"{nickname}_{data_id}", start, end)) + [
                    self.path(nickname, start, end)
                ]
            for path in paths:
                if os.path.exists(path):  # there is data for this nickname in this directory
                    yield start, end, path

    def glob(self, nickname, start, end, data_id=None, all_dirs=False):
        """Yield all data associated with ``nickname`` within [start, end)."""
        for s, e, path in self._glob_paths(
            nickname, start, end, data_id=data_id, all_dirs=all_dirs
        ):
            data = self.read(path)
            yield s, e, data
# pluggy hook-specification marker; plugins registered under project "iDQ"
# implement hooks declared with this decorator
hookspec = pluggy.HookspecMarker("iDQ")


@hookspec
def get_reporters():
    """This hook is used to return Reporters in the form:

        {"[type:]format": Reporter}

    where the type (optional if generic) refers to a specific reporter
    handling specific data products:

        - span
        - series
        - segment
        - model
        - calib
        - dataset
    """