Source code for idq.calibration

import math

import numpy as np

from scipy.special import erf
from scipy.special import digamma
from scipy.special import polygamma

from scipy.stats import beta

from collections import defaultdict

from ligo.segments import segmentlist

from . import names
from . import utils


LOG3 = np.log(3)

DEFAULT_NUM_MC = 10000
DEFAULT_NUM_QUANTILES = 101
DEFAULT_NUM_POINTS = 101
DEFAULT_MAX_SAMPLES = np.infty  # used within flush logic
DEFAULT_MAX_STRIDE = np.infty

DEFAULT_B = 0.1
DEFAULT_MINB = 1e-4
DEFAULT_MAXB = 1e0
DEFAULT_TOL = 1e-4
DEFAULT_DLOGLIKE = 1

DEFAULT_GLITCH_B = 1 / 24.0
DEFAULT_CLEAN_B = 1 / 12.0

DEFAULT_RATE_ESTIMATION = "livetime"

DEFAULT_MIN_LIVETIME = 86400  # 1 day
DEFAULT_MIN_CLEAN_LIVETIME = 43200  # half a day


class FixedBandwidth1DKDE(object):
    """
    an object that represents the calibration mapping between SupervisedClassifier
    output (ranks in [0, 1]) and probabilistic statements; essentially a fancy
    interpolation device

    SupervisedClassifier and IncrementalSupervisedClassifier objects retain a
    reference to one of these; functionality should be universal for all
    classifiers!

    We accomplish this with a standard fixed-bandwidth Gaussian KDE, represented
    internally as a vector that's referenced via interpolation for rapid execution.
    """

    def __init__(
        self, observations, num_points=DEFAULT_NUM_POINTS, b=DEFAULT_B, **kwargs
    ):
        # set up basic references to data and hyperparameters
        self._obs = np.array([], dtype=float)
        self.b = b
        self._num_points = num_points

        # set up data structures used when computing KDE fitting parameters
        self._interp_x = np.linspace(0, 1, self._num_points)
        self._interp_logpdf = np.empty(self._num_points, dtype="float")
        self._interp_cdf = np.empty(self._num_points, dtype="float")

        # first and second moments of the pdf, which we calculate in the log and
        # do some backflips to maintain high precision
        self._interp_logpdf_m1 = np.empty((self._num_points, 4), dtype="float")
        self._interp_logpdf_m2 = np.empty((self._num_points, 2), dtype="float")

        # uniform distributions
        self._interp_pdf_alpha = np.ones(self._num_points, dtype="float")
        self._interp_pdf_beta = np.ones(self._num_points, dtype="float")

        # first and second moments for the CDF
        self._interp_cdf_m1 = np.empty((self._num_points, 2), dtype="float")
        self._interp_cdf_m2 = np.empty((self._num_points, 2), dtype="float")

        # uniform distributions
        self._interp_cdf_alpha = np.ones(self._num_points, dtype="float")
        self._interp_cdf_beta = np.ones(self._num_points, dtype="float")

        # the normalization of the whole thing
        self._count = 0.0

        # set all values to be the default (no data present)
        self.flush(max_samples=0)

        # now add the observations
        self.add_observations(observations)

    @property
    def b(self):
        return self._b

    @b.setter
    def b(self, b):
        self._b = b
        self._ib2 = self.b**-2
        self._ibsqrt2 = 1.0 / (2**0.5 * self.b)
        self._i2b2 = 0.5 * self._ib2

    @property
    def obs(self):
        return self._obs

    @property
    def ranks(self):
        return self._interp_x

    @property
    def interp_x(self):
        return self._interp_x

    @property
    def interp_logpdf(self):
        return self._interp_logpdf

    @property
    def interp_cdf(self):
        return self._interp_cdf

    @property
    def interp_pdf_alpha(self):
        return self._interp_pdf_alpha

    @property
    def interp_pdf_beta(self):
        return self._interp_pdf_beta

    @property
    def interp_cdf_alpha(self):
        return self._interp_cdf_alpha

    @property
    def interp_cdf_beta(self):
        return self._interp_cdf_beta
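    # Usage sketch (illustrative only, not part of the module): a
    # FixedBandwidth1DKDE can be built directly from an array of classifier
    # ranks and then queried through its interpolated pdf/cdf. The sample
    # array below is made up for the example.
    #
    #   >>> import numpy as np
    #   >>> from idq.calibration import FixedBandwidth1DKDE
    #   >>> kde = FixedBandwidth1DKDE(np.random.uniform(0, 1, 500), b=0.05)
    #   >>> kde.pdf(np.array([0.1, 0.5, 0.9]))   # interpolated KDE density
    #   >>> kde.cdf(np.array([0.1, 0.5, 0.9]))   # cumulative probability
    #   >>> kde.quantile(0.9)                    # rank at the 90th percentile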
    def compute(self):
        """
        this works just like update, except it zeros all the starting arrays and
        then delegates to update, handing it all the current observations
        """
        obs = self.obs

        # re-set everything
        self.flush(max_samples=0)

        # add the observations back in, which triggers updates to internal KDE
        # representations
        self.add_observations(obs)
    def flush(self, max_samples=DEFAULT_MAX_SAMPLES):
        """
        remove observations from the front of the list until the total number of
        observations is <= max_samples
        """
        N = len(self.obs)
        if max_samples == 0:
            # don't bother re-computing contributions and subtracting, just zero
            # everything out, forget about all existing observations
            self._obs = np.array([], dtype=float)

            # interpolation stuff
            self._interp_logpdf[:] = -np.infty
            self._interp_cdf[:] = 0.0

            # first and second moments of the pdf, which we calculate in the log
            # and do some backflips to maintain high precision
            self._interp_logpdf_m1[:, 0] = -np.infty
            self._interp_logpdf_m2[:, 0] = -np.infty

            # first and second moments for the CDF
            self._interp_cdf_m1[:, 0] = 0.0
            self._interp_cdf_m2[:, 0] = 0.0

            # the normalization of the whole thing
            self._count = 0.0

        elif max_samples < N:
            # keep only a fraction of things, so strip out the first few
            # observations
            self.remove_observations(self.obs[: N - max_samples])

        else:
            # asked to keep more observations than we have, so we don't do anything
            pass
    def _rank2delta(self, rank):
        """
        fill in the internal data structures with the contributions from just this
        one rank
        """
        x0 = self._interp_x - rank
        x1 = self._interp_x + rank
        x2 = self._interp_x - 2 + rank
        z0 = -(x0**2)
        z1 = -(x1**2)
        z2 = -(x2**2)

        # compute pdf 1st moment
        self._interp_logpdf_m1[:, 1] = self._i2b2 * z0
        self._interp_logpdf_m1[:, 2] = self._i2b2 * z1
        self._interp_logpdf_m1[:, 3] = self._i2b2 * z2
        m = np.max(self._interp_logpdf_m1[:, 1:], axis=1)

        # NOTE: we divide the pdf by 3 so that the thing we fit with a
        # beta-distribution is bound between 0 and 1.
        self._interp_logpdf_m1[:, 1] = (
            np.log(
                np.sum(np.exp(self._interp_logpdf_m1[:, 1:].transpose() - m), axis=0)
            )
            + m
            - LOG3
        )

        # compute 2nd moment for pdf error estimates
        self._interp_logpdf_m2[:, 1] = self._interp_logpdf_m1[:, 1] * 2

        # compute moments for cdf error estimates
        erf0 = erf(-rank * self._ibsqrt2)
        erf1 = erf(rank * self._ibsqrt2)
        erf2 = erf((-2 + rank) * self._ibsqrt2)
        self._interp_cdf_m1[:, 1] = 0.5 * (
            erf(x0 * self._ibsqrt2)
            - erf0
            + erf(x1 * self._ibsqrt2)
            - erf1
            + erf(x2 * self._ibsqrt2)
            - erf2
        )
        self._interp_cdf_m2[:, 1] = self._interp_cdf_m1[:, 1] ** 2

        dcount = 0.5 * (
            erf((1 - rank) * self._ibsqrt2)
            - erf0
            + erf((1 + rank) * self._ibsqrt2)
            - erf1
            + erf((-1 + rank) * self._ibsqrt2)
            - erf2
        )

        return dcount
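    # A note on the three Gaussian terms above (editorial gloss, not original
    # documentation): each observation contributes a kernel centered at `rank`
    # plus two mirror images centered at `-rank` and `2 - rank`, i.e. the
    # observation reflected about the boundaries at 0 and 1, which keeps
    # probability from leaking outside [0, 1]. A hypothetical standalone
    # version of the same kernel sum, for a single observation r and grid x:
    #
    #   >>> import numpy as np
    #   >>> def reflected_kernel(x, r, b):
    #   ...     # sum of the kernel and its reflections about 0 and 1
    #   ...     z = sum(np.exp(-(x - c) ** 2 / (2 * b**2)) for c in (r, -r, 2 - r))
    #   ...     return z / (np.sqrt(2 * np.pi) * b)
    #   >>> x = np.linspace(0, 1, 1001)
    #   >>> np.trapz(reflected_kernel(x, 0.02, 0.1), x)  # close to 1 even near the edge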
    def remove_observations(self, observations):
        """
        works just like update, except we subtract out the contributions of the
        observations instead of adding them in
        """
        if isinstance(observations, (int, float)):
            observations = np.array([observations])

        # remove observations
        # we check that we can remove all observations before we update any
        # internal structures!
        obs = list(self.obs)
        for rank in observations:
            try:
                obs.remove(rank)  # removes the first occurrence of this value
            except ValueError:
                raise ValueError(
                    "cannot remove rank=%.3e because it is not contained in KDE"
                    % rank
                )
        self._obs = np.array(obs)

        # iterate and remove contributions
        truth = np.empty(self._num_points, dtype=bool)
        for rank in observations:
            # compute the changes associated with just this one observation
            dcount = self._rank2delta(rank)

            # update 1st moment for pdf error
            self._interp_logpdf_m1[:, 1] -= self._interp_logpdf_m1[:, 0]
            truth[:] = self._interp_logpdf_m1[:, 1] > 0
            self._interp_logpdf_m1[truth, 1] = 0
            self._interp_logpdf_m1[:, 0] += np.log(
                1 - np.exp(self._interp_logpdf_m1[:, 1])
            )

            # update 2nd moment for pdf error estimates
            self._interp_logpdf_m2[:, 1] -= self._interp_logpdf_m2[:, 0]
            truth[:] = self._interp_logpdf_m2[:, 1] > 0
            self._interp_logpdf_m2[truth, 1] = 0
            self._interp_logpdf_m2[:, 0] += np.log(
                1 - np.exp(self._interp_logpdf_m2[:, 1])
            )

            # update moments for cdf error estimates
            self._interp_cdf_m1[:, 0] -= self._interp_cdf_m1[:, 1]
            self._interp_cdf_m2[:, 0] -= self._interp_cdf_m2[:, 1]

            self._count -= dcount

        # update fitting parameters
        self._compute_params()
    def add_observations(self, observations):
        """
        update the local data structures using the new observations

        compute the interpolation arrays used for fast execution later; this
        includes error estimates, which we compute by fitting beta distributions
        to the first 2 moments of the pdf and cdf, respectively
        """
        if isinstance(observations, (int, float)):
            observations = np.array([observations])
        self._obs = np.concatenate((self._obs, observations))

        # iterate and add contributions
        for rank in observations:
            # compute the changes associated with just this one observation
            dcount = self._rank2delta(rank)

            # update 1st moment for pdf error
            m = np.max(self._interp_logpdf_m1[:, :2], axis=1)
            self._interp_logpdf_m1[:, 0] = (
                np.log(
                    np.sum(
                        np.exp(self._interp_logpdf_m1[:, :2].transpose() - m), axis=0
                    )
                )
                + m
            )

            # update 2nd moment for pdf error estimates
            m = np.max(self._interp_logpdf_m2, axis=1)
            self._interp_logpdf_m2[:, 0] = (
                np.log(np.sum(np.exp(self._interp_logpdf_m2.transpose() - m), axis=0))
                + m
            )

            # update moments for cdf error estimates
            self._interp_cdf_m1[:, 0] += self._interp_cdf_m1[:, 1]
            self._interp_cdf_m2[:, 0] += self._interp_cdf_m2[:, 1]

            self._count += dcount

        # update fitting parameters
        self._compute_params()
    def _compute_params(self):
        """
        map the internal references for 1st and 2nd moments into interpolation
        parameters for beta distributions

        NOTE: if the distributions don't really fit within rank in [0, 1], doing
        this may bias our error estimates!
        """
        if self._count:
            # compute parameters for error estimates
            # NOTE: for a beta distribution described by 2 parameters: alpha, beta
            #   pdf(x|alpha, beta) = Gamma(alpha+beta)/(Gamma(alpha)*Gamma(beta))
            #       * x**(alpha-1) * (1-x)**(beta-1)
            #   E[x] = alpha/(alpha+beta)
            #   Var[x] = alpha*beta/((alpha+beta)**2 * (alpha+beta+1))
            inum_obs = 1.0 / self._count
            lognum_obs = np.log(self._count)

            # pdf
            # expected value
            Ep = np.exp(self._interp_logpdf_m1[:, 0] - lognum_obs)
            # variance
            Vp = (
                np.exp(self._interp_logpdf_m2[:, 0] - lognum_obs) - Ep**2
            ) * inum_obs

            # handle numerical error explicitly by imposing a minimum variance
            # assume samples are distributed according to a Gaussian with stdv
            # ~0.1*self.b, which is much smaller than we can resolve
            # minVp = 0.5*(0.1)**4 / num_obs
            minVp = 5.0e-5 * inum_obs
            Vp[Vp < minVp] = minVp  # handle numerical error explicitly

            self._interp_pdf_alpha, self._interp_pdf_beta = self._cumulants2params(
                Ep, Vp
            )

            # cdf
            # expected value
            Ec = self._interp_cdf_m1[:, 0] * inum_obs
            # variance
            Vc = (self._interp_cdf_m2[:, 0] * inum_obs - Ec**2) * inum_obs

            # handle numerical error explicitly by imposing a minimum variance
            # NOTE: we use the same minimum as Vp without much justification...
            Vc[Vc < minVp] = minVp  # handle numerical error explicitly

            self._interp_cdf_alpha, self._interp_cdf_beta = self._cumulants2params(
                Ec, Vc
            )

            # normalize the pdf, compute the cdf
            # NOTE: if the distributions are entirely contained within rank in
            # [0, 1], these should be equivalent to Ep and Ec, respectively
            # NOTE: we do NOT normalize pdfs so they integrate to one with rank
            # in [0, 1]. USER BEWARE!
            self._interp_logpdf[:] = (
                self._interp_logpdf_m1[:, 0]
                - np.log(self._count)
                - 0.5 * np.log(2 * np.pi * self.b**2)
                + LOG3
            )

            self._interp_cdf[:] = Ec
            # handle numerical stability issues explicitly
            self._interp_cdf[self._interp_cdf < 0] = 0
            self._interp_cdf[self._interp_cdf > 1] = 1

        else:
            # no data, so set everything to a uniform distribution
            self._interp_logpdf[:] = 0.5
            self._interp_cdf[:] = np.linspace(0, 1, self._num_points)

            # uniform distributions
            self._interp_pdf_alpha[:] = 1
            self._interp_pdf_beta[:] = 1

            # uniform distributions
            # NOTE: this may be wonky when we go from no data to some data, but
            # that shouldn't be the main use case
            self._interp_cdf_alpha[:] = 1
            self._interp_cdf_beta[:] = 1

    def _cumulants2params(self, E, V, safe=False):
        """
        compute the beta-distribution parameters corresponding to this expected
        value and variance
        """
        # now actually compute things
        x = (1.0 - E) * E - V
        alpha = (E / V) * x
        beta = ((1.0 - E) / V) * x

        bad = alpha <= 0
        if np.any(bad):
            if safe:
                raise Warning("alpha<=0")
            # FIXME: this hard-coding could be bad, but we're talking about when
            # my model breaks down anyway...
            alpha[bad] = 1e-8
            beta[bad] = 1e-2

        bad = beta <= 0
        if np.any(bad):
            if safe:
                raise Warning("beta<=0")
            # FIXME: this hard-coding could be bad, but we're talking about when
            # my model breaks down anyway...
            beta[bad] = 1e-8
            alpha[bad] = 1e-2

        return alpha, beta
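    # Editorial gloss on _cumulants2params: it is the standard method-of-moments
    # inversion for a Beta(alpha, beta) distribution. Given E = E[x] and
    # V = Var[x], with k = E*(1-E)/V - 1,
    #
    #   alpha = E * k,    beta = (1 - E) * k
    #
    # which is algebraically the same as the (E/V)*((1-E)*E - V) form used above.
    # A quick round-trip check (hypothetical, not part of the module):
    #
    #   >>> from scipy.stats import beta as beta_dist
    #   >>> a, b = 2.0, 5.0
    #   >>> E, V = beta_dist.mean(a, b), beta_dist.var(a, b)
    #   >>> k = E * (1 - E) / V - 1
    #   >>> (E * k, (1 - E) * k)    # recovers (2.0, 5.0) up to float error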
    def loglike(self, b):
        r"""
        compute the leave-one-out cross-validation loglikelihood at this value of b

        loglike = (1/N) \sum_i \log( (1/(N-1)) \sum_{j \neq i}
            (2 \pi b^2)^{-1/2} \exp(-(x_i - x_j)^2 / (2 b^2)) )
        """
        if len(self.obs) < 2:
            # only one sample, so cross validation is ill defined
            return 0.0

        logl = 0.0
        i2b2 = 0.5 * b**-2
        truth = np.ones_like(self.obs, dtype=bool)
        z = np.empty(len(truth) - 1, dtype=float)
        for ind, rank in enumerate(self.obs):
            truth[ind - 1] = True
            truth[ind] = False
            z[:] = -((rank - self.obs[truth]) ** 2) * i2b2
            m = np.max(z)
            logl += np.log(np.sum(np.exp(z - m))) + m

        N = self._count
        return (logl - np.log(N - 1) - 0.5 * np.log(2 * np.pi) - np.log(b)) / N
    def grad(self, b):
        r"""
        computes dloglike/dlogb at b

        dloglike/dlogb = (1/N) \sum_i
            \frac{\sum_{j \neq i} ((x_i-x_j)^2/b^2 - 1) \exp(-(x_i-x_j)^2/(2 b^2))}
                 {\sum_{j \neq i} \exp(-(x_i-x_j)^2/(2 b^2))}
        """
        if len(self.obs) < 2:
            # cross validation is ill defined, so just return zero
            return 0.0

        dlogL = 0.0
        ib2 = b**-2
        truth = np.ones_like(self.obs, dtype=bool)
        z = np.empty(len(truth) - 1, dtype=float)
        weights = np.empty_like(z, dtype=float)
        for ind, rank in enumerate(self.obs):
            truth[ind - 1] = True
            truth[ind] = False
            z[:] = (rank - self.obs[truth]) ** 2 * ib2
            weights[:] = -0.5 * z
            m = np.max(weights)
            weights[:] = np.exp(weights - m)
            dlogL += np.sum(weights * (z - 1)) / np.sum(weights)

        return dlogL / len(self.obs)
    def optimize(
        self,
        minb=DEFAULT_MINB,
        maxb=DEFAULT_MAXB,
        tol=DEFAULT_TOL,
        bounded=False,
        b_factor=1 / 12,
        **kwargs
    ):
        """
        looks for the maximum of logL via a bisection search (in log b) for the
        zero of dlogL/dlogb

        expects dlogL/dlogb to be monotonic in logb, which will likely be true.
        However, if it is not then the logic in this loop may fail.
        """
        # set lower bound for bandwidth
        if bounded:
            N = len(self.obs)
            minb = max(minb, (1.06 * b_factor) * math.pow(N, -0.2))

        # check if we're already within tolerance
        if maxb - minb <= minb * tol:
            self.b = (minb * maxb) ** 0.5

        else:
            # check if end points are optima
            MdlogL = self.grad(maxb)
            if MdlogL >= 0:
                self.b = maxb

            else:
                mdlogL = self.grad(minb)
                if mdlogL <= 0:
                    self.b = minb

                else:
                    # iterate through bisection search until termination
                    # condition is reached
                    while maxb - minb > minb * tol:
                        midb = (minb * maxb) ** 0.5
                        dlogL = self.grad(midb)
                        if dlogL == 0:
                            self.b = midb
                            break
                        elif dlogL > 0:
                            minb = midb
                        else:  # dlogL < 0
                            maxb = midb
                    else:
                        self.b = (minb * maxb) ** 0.5

        # update internal data structures to reflect new bandwidth
        self.compute()

        return None
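    # Usage sketch for bandwidth selection (illustrative only): `optimize`
    # maximizes the leave-one-out cross-validation likelihood by bisecting on
    # the sign of dlogL/dlogb, then rebuilds the interpolation arrays at the
    # selected bandwidth. The sample array is made up for the example.
    #
    #   >>> import numpy as np
    #   >>> from idq.calibration import FixedBandwidth1DKDE
    #   >>> kde = FixedBandwidth1DKDE(np.random.beta(2, 5, 300), b=0.1)
    #   >>> kde.optimize(minb=1e-3, maxb=1.0, tol=1e-4)
    #   >>> kde.b            # selected bandwidth
    #   >>> kde.grad(kde.b)  # close to zero unless an endpoint was chosen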
    def cdf(self, ranks):
        return np.interp(ranks, self._interp_x, self._interp_cdf)
    def cdf_alpha(self, ranks):
        """
        NOTE: because we force the cdf to go through (0,0) and (1,1), the error
        estimates are messed up near the end points because I can't represent
        that with a beta function. This turns out to also affect the
        second-to-last point as well because of how np.interp works (the nan
        from the last point gets mixed in)
        """
        return np.interp(ranks, self._interp_x, self._interp_cdf_alpha)
    def cdf_beta(self, ranks):
        """
        NOTE: because we force the cdf to go through (0,0) and (1,1), the error
        estimates are messed up near the end points because I can't represent
        that with a beta function. This turns out to also affect the
        second-to-last point as well because of how np.interp works (the nan
        from the last point gets mixed in)
        """
        return np.interp(ranks, self._interp_x, self._interp_cdf_beta)
    def cdf_std(self, ranks):
        return np.array(
            [beta.std(self.cdf_alpha(rank), self.cdf_beta(rank)) for rank in ranks]
        )

    def cdf_var(self, ranks):
        return np.array(
            [beta.var(self.cdf_alpha(rank), self.cdf_beta(rank)) for rank in ranks]
        )
    def cdf_quantile(self, ranks, q):
        """
        NOTE: because we force the cdf to go through (0,0) and (1,1), the error
        estimates are messed up near the end points because I can't represent
        that with a beta function. This turns out to also affect the
        second-to-last point as well because of how np.interp works (the nan
        from the last point gets mixed in)
        """
        return np.array(
            [beta.ppf(q, self.cdf_alpha(rank), self.cdf_beta(rank)) for rank in ranks]
        )
    def quantile(self, q):
        return np.interp(q, self._interp_cdf, self._interp_x)

    def pdf(self, ranks):
        return np.exp(self.logpdf(ranks))

    def pdf_alpha(self, ranks):
        return np.interp(ranks, self._interp_x, self._interp_pdf_alpha)

    def pdf_beta(self, ranks):
        return np.interp(ranks, self._interp_x, self._interp_pdf_beta)

    def pdf_std(self, ranks):
        return np.array(
            [beta.std(self.pdf_alpha(rank), self.pdf_beta(rank)) for rank in ranks]
        )

    def pdf_var(self, ranks):
        return np.array(
            [beta.var(self.pdf_alpha(rank), self.pdf_beta(rank)) for rank in ranks]
        )

    @property
    def _pdf_scale(self):
        return 3.0 / ((2 * np.pi) ** 0.5 * self.b)

    def pdf_quantile(self, ranks, q):
        scale = self._pdf_scale
        return np.array(
            [
                beta.ppf(
                    q, self.pdf_alpha(rank), self.pdf_beta(rank), loc=0, scale=scale
                )
                for rank in ranks
            ]
        )
    def logpdf(self, ranks):
        """
        NOTE: this returns log(E[pdf]), not E[log(pdf)]
        """
        return np.interp(ranks, self._interp_x, self._interp_logpdf)
    def logpdf_mean(self, ranks):
        """
        NOTE: this returns E[log(pdf)] whereas logpdf returns log(E[pdf])
        """
        scale = np.log(self._pdf_scale)
        return (
            np.array(
                [
                    digamma(self.pdf_alpha(rank))
                    - digamma(self.pdf_alpha(rank) + self.pdf_beta(rank))
                    for rank in ranks
                ]
            )
            - scale
        )
    def logpdf_std(self, ranks):
        return self.logpdf_var(ranks) ** 0.5

    def logpdf_var(self, ranks):
        # NOTE: the variance of log(pdf) is unchanged by the constant log-scale
        # offset applied to the fitted beta distribution, so no scale term
        # appears here
        return np.array(
            [
                polygamma(1, self.pdf_alpha(rank))
                - polygamma(1, self.pdf_alpha(rank) + self.pdf_beta(rank))
                for rank in ranks
            ]
        )

    def logpdf_quantile(self, ranks, q):
        return np.log(self.pdf_quantile(ranks, q))
    def coverage(self):
        """return ranks, CDF(ranks), fraction of ranks with CDF(rank) <= X"""
        ranks = self.ranks
        return (
            ranks,
            self.cdf(ranks),
            np.array([np.sum(self._obs <= r) for r in ranks], dtype=float)
            / len(self._obs),
        )
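    # Usage sketch for the self-consistency check above (illustrative only):
    # `coverage` compares the KDE's cdf against the empirical fraction of
    # observations at or below each rank, so the two arrays should nearly agree
    # when the bandwidth is reasonable.
    #
    #   >>> import numpy as np
    #   >>> from idq.calibration import FixedBandwidth1DKDE
    #   >>> kde = FixedBandwidth1DKDE(np.random.uniform(0, 1, 200), b=0.05)
    #   >>> ranks, kde_cdf, empirical = kde.coverage()
    #   >>> np.max(np.abs(kde_cdf - empirical))   # small when the KDE tracks the samples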
class CalibrationMap(object):
    """
    a helper object that represents 2 FixedBandwidth1DKDE objects, one for cleans
    and one for glitches

    builds and manages these for you based on input samples

    can then compute things like likelihood ratios, efficiency, fap, etc.
    Can even generate a full ROC curve automatically.
    """

    def __init__(
        self,
        dataset,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        num_points=DEFAULT_NUM_POINTS,
        gch_num_points=DEFAULT_NUM_POINTS,
        gch_b=DEFAULT_B,
        cln_num_points=DEFAULT_NUM_POINTS,
        cln_b=DEFAULT_B,
        compute=True,  # can be used to avoid computing loglike_cdf
        clean_segs=None,
        rate_estimation=DEFAULT_RATE_ESTIMATION,
        min_livetime=DEFAULT_MIN_LIVETIME,
        min_clean_livetime=DEFAULT_MIN_CLEAN_LIVETIME,
        model_id=None,
        **kwargs
    ):
        # record provenance stuff
        self._start = dataset.start
        self._end = dataset.end
        self._segs = segmentlist(dataset.segs)  # make a copy
        self._model_id = model_id
        self._hash = None

        # rate estimation
        if rate_estimation == "livetime":
            assert (
                clean_segs is not None
            ), "need to provide clean segments if using rate estimation via livetime"
        self.rate_estimation = rate_estimation
        self._clean_segs = clean_segs

        # health checks
        self.min_livetime = min_livetime
        self.min_clean_livetime = min_clean_livetime

        # make a local copy of datasets to record minimal sets of data
        self._gch_dataset, self._cln_dataset = dataset.vectors2classes()
        self.compute_rates()

        # set up KDE objects
        self._gch_kde = FixedBandwidth1DKDE(
            self._gch_dataset.ranks, num_points=gch_num_points, b=gch_b, **kwargs
        )
        self._cln_kde = FixedBandwidth1DKDE(
            self._cln_dataset.ranks, num_points=cln_num_points, b=cln_b, **kwargs
        )

        self._ranks = np.linspace(0, 1, num_points)

        # make sure that we have this stored in case we don't compute...
        self._num_quantiles2dq(num_quantiles)
        if compute:
            self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)

    @property
    def ranks(self):
        return self._ranks

    @property
    def start(self):
        return min(self._gch_dataset.start, self._cln_dataset.start)

    @start.setter
    def start(self, new):
        for dataset, kde in [
            (self._gch_dataset, self._gch_kde),
            (self._cln_dataset, self._cln_kde),
        ]:
            dataset.start = new
            dataset.end = max(new, dataset.end)
            kde.remove_observations(dataset.filter().ranks)
        self.compute_rates()

    @property
    def end(self):
        return max(self._gch_dataset.end, self._cln_dataset.end)

    @end.setter
    def end(self, new):
        for dataset, kde in [
            (self._gch_dataset, self._gch_kde),
            (self._cln_dataset, self._cln_kde),
        ]:
            dataset.start = min(new, dataset.end)
            dataset.end = new
            kde.remove_observations(dataset.filter().ranks)
        self.compute_rates()

    @property
    def segs(self):
        return self._gch_dataset.segs | self._cln_dataset.segs

    @segs.setter
    def segs(self, segs):
        if len(segs):
            self.start = min(self.start, segs[0][0])
            self.end = max(self.end, segs[-1][1])

    @property
    def hash(self):
        """the identifier used to locate this model."""
        if self._hash is not None:
            return self._hash
        else:
            return names.times_id2hash(self._start, self._end, self._model_id)

    @hash.setter
    def hash(self, new):
        self._hash = new

    @property
    def model_id(self):
        return self._model_id

    def add_gch(
        self,
        dataset,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        clean_segs=None,
        **auto_optimize_kwargs
    ):
        if self.rate_estimation == "livetime" or self._clean_segs is not None:
            assert clean_segs is not None, (
                "need to provide clean segments if rate_estimation = livetime or "
                "instantiated a calibration map with clean segments"
            )
            self._clean_segs |= clean_segs
        self._gch_dataset += dataset
        self._gch_kde.add_observations(dataset.ranks)
        self.auto_optimize_gch(**auto_optimize_kwargs)
        self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
        self.compute_rates()

    def add_cln(
        self,
        dataset,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        clean_segs=None,
        **auto_optimize_kwargs
    ):
        if self.rate_estimation == "livetime" or self._clean_segs is not None:
            assert clean_segs is not None, (
                "need to provide clean segments if rate_estimation = livetime or "
                "instantiated a calibration map with clean segments"
            )
            self._clean_segs |= clean_segs
        self._cln_dataset += dataset
        self._cln_kde.add_observations(dataset.ranks)
        self.auto_optimize_cln(**auto_optimize_kwargs)
        self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
        self.compute_rates()

    def add(
        self,
        dataset,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        clean_segs=None,
        **auto_optimize_kwargs
    ):
        if self.rate_estimation == "livetime" or self._clean_segs is not None:
            assert clean_segs is not None, (
                "need to provide clean segments if rate_estimation = livetime or "
                "instantiated a calibration map with clean segments"
            )
            self._clean_segs |= clean_segs
        gch, cln = dataset.vectors2classes()
        self._gch_dataset += gch
        self._cln_dataset += cln
        self._gch_kde.add_observations(gch.ranks)
        self._cln_kde.add_observations(cln.ranks)
        self.auto_optimize(**auto_optimize_kwargs)
        self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
        self.compute_rates()

    def remove_gch(
        self,
        dataset,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        **auto_optimize_kwargs
    ):
        self._gch_dataset -= dataset
        self._gch_kde.remove_observations(dataset.ranks)
        if self._clean_segs:
            # enforce clean segments span dataset
            self._clean_segs &= self.segs
        self.auto_optimize_gch(**auto_optimize_kwargs)
        self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
        self.compute_rates()

    def remove_cln(
        self,
        dataset,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        **auto_optimize_kwargs
    ):
        self._cln_dataset -= dataset
        self._cln_kde.remove_observations(dataset.ranks)
        if self._clean_segs:
            # enforce clean segments span dataset
            self._clean_segs &= self.segs
        self.auto_optimize_cln(**auto_optimize_kwargs)
        self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
        self.compute_rates()

    def remove(
        self,
        dataset,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        **auto_optimize_kwargs
    ):
        gch, cln = dataset.vectors2classes()
        self._gch_dataset -= dataset
        self._cln_dataset -= dataset
        self._gch_kde.remove_observations(gch.ranks)
        self._cln_kde.remove_observations(cln.ranks)
        if self._clean_segs:
            # enforce clean segments span dataset
            self._clean_segs &= self.segs
        self.auto_optimize(**auto_optimize_kwargs)
        self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
        self.compute_rates()

    def flush_gch(
        self,
        max_stride=DEFAULT_MAX_STRIDE,
        max_samples=DEFAULT_MAX_SAMPLES,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        **auto_optimize_kwargs
    ):
        self._gch_kde.remove_observations(
            self._gch_dataset.flush(
                max_stride=max_stride, max_samples=max_samples
            ).ranks
        )
        if self._clean_segs:
            # enforce clean segments span dataset
            self._clean_segs &= self.segs
        self.auto_optimize_gch(**auto_optimize_kwargs)
        self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
        self.compute_rates()

    def flush_cln(
        self,
        max_stride=DEFAULT_MAX_STRIDE,
        max_samples=DEFAULT_MAX_SAMPLES,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        **auto_optimize_kwargs
    ):
        self._cln_kde.remove_observations(
            self._cln_dataset.flush(
                max_stride=max_stride, max_samples=max_samples
            ).ranks
        )
        if self._clean_segs:
            # enforce clean segments span dataset
            self._clean_segs &= self.segs
        self.auto_optimize_cln(**auto_optimize_kwargs)
        self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
        self.compute_rates()

    def flush(
        self,
        max_stride=DEFAULT_MAX_STRIDE,
        max_samples=DEFAULT_MAX_SAMPLES,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        **auto_optimize_kwargs
    ):
        self._gch_kde.remove_observations(
            self._gch_dataset.flush(
                max_stride=max_stride, max_samples=max_samples
            ).ranks
        )
        self._cln_kde.remove_observations(
            self._cln_dataset.flush(
                max_stride=max_stride, max_samples=max_samples
            ).ranks
        )
        if self._clean_segs:
            # enforce clean segments span dataset
            self._clean_segs &= self.segs
        self.auto_optimize(**auto_optimize_kwargs)
        self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
        self.compute_rates()

    def add_and_flush_gch(
        self,
        dataset,
        max_stride=DEFAULT_MAX_STRIDE,
        max_samples=DEFAULT_MAX_SAMPLES,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        clean_segs=None,
        **auto_optimize_kwargs
    ):
        self._gch_dataset += dataset
        self._gch_kde.add_observations(dataset.ranks)
        if self.rate_estimation == "livetime" or self._clean_segs is not None:
            assert clean_segs is not None, (
                "need to provide clean segments if rate_estimation = livetime or "
                "instantiated a calibration map with clean segments"
            )
            self._clean_segs |= clean_segs
        self.flush_gch(
            max_stride=max_stride,
            max_samples=max_samples,
            num_quantiles=num_quantiles,
            num_mc=num_mc,
            **auto_optimize_kwargs
        )

    def add_and_flush_cln(
        self,
        dataset,
        max_stride=DEFAULT_MAX_STRIDE,
        max_samples=DEFAULT_MAX_SAMPLES,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        clean_segs=None,
        **auto_optimize_kwargs
    ):
        self._cln_dataset += dataset
        self._cln_kde.add_observations(dataset.ranks)
        if self.rate_estimation == "livetime" or self._clean_segs is not None:
            assert clean_segs is not None, (
                "need to provide clean segments if rate_estimation = livetime or "
                "instantiated a calibration map with clean segments"
            )
            self._clean_segs |= clean_segs
        self.flush_cln(
            max_stride=max_stride,
            max_samples=max_samples,
            num_quantiles=num_quantiles,
            num_mc=num_mc,
            **auto_optimize_kwargs
        )

    def add_and_flush(
        self,
        dataset,
        max_stride=DEFAULT_MAX_STRIDE,
        max_samples=DEFAULT_MAX_SAMPLES,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        clean_segs=None,
        **auto_optimize_kwargs
    ):
        gch, cln = dataset.vectors2classes()
        self._gch_dataset += gch
        self._cln_dataset += cln
        self._gch_kde.add_observations(gch.ranks)
        self._cln_kde.add_observations(cln.ranks)
        if self.rate_estimation == "livetime" or self._clean_segs is not None:
            assert clean_segs is not None, (
                "need to provide clean segments if rate_estimation = livetime or "
                "instantiated a calibration map with clean segments"
            )
            self._clean_segs |= clean_segs
        self.flush(
            max_stride=max_stride,
            max_samples=max_samples,
            num_quantiles=num_quantiles,
            num_mc=num_mc,
            **auto_optimize_kwargs
        )
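    # Workflow sketch (illustrative only; `dataset`, `new_dataset`, `clean_segs`,
    # and `more_clean_segs` are stand-ins for idq dataset objects providing
    # `vectors2classes()`, `ranks`, and `segs`, and for ligo.segments
    # segmentlists):
    #
    #   >>> cmap = CalibrationMap(dataset, clean_segs=clean_segs)
    #   >>> cmap.pglitch(np.array([0.2, 0.8]))          # calibrated probabilities
    #   >>> cmap.add_and_flush(new_dataset,             # streaming update
    #   ...                    max_samples=10000,
    #   ...                    clean_segs=more_clean_segs)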
    def compute_rates(self):
        """compute the approximate rate of gch and clean based on datasets"""
        if self.rate_estimation == "samples":
            # an incredibly simple approximation...
            self._gch_rate = (
                1.0 * len(self._gch_dataset) / utils.livetime(self._gch_dataset.segs)
            )
            self._cln_rate = (
                1.0 * len(self._cln_dataset) / utils.livetime(self._cln_dataset.segs)
            )
        elif self.rate_estimation == "livetime":
            T = utils.livetime(self.segs)
            Tc = utils.livetime(self._clean_segs)
            self._cln_rate = 1.0 * (Tc / T)
            self._gch_rate = 1.0 - self._cln_rate
        else:
            raise NotImplementedError(
                "rate estimation {} not implemented".format(self.rate_estimation)
            )
    def _num_quantiles2dq(self, num_quantiles):
        # we use num_quantiles+2 interpolation points (see compute_loglike_cdf)
        # so we need to divide the unit interval into num_quantiles+1 "bins"
        self._dq = 1.0 / (num_quantiles + 1)
    def compute_loglike_cdf(
        self, num_quantiles=DEFAULT_NUM_QUANTILES, num_mc=DEFAULT_NUM_MC
    ):
        """
        compute CDF(loglike|rank) interpolation object via monte carlo
        """
        self._num_quantiles2dq(num_quantiles)
        self._interp_loglike_cdf = np.empty(
            (len(self._ranks), num_quantiles + 2), dtype="float"
        )
        self._interp_loglike_cdf[:, 0] = -np.infty  # set end points
        self._interp_loglike_cdf[:, -1] = +np.infty

        # used within the iteration to avoid repeatedly defining this
        percentiles = np.linspace(0.0, 100, num_quantiles + 2)[1:-1]

        alpha_g = self._gch_kde.pdf_alpha(self._ranks)
        beta_g = self._gch_kde.pdf_beta(self._ranks)
        scale_g = self._gch_kde._pdf_scale

        alpha_c = self._cln_kde.pdf_alpha(self._ranks)
        beta_c = self._cln_kde.pdf_beta(self._ranks)
        scale_c = self._cln_kde._pdf_scale

        dscale = np.log(scale_g) - np.log(scale_c)

        truth = np.empty(num_mc, dtype=bool)
        samples = np.empty(num_mc, dtype=float)

        # re-use these within loop to avoid re-allocating memory
        gsamples = np.empty(num_mc, dtype=float)
        csamples = np.empty(num_mc, dtype=float)
        for i, (_, ag, bg, ac, bc) in enumerate(
            zip(self._ranks, alpha_g, beta_g, alpha_c, beta_c)
        ):
            gsamples[:] = beta.rvs(ag, bg, size=num_mc)
            csamples[:] = beta.rvs(ac, bc, size=num_mc)

            # catch all possible log(0) to avoid warnings

            # only gsamples are zero
            truth[:] = gsamples == 0
            samples[truth] = -np.infty

            # only csamples are zero
            truth[:] = csamples == 0
            samples[truth] = +np.infty

            # both pdfs are zero
            truth[:] = (gsamples == 0) * (csamples == 0)
            samples[truth] = 0.0

            # both pdfs are non-zero
            truth[:] = (gsamples != 0) * (csamples != 0)
            samples[truth] = np.log(gsamples[truth]) - np.log(csamples[truth]) + dscale

            # the quantiles associated with monte carlo
            self._interp_loglike_cdf[i, 1:-1] = [
                np.percentile(samples, p) for p in percentiles
            ]
    def _auto_optimize(
        self,
        kde,
        dloglike=DEFAULT_DLOGLIKE,
        minb=DEFAULT_MINB,
        maxb=DEFAULT_MAXB,
        tol=DEFAULT_TOL,
        **kwargs
    ):
        """
        determines whether loglike is likely to change enough to warrant
        re-optimization. Specifically, we re-optimize if

            dloglike < |dloglike/db| * (b*tol)
                     = |dloglike/dlogb| * (1/b) * (b*tol)
                     = |kde.grad(kde.b)| * tol

        the logic being that if the uncertainty in the optimal b due to just the
        tolerance is small enough, we shouldn't bother re-optimizing
        """
        grad = kde.grad(kde.b)
        if dloglike < np.abs(grad) * tol:
            # run optimization algorithm
            if grad > 0:
                kde.optimize(minb=max(minb, kde.b), maxb=maxb, tol=tol, **kwargs)
            else:
                kde.optimize(minb=minb, maxb=min(kde.b, maxb), tol=tol, **kwargs)

    def auto_optimize_gch(self, **kwargs):
        self._auto_optimize(self._gch_kde, **kwargs)

    def auto_optimize_cln(self, **kwargs):
        self._auto_optimize(self._cln_kde, **kwargs)
    def auto_optimize(self, **kwargs):
        """
        delegation to individual functions for gch and cln
        """
        glitch_b_factor = kwargs.pop("glitch_b_factor", DEFAULT_GLITCH_B)
        clean_b_factor = kwargs.pop("clean_b_factor", DEFAULT_CLEAN_B)
        self.auto_optimize_gch(b_factor=glitch_b_factor, **kwargs)
        self.auto_optimize_cln(b_factor=clean_b_factor, **kwargs)
    def optimize_gch(self, **kwargs):
        self._gch_kde.optimize(**kwargs)

    def optimize_cln(self, **kwargs):
        self._cln_kde.optimize(**kwargs)
    def optimize(self, **kwargs):
        """
        delegation to individual functions for gch and cln
        """
        glitch_b_factor = kwargs.pop("glitch_b_factor", DEFAULT_GLITCH_B)
        clean_b_factor = kwargs.pop("clean_b_factor", DEFAULT_CLEAN_B)
        self.optimize_gch(b_factor=glitch_b_factor, **kwargs)
        self.optimize_cln(b_factor=clean_b_factor, **kwargs)
    def gch_logpdf(self, ranks):
        return self._gch_kde.logpdf(ranks)

    def gch_logpdf_quantile(self, ranks, q):
        return self._gch_kde.logpdf_quantile(ranks, q)

    def gch_pdf(self, ranks):
        return self._gch_kde.pdf(ranks)

    def gch_pdf_quantile(self, ranks, q):
        return self._gch_kde.pdf_quantile(ranks, q)

    def gch_cdf(self, ranks):
        return self._gch_kde.cdf(ranks)

    def gch_cdf_quantile(self, ranks, q):
        return self._gch_kde.cdf_quantile(ranks, q)

    def cln_logpdf(self, ranks):
        return self._cln_kde.logpdf(ranks)

    def cln_logpdf_quantile(self, ranks, q):
        return self._cln_kde.logpdf_quantile(ranks, q)

    def cln_pdf(self, ranks):
        return self._cln_kde.pdf(ranks)

    def cln_pdf_quantile(self, ranks, q):
        return self._cln_kde.pdf_quantile(ranks, q)

    def cln_cdf(self, ranks):
        return self._cln_kde.cdf(ranks)

    def cln_cdf_quantile(self, ranks, q):
        return self._cln_kde.cdf_quantile(ranks, q)

    def eff(self, ranks):
        # gch_cdf integrates from small ranks to big ones, we want the opposite
        return 1.0 - self.gch_cdf(ranks)

    def eff_quantile2rank(self, e):
        return self._gch_kde.quantile(1.0 - e)

    def fap(self, ranks):
        return 1.0 - self.cln_cdf(ranks)

    def fap_quantile2rank(self, f):
        return self._cln_kde.quantile(1.0 - f)

    @property
    def prior_gch(self):
        """an estimate of the prior that any given sample is a glitch, based on
        the approximate rate derived from self._gch_dataset"""
        return self._gch_rate / (self._gch_rate + self._cln_rate)

    @property
    def prior_cln(self):
        """an estimate of the prior that any given sample is a clean, based on
        the approximate rate derived from self._cln_dataset"""
        return 1.0 - self.prior_gch  # by definition

    @property
    def prior_odds(self):
        """the prior odds that a sample is a glitch vs a clean
        (self.prior_gch/self.prior_cln)"""
        return self._gch_rate / self._cln_rate
    def pglitch(self, ranks):
        """
        compute p(glitch) at these ranks

            pglitch = 1 / (1 + np.exp(-loglike(ranks)) / prior_odds)

        NOTE: this includes the prior_odds based on approximate rates of glitches
        and cleans. If that is not desired, users can call loglike directly and
        apply their own prior_odds
        """
        return 1.0 / (1.0 + np.exp(-self.loglike(ranks)) / self.prior_odds)
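    # Worked example of the mapping above (pure arithmetic, not module code):
    # with loglike = 2.0 and prior_odds = 0.25 (cleans four times more likely a
    # priori), pglitch = 1 / (1 + exp(-2.0)/0.25) = 1 / (1 + 0.541) ~= 0.65, so
    # a moderately large likelihood ratio can still yield a modest p(glitch)
    # when glitches are rare.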
    def loglike(self, ranks):
        """
        NOTE: returns log(E[p(r|g)]) - log(E[p(r|c)])
        """
        return self._gch_kde.logpdf(ranks) - self._cln_kde.logpdf(ranks)
    def loglike_mean(self, ranks):
        """
        NOTE: returns E[log(p(r|g)) - log(p(r|c))]
        """
        return self._gch_kde.logpdf_mean(ranks) - self._cln_kde.logpdf_mean(ranks)
    def loglike_std(self, ranks):
        return self.loglike_var(ranks) ** 0.5

    def loglike_var(self, ranks):
        # sum the (independent) variances from the glitch and clean KDEs
        return self._gch_kde.logpdf_var(ranks) + self._cln_kde.logpdf_var(ranks)
    def loglike_quantile(self, ranks, q):
        """
        interpolates between requested quantile and stored quantiles first, then
        interpolates over the resulting function of rank to get the values at
        the requested ranks

        makes use of self._ranks and self._interp_loglike_cdf to do this, with
        self._interp_loglike_cdf being computed via a monte-carlo integration in
        self.compute_loglike_cdf()

        NOTE: this function assumes "q" is a float, not an array
        """
        # find indices in self._interp_loglike_cdf corresponding to quantile q
        # note, this may give issues near the boundaries of our interpolation
        # objects where I've by-hand set quantiles to -infty, +infty
        i = int(q / self._dq)
        j = i + 1

        # pull out relevant points from interpolation object
        lower = self._interp_loglike_cdf[:, i]
        upper = self._interp_loglike_cdf[:, j]

        # interpolate between neighboring estimates of CDF(rank) to find
        # quantiles at self._ranks
        interp_q = np.array([i, j]) * self._dq
        interp_quantiles = [
            np.interp(q, interp_q, [lower[rankind], upper[rankind]])
            for rankind in range(len(self._ranks))
        ]

        # interpolate between self._ranks to find the values at requested ranks
        return np.interp(ranks, self._ranks, interp_quantiles)
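    # Usage sketch (illustrative only): the stored monte-carlo quantiles can be
    # used to put an uncertainty band around the log-likelihood ratio, e.g. a
    # central ~68% interval, assuming `cmap` is a computed CalibrationMap.
    #
    #   >>> ranks = np.array([0.25, 0.5, 0.75])
    #   >>> lo = cmap.loglike_quantile(ranks, 0.16)
    #   >>> hi = cmap.loglike_quantile(ranks, 0.84)
    #   >>> mid = cmap.loglike(ranks)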
    def roc(self, ranks=None):
        """
        returns fap(ranks), eff(ranks)
        """
        if ranks is None:
            ranks = self.ranks
        return self.fap(ranks), self.eff(ranks)
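    # Usage sketch (illustrative only): `roc` evaluates the ROC curve on the
    # internal rank grid, which can be reduced to a detection efficiency at a
    # fixed false-alarm probability, assuming `cmap` is a computed CalibrationMap.
    #
    #   >>> fap, eff = cmap.roc()
    #   >>> np.interp(0.01, fap[::-1], eff[::-1])   # efficiency at 1% FAP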
    def is_healthy(self):
        """
        Determine the health of the pipeline.

        Based on the calculation of dfap/fap given in the map. Similar results
        are shown in the calibration accuracy plot.
        """
        if not utils.livetime(self.segs) >= self.min_livetime:
            return False
        if not utils.livetime(self._clean_segs) >= self.min_clean_livetime:
            return False

        ranks = self._cln_dataset.ranks
        fap = self.fap(ranks)

        # calculate nominal and fractional (empirical) FAP
        vals, counts = np.unique(fap, return_counts=True)
        nom, frc = np.transpose(
            np.array([(val, count) for val, count in zip(vals, counts)], dtype=float)
        )
        order = nom.argsort()
        nom = nom[order]
        frc = np.cumsum(frc[order]) / len(ranks)
        dfap = np.abs(((nom - frc) / frc) * 100)

        return not np.any(dfap[nom > 1e-3] > 50)
# -------------------------------------------------
class DiscreteCalibrationMap(CalibrationMap):
    """
    a helper object that represents 2 Discrete1DKDE objects, one for cleans and
    one for glitches

    builds and manages these for you based on input samples

    can then compute things like likelihood ratios, efficiency, fap, etc.
    Can even generate a full ROC curve automatically.
    """

    def __init__(
        self,
        dataset,
        num_quantiles=DEFAULT_NUM_QUANTILES,
        num_mc=DEFAULT_NUM_MC,
        compute=True,  # can be used to avoid computing loglike_cdf
        clean_segs=None,
        rate_estimation=DEFAULT_RATE_ESTIMATION,
        min_livetime=DEFAULT_MIN_LIVETIME,
        min_clean_livetime=DEFAULT_MIN_CLEAN_LIVETIME,
        model_id=None,
        **kwargs
    ):
        # record provenance stuff
        self._start = dataset.start
        self._end = dataset.end
        self._segs = segmentlist(dataset.segs)  # make a copy
        self._model_id = model_id
        self._hash = None  # needed by the inherited hash property

        # rate estimation
        if rate_estimation == "livetime":
            assert (
                clean_segs is not None
            ), "need to provide clean segments if using rate estimation via livetime"
        self.rate_estimation = rate_estimation
        self._clean_segs = clean_segs

        # health checks
        self.min_livetime = min_livetime
        self.min_clean_livetime = min_clean_livetime

        # make a local copy of datasets to record minimal sets of data
        self._gch_dataset, self._cln_dataset = dataset.vectors2classes()
        self.compute_rates()

        # set up KDE objects
        self._gch_kde = Discrete1DKDE(self._gch_dataset.ranks, **kwargs)
        self._cln_kde = Discrete1DKDE(self._cln_dataset.ranks, **kwargs)

        # make sure that we have this stored in case we don't compute...
        self._num_quantiles2dq(num_quantiles)
        if compute:
            self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)

    @property
    def ranks(self):
        return sorted(
            set(list(self._gch_kde._count.keys()) + list(self._cln_kde._count.keys()))
        )

    def _auto_optimize(self, *args, **kwargs):
        # don't do anything since there is nothing to optimize for the discrete map
        pass
    def loglike(self, ranks):
        """
        NOTE: returns log(E[p(r|g)]) - log(E[p(r|c)])
        """
        gch = self._gch_kde.logpdf(ranks)
        cln = self._cln_kde.logpdf(ranks)

        # do this to avoid subtracting -inf from -inf
        ans = np.zeros_like(gch, dtype=float)
        truth = np.logical_not((gch == -np.infty) * (cln == -np.infty))
        ans[truth] = gch[truth] - cln[truth]
        return ans
    def compute_loglike_cdf(
        self, num_quantiles=DEFAULT_NUM_QUANTILES, num_mc=DEFAULT_NUM_MC
    ):
        """
        compute CDF(loglike|rank) interpolation object via monte carlo
        """
        self._num_quantiles2dq(num_quantiles)
        ranks = self.ranks
        self._loglike_cdf = dict(
            (rank, np.empty(num_quantiles + 2, dtype=float)) for rank in ranks
        )
        for rank in ranks:
            self._loglike_cdf[rank][0] = -np.infty  # set end points
            self._loglike_cdf[rank][-1] = +np.infty

        # used within the iteration to avoid repeatedly defining this
        percentiles = np.linspace(0.0, 100, num_quantiles + 2)[1:-1]

        alpha_g, beta_g = np.transpose(self._gch_kde._pdf_params(ranks))
        alpha_c, beta_c = np.transpose(self._cln_kde._pdf_params(ranks))

        truth = np.empty(num_mc, dtype=bool)
        samples = np.empty(num_mc, dtype=float)

        # re-use these within loop to avoid re-allocating memory
        gsamples = np.empty(num_mc, dtype=float)
        csamples = np.empty(num_mc, dtype=float)
        for rank, ag, bg, ac, bc in zip(ranks, alpha_g, beta_g, alpha_c, beta_c):
            gsamples[:] = beta.rvs(ag, bg, size=num_mc)
            csamples[:] = beta.rvs(ac, bc, size=num_mc)

            # catch all possible log(0) to avoid warnings

            # only gsamples are zero
            truth[:] = gsamples == 0
            samples[truth] = -np.infty

            # only csamples are zero
            truth[:] = csamples == 0
            samples[truth] = +np.infty

            # both pdfs are zero
            truth[:] = (gsamples == 0) * (csamples == 0)
            samples[truth] = 0.0

            # both pdfs are non-zero
            truth[:] = (gsamples != 0) * (csamples != 0)
            samples[truth] = np.log(gsamples[truth]) - np.log(csamples[truth])

            # the quantiles associated with monte carlo
            self._loglike_cdf[rank][1:-1] = [
                np.percentile(samples, p) for p in percentiles
            ]
    def loglike_quantile(self, ranks, q):
        """
        interpolates between the requested quantile and the stored quantiles for
        each of the requested ranks

        makes use of self._loglike_cdf, which is computed via a monte-carlo
        integration in self.compute_loglike_cdf()

        NOTE: this function assumes "q" is a float, not an array
        """
        qs = np.arange(0, 1 + self._dq, self._dq)
        return np.array(
            [
                np.interp(q, qs, self._loglike_cdf[rank])
                if rank in self._loglike_cdf
                else 0
                for rank in ranks
            ],
            dtype=float,
        )
class Discrete1DKDE(object):
    """
    an object similar to FixedBandwidth1DKDE but assuming ranks only occur at
    discrete values instead of being drawn from a continuous distribution
    """

    def __init__(self, obs, **kwargs):
        self._obs = np.array([], dtype=float)
        self._count = defaultdict(int)
        self.add_observations(obs)  # sets up cumulative counter as well

    @property
    def obs(self):
        return self._obs

    @property
    def ranks(self):
        return sorted(self._count.keys())

    def optimize(self, **kwargs):
        pass  # do nothing here because there is nothing to optimize

    def add_observations(self, obs):
        for rank in obs:
            self._count[rank] += 1
        self._compute_cum_count()
        self._obs = np.concatenate((self._obs, obs))

    def remove_observations(self, obs):
        o = list(self.obs)
        for rank in obs:
            if self._count[rank] == 0:
                raise ValueError(
                    "cannot remove rank=%.3e because it is not contained in KDE!"
                    % rank
                )
            self._count[rank] -= 1
            o.remove(rank)
        self._compute_cum_count()
        self._obs = np.array(o)

    def _compute_cum_count(self):
        self._cum_count = []
        c = 0
        for rank in sorted(self._count.keys()):
            c += self._count[rank]
            self._cum_count.append((rank, c))
        self._cum_count = np.array(self._cum_count, dtype=float)
    def flush(self, max_samples=DEFAULT_MAX_SAMPLES):
        """
        remove observations from the front of the list until the total number of
        observations is <= max_samples
        """
        N = len(self.obs)
        if max_samples == 0:
            # don't bother to re-compute anything, just zero everything out
            self._obs = np.array([], dtype=float)
            self._count = defaultdict(int)
        elif max_samples < N:
            self.remove_observations(self.obs[: N - max_samples])
        else:
            pass
    def _params(self, count, N):
        if N:
            return np.array([(c + 1, N - c + 1) for c in count], dtype=float)
        else:
            # uniform distribution
            return np.array([(1, 1) for c in count], dtype=float)

    def _cdf_params(self, ranks):
        N = len(self._obs)
        return self._params(self.cdf(ranks) * N, N)

    def _pdf_params(self, ranks):
        N = len(self._obs)
        return self._params(self.pdf(ranks) * N, N)

    def cdf(self, ranks):
        if len(self._obs):
            ans = []
            for rank in ranks:
                ind = self._cum_count[:, 0] <= rank
                if np.any(ind):
                    ans.append(self._cum_count[ind, 1][-1])  # the biggest rank
                else:
                    ans.append(0)
            return np.array(ans, dtype=float) / len(self._obs)  # normalize the counts
        else:
            # assume a uniform distribution, so the CDF is just how much we've
            # integrated, which is the rank
            return np.array(ranks, dtype=float)

    def cdf_std(self, ranks):
        return self.cdf_var(ranks) ** 0.5

    def cdf_var(self, ranks):
        c = self.cdf(ranks)
        N = len(self._obs)
        return c * (1 - c) / N

    def cdf_quantile(self, ranks, q):
        return np.array(
            [beta.ppf(q, a, b) for a, b in self._cdf_params(ranks)], dtype=float
        )

    def quantile(self, q):
        return np.interp(
            q, self._cum_count[:, 1] / len(self._obs), self._cum_count[:, 0]
        )

    def pdf(self, ranks):
        if len(self._obs):
            return np.array(
                [self._count.get(rank, 0) for rank in ranks], dtype=float
            ) / len(self._obs)
        else:
            # without any observations, fall back to a constant value for each
            # individual rank
            return np.ones_like(ranks, dtype=float) * 0.5

    def pdf_std(self, ranks):
        return self.pdf_var(ranks) ** 0.5

    def pdf_var(self, ranks):
        p = self.pdf(ranks)
        N = len(self._obs)
        return p * (1 - p) / N

    def pdf_quantile(self, ranks, q):
        return np.array(
            [beta.ppf(q, a, b) for a, b in self._pdf_params(ranks)], dtype=float
        )

    def logpdf(self, ranks):
        ans = -np.infty * np.ones_like(ranks, dtype=float)
        pdf = self.pdf(ranks)
        truth = pdf > 0
        ans[truth] = np.log(pdf[truth])
        return ans

    def logpdf_mean(self, ranks):
        return np.array(
            [digamma(a) - digamma(a + b) for a, b in self._pdf_params(ranks)]
        )

    def logpdf_std(self, ranks):
        return self.logpdf_var(ranks) ** 0.5

    def logpdf_var(self, ranks):
        return np.array(
            [polygamma(1, a) - polygamma(1, a + b) for a, b in self._pdf_params(ranks)]
        )

    def logpdf_quantile(self, ranks, q):
        return np.log(self.pdf_quantile(ranks, q))
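    # Usage sketch (illustrative only): with discrete ranks the "KDE" reduces to
    # counting, and the pdf/cdf are just normalized (cumulative) counts.
    #
    #   >>> import numpy as np
    #   >>> from idq.calibration import Discrete1DKDE
    #   >>> kde = Discrete1DKDE(np.array([0.0, 0.5, 0.5, 1.0]))
    #   >>> kde.pdf(np.array([0.5]))   # array([0.5]), i.e. 2 of 4 observations
    #   >>> kde.cdf(np.array([0.5]))   # array([0.75]), 3 of 4 observations at or below 0.5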
    def coverage(self):
        """return ranks, CDF(ranks), fraction of ranks with CDF(rank) <= X"""
        ranks = self.ranks
        return (
            ranks,
            self.cdf(ranks),
            np.array([np.sum(self._obs <= r) for r in ranks], dtype=float)
            / len(self._obs),
        )