import math
import numpy as np
from scipy.special import erf
from scipy.special import digamma
from scipy.special import polygamma
from scipy.stats import beta
from collections import defaultdict
from ligo.segments import segmentlist
from . import names
from . import utils
LOG3 = np.log(3)
DEFAULT_NUM_MC = 10000
DEFAULT_NUM_QUANTILES = 101
DEFAULT_NUM_POINTS = 101
DEFAULT_MAX_SAMPLES = np.infty # used within the flush logic
DEFAULT_MAX_STRIDE = np.infty
DEFAULT_B = 0.1
DEFAULT_MINB = 1e-4
DEFAULT_MAXB = 1e0
DEFAULT_TOL = 1e-4
DEFAULT_DLOGLIKE = 1
DEFAULT_GLITCH_B = 1 / 24.0
DEFAULT_CLEAN_B = 1 / 12.0
DEFAULT_RATE_ESTIMATION = "livetime"
DEFAULT_MIN_LIVETIME = 86400 # 1 day
DEFAULT_MIN_CLEAN_LIVETIME = 43200 # half a day
class FixedBandwidth1DKDE(object):
"""
an object that represents the calibration mapping between
SupervisedClassifier output (ranks in [0, 1]) and probabilistic statements;
essentially a fancy interpolation device.
SupervisedClassifier and IncrementalSupervisedClassifier objects retain a
reference to one of these; the functionality should be universal for all
classifiers!
We accomplish this with a standard fixed-bandwidth Gaussian KDE, represented
internally as a vector that's referenced via interpolation for rapid
execution.
"""
def __init__(
self, observations, num_points=DEFAULT_NUM_POINTS, b=DEFAULT_B, **kwargs
):
# set up basic references to data and hyperparameters
self._obs = np.array([], dtype=float)
self.b = b
self._num_points = num_points
# set up data structures used when computing KDE fitting parameters
self._interp_x = np.linspace(0, 1, self._num_points)
self._interp_logpdf = np.empty(self._num_points, dtype="float")
self._interp_cdf = np.empty(self._num_points, dtype="float")
# first and second moments of the pdf, which we calculate in the log and
# do some backflips to maintain high precision
self._interp_logpdf_m1 = np.empty((self._num_points, 4), dtype="float")
self._interp_logpdf_m2 = np.empty((self._num_points, 2), dtype="float")
# uniform distributions
self._interp_pdf_alpha = np.ones(self._num_points, dtype="float")
self._interp_pdf_beta = np.ones(self._num_points, dtype="float")
# first and second moments for the CDF
self._interp_cdf_m1 = np.empty((self._num_points, 2), dtype="float")
self._interp_cdf_m2 = np.empty((self._num_points, 2), dtype="float")
# uniform distributions
self._interp_cdf_alpha = np.ones(self._num_points, dtype="float")
self._interp_cdf_beta = np.ones(self._num_points, dtype="float")
# the normalization of the whole thing
self._count = 0.0
# set all values to be the default (no data present)
self.flush(max_samples=0)
# now add the observations
self.add_observations(observations)
@property
def b(self):
return self._b
@b.setter
def b(self, b):
self._b = b
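# cache kernel constants reused in _rank2delta: 1/b**2, 1/(sqrt(2)*b) and 1/(2*b**2)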
self._ib2 = self.b**-2
self._ibsqrt2 = 1.0 / (2**0.5 * self.b)
self._i2b2 = 0.5 * self._ib2
@property
def obs(self):
return self._obs
@property
def ranks(self):
return self._interp_x
@property
def interp_x(self):
return self._interp_x
@property
def interp_logpdf(self):
return self._interp_logpdf
@property
def interp_cdf(self):
return self._interp_cdf
@property
def interp_pdf_alpha(self):
return self._interp_pdf_alpha
@property
def interp_pdf_beta(self):
return self._interp_pdf_beta
@property
def interp_cdf_alpha(self):
return self._interp_cdf_alpha
@property
def interp_cdf_beta(self):
return self._interp_cdf_beta
def compute(self):
"""
works like add_observations, except it first zeroes all internal arrays
and then re-adds all of the current observations
"""
obs = self.obs
# re-set everything
self.flush(max_samples=0)
# add the observations back in, which triggers updates to internal KDE
# representations
self.add_observations(obs)
def flush(self, max_samples=DEFAULT_MAX_SAMPLES):
"""
remove the observations from the front of the list until the total
number of observations is <=max_samples.
"""
N = len(self.obs)
if max_samples == 0:
# don't bother re-computing contributions and subtracting, just zero
# everything out, forget about all existing observations
self._obs = np.array([], dtype=float)
# interpolation stuff
self._interp_logpdf[:] = -np.infty
self._interp_cdf[:] = 0.0
# first and second moments of the pdf, which we calculate in the log
# and do some backflips to maintain high precision
self._interp_logpdf_m1[:, 0] = -np.infty
self._interp_logpdf_m2[:, 0] = -np.infty
# first and second moments for the CDF
self._interp_cdf_m1[:, 0] = 0.0
self._interp_cdf_m2[:, 0] = 0.0
# the normalization of the whole thing
self._count = 0.0
elif max_samples < N:
# keep only a fraction of things, so strip out the first few
# observations
self.remove_observations(self.obs[: N - max_samples])
else:
# we're allowed to keep more observations than we have, so do nothing
pass
def _rank2delta(self, rank):
"""
fill in the internal data structures with the contributions from just
this one rank
"""
x0 = self._interp_x - rank
x1 = self._interp_x + rank
x2 = self._interp_x - 2 + rank
z0 = -(x0**2)
z1 = -(x1**2)
z2 = -(x2**2)
# compute pdf 1st moment
self._interp_logpdf_m1[:, 1] = self._i2b2 * z0
self._interp_logpdf_m1[:, 2] = self._i2b2 * z1
self._interp_logpdf_m1[:, 3] = self._i2b2 * z2
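# log-sum-exp: subtract the per-point maximum before exponentiating so the
# sum of the three kernel terms stays numerically stable in the log domain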
m = np.max(self._interp_logpdf_m1[:, 1:], axis=1)
# NOTE: we divide the pdf by 3 so that the thing we fit with a
# beta-distribution is bounded between 0 and 1.
self._interp_logpdf_m1[:, 1] = (
np.log(
np.sum(np.exp(self._interp_logpdf_m1[:, 1:].transpose() - m), axis=0)
)
+ m
- LOG3
)
# compute 2nd moment for pdf error estimates
self._interp_logpdf_m2[:, 1] = self._interp_logpdf_m1[:, 1] * 2
# compute 1st and 2nd moments for cdf error estimates
erf0 = erf(-rank * self._ibsqrt2)
erf1 = erf(rank * self._ibsqrt2)
erf2 = erf((-2 + rank) * self._ibsqrt2)
self._interp_cdf_m1[:, 1] = 0.5 * (
erf(x0 * self._ibsqrt2)
- erf0
+ erf(x1 * self._ibsqrt2)
- erf1
+ erf(x2 * self._ibsqrt2)
- erf2
)
self._interp_cdf_m2[:, 1] = self._interp_cdf_m1[:, 1] ** 2
dcount = 0.5 * (
erf((1 - rank) * self._ibsqrt2)
- erf0
+ erf((1 + rank) * self._ibsqrt2)
- erf1
+ erf((-1 + rank) * self._ibsqrt2)
- erf2
)
return dcount
def remove_observations(self, observations):
"""
works just like add_observations, except we subtract out the contributions
of the observations instead of adding them in
"""
if isinstance(observations, (int, float)):
observations = np.array([observations])
# remove observations
# we check that we can remove all observations before we update any
# internal structures!
obs = list(self.obs)
for rank in observations:
try:
obs.remove(rank) # removes the first occurrence of this value
except ValueError:
raise ValueError(
"cannot remove rank=%.3e because it is not contained in KDE" % rank
)
self._obs = np.array(obs)
# iterate and remove contributions
truth = np.empty(self._num_points, dtype=bool)
for rank in observations:
# compute the changes associated with just this one observation
dcount = self._rank2delta(rank)
# update 1st moment for pdf error
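# subtraction in the log domain: log(A - B) = log(A) + log(1 - exp(log(B) - log(A))),
# with the log-ratio clipped at 0 to guard against round-off driving the argument negative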
self._interp_logpdf_m1[:, 1] -= self._interp_logpdf_m1[:, 0]
truth[:] = self._interp_logpdf_m1[:, 1] > 0
self._interp_logpdf_m1[truth, 1] = 0
self._interp_logpdf_m1[:, 0] += np.log(
1 - np.exp(self._interp_logpdf_m1[:, 1])
)
# update 2nd moment for pdf error estimates
self._interp_logpdf_m2[:, 1] -= self._interp_logpdf_m2[:, 0]
truth[:] = self._interp_logpdf_m2[:, 1] > 0
self._interp_logpdf_m2[truth, 1] = 0
self._interp_logpdf_m2[:, 0] += np.log(
1 - np.exp(self._interp_logpdf_m2[:, 1])
)
# update moments for cdf error estimates
self._interp_cdf_m1[:, 0] -= self._interp_cdf_m1[:, 1]
self._interp_cdf_m2[:, 0] -= self._interp_cdf_m2[:, 1]
self._count -= dcount
# update fitting parameters
self._compute_params()
def add_observations(self, observations):
"""
update the local data structures using the new observations and compute the
interpolation arrays used for fast execution later. this includes error
estimates, which we compute by fitting beta distributions to the first 2
moments of the pdf and cdf, respectively
"""
if isinstance(observations, (int, float)):
observations = np.array([observations])
self._obs = np.concatenate((self._obs, observations))
# iterate and add contributions
for rank in observations:
# compute the changes associated with just this one observation
dcount = self._rank2delta(rank)
# update 1st moment for pdf error
m = np.max(self._interp_logpdf_m1[:, :2], axis=1)
self._interp_logpdf_m1[:, 0] = (
np.log(
np.sum(
np.exp(self._interp_logpdf_m1[:, :2].transpose() - m), axis=0
)
)
+ m
)
# update 2nd moment for pdf error estimates
m = np.max(self._interp_logpdf_m2, axis=1)
self._interp_logpdf_m2[:, 0] = (
np.log(np.sum(np.exp(self._interp_logpdf_m2.transpose() - m), axis=0))
+ m
)
# update moments for cdf error estimates
self._interp_cdf_m1[:, 0] += self._interp_cdf_m1[:, 1]
self._interp_cdf_m2[:, 0] += self._interp_cdf_m2[:, 1]
self._count += dcount
# update fitting parameters
self._compute_params()
def _compute_params(self):
"""
map the internal references for 1st and 2nd moments into interpolation
parameters for a beta distribution.
NOTE: if the distributions don't really fit within rank in [0, 1], doing
this may bias our error estimates!
"""
if self._count: # compute parameters for error estimates
# NOTE: for a beta distribution described by 2 parameters: alpha, beta
# pdf(x|alpha, beta) = Gamma(alpha+beta)/(Gamma(alpha)*Gamma(beta))
# * x**(alpha-1) * (1-x)**(beta-1)
# E[x] = alpha/(alpha+beta)
# Var[x] = alpha*beta/((alpha+beta)**2 * (alpha+beta+1))
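# inverting these relations gives, with k = E[x]*(1 - E[x])/Var[x] - 1,
# alpha = E[x]*k and beta = (1 - E[x])*k (see _cumulants2params)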
inum_obs = 1.0 / self._count
lognum_obs = np.log(self._count)
# pdf
# expected value
Ep = np.exp(self._interp_logpdf_m1[:, 0] - lognum_obs)
# variance
Vp = (
np.exp(self._interp_logpdf_m2[:, 0] - lognum_obs) - Ep**2
) * inum_obs
# handle numerical error explicitly by imposing a minimum variance;
# assume samples are distributed according to a Gaussian with stdv
# ~0.1*self.b, which is much smaller than we can resolve:
#     minVp = 0.5*(0.1)**4 / num_obs
minVp = 5.0e-5 * inum_obs
Vp[Vp < minVp] = minVp # handle numerical error explicitly
self._interp_pdf_alpha, self._interp_pdf_beta = self._cumulants2params(
Ep, Vp
)
# cdf
# expected value
Ec = self._interp_cdf_m1[:, 0] * inum_obs
# variance
Vc = (self._interp_cdf_m2[:, 0] * inum_obs - Ec**2) * inum_obs
# handle numerical error explicitly by imposing a minimum variance
# NOTE: we use the same minimum as Vp without much justification...
Vc[Vc < minVp] = minVp # handle numerical error explicitly
self._interp_cdf_alpha, self._interp_cdf_beta = self._cumulants2params(
Ec, Vc
)
# normalize the pdf, compute the cdf
# NOTE: if the distributions are entirely contained within rank in
# [0,1], these should be equivalent to Ep and Ec, respectively
# NOTE: we do NOT normalize pdfs so they integrate to one with rank
# in [0, 1]. USER BEWARE!
self._interp_logpdf[:] = (
self._interp_logpdf_m1[:, 0]
- np.log(self._count)
- 0.5 * np.log(2 * np.pi * self.b**2)
+ LOG3
)
self._interp_cdf[:] = Ec
# handle numerical stability issues explicitly
self._interp_cdf[self._interp_cdf < 0] = 0
self._interp_cdf[self._interp_cdf > 1] = 1
else: # no data, so set everything to a uniform distribution
self._interp_logpdf[:] = 0.5
self._interp_cdf[:] = np.linspace(0, 1, self._num_points)
self._interp_pdf_alpha[:] = 1 # uniform distributions
self._interp_pdf_beta[:] = 1
self._interp_cdf_alpha[:] = 1 # uniform distributions
self._interp_cdf_beta[:] = 1
# NOTE: this may be wonky when we go from no data to some data, but
# that shouldn't be the main use case
def _cumulants2params(self, E, V, safe=False):
"""
compute the beta-distribution parameters corresponding to this expected
value and variance
"""
# now actually compute things
x = (1.0 - E) * E - V
alpha = (E / V) * x
beta = ((1.0 - E) / V) * x
bad = alpha <= 0
if np.any(bad):
if safe:
raise Warning("alpha<=0")
# FIXME, this hard-coding could be bad, but we're talking about when
# my model breaks down anyway...
alpha[bad] = 1e-8
beta[bad] = 1e-2
bad = beta <= 0
if np.any(bad):
if safe:
raise Warning("beta<=0")
# FIXME, this hard-coding could be bad, but we're talking about when
# my model breaks down anyway...
beta[bad] = 1e-8
alpha[bad] = 1e-2
return alpha, beta
def loglike(self, b):
r"""
compute the leave-one-out cross-validation loglikelihood at this value of b
loglike = (1/N) \sum_i \log( \frac{1}{(N-1)\sqrt{2\pi}\,b}
\sum_{j \neq i} \exp(-(x_i - x_j)^2 / (2 b^2)) )
"""
if len(self.obs) < 2: # only one sample, so cross validation is ill defined
return 0.0
logl = 0.0
i2b2 = 0.5 * b**-2
truth = np.ones_like(self.obs, dtype=bool)
z = np.empty(len(truth) - 1, dtype=float)
for ind, rank in enumerate(self.obs):
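# leave-one-out: re-include the previously excluded sample and exclude the
# current one, so exactly one observation is held out per iteration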
truth[ind - 1] = True
truth[ind] = False
z[:] = -((rank - self.obs[truth]) ** 2) * i2b2
m = np.max(z)
logl += np.log(np.sum(np.exp(z - m))) + m
N = self._count
return logl / N - np.log(N - 1) - 0.5 * np.log(2 * np.pi) - np.log(b)
def grad(self, b):
r"""
computes dloglike/dlogb at b
dloglike/dlogb = (1/N) \sum_i
\frac{\sum_{j \neq i} ((x_i - x_j)^2/b^2 - 1) \exp(-0.5 (x_i - x_j)^2/b^2)}
{\sum_{j \neq i} \exp(-0.5 (x_i - x_j)^2/b^2)}
"""
if len(self.obs) < 2: # cross validation is ill defined, so just return zero
return 0.0
dlogL = 0.0
ib2 = b**-2
truth = np.ones_like(self.obs, dtype=bool)
z = np.empty(len(truth) - 1, dtype=float)
weights = np.empty_like(z, dtype=float)
for ind, rank in enumerate(self.obs):
truth[ind - 1] = True
truth[ind] = False
z[:] = (rank - self.obs[truth]) ** 2 * ib2
weights[:] = -0.5 * z
m = np.max(weights)
weights[:] = np.exp(weights - m)
dlogL += np.sum(weights * (z - 1)) / np.sum(weights)
return dlogL / len(self.obs)
def optimize(
self,
minb=DEFAULT_MINB,
maxb=DEFAULT_MAXB,
tol=DEFAULT_TOL,
bounded=False,
b_factor=1 / 12,
**kwargs
):
"""
looks for the maximum of logL via Newton's method for the zeros of
dlogL/dlogb expects dlogL/dlogb to be monotonic in logb, which will
likely be true. However, if it is not then the logic in this loop may
fail.
"""
# set lower bound for bandwidth
if bounded:
N = len(self.obs)
minb = max(minb, (1.06 * b_factor) * math.pow(N, -0.2))
# check if we're already within tolerance
if maxb - minb <= minb * tol:
self.b = (minb * maxb) ** 0.5
else: # check if end points are optima
MdlogL = self.grad(maxb)
if MdlogL >= 0:
self.b = maxb
else:
mdlogL = self.grad(minb)
if mdlogL <= 0:
self.b = minb
else:
# iterate through bisection search until termination
# condition is reached
while maxb - minb > minb * tol:
midb = (minb * maxb) ** 0.5
dlogL = self.grad(midb)
if dlogL == 0:
self.b = midb
break
elif dlogL > 0:
minb = midb
else: # dlogL < 0
maxb = midb
else:
self.b = (minb * maxb) ** 0.5
# update internal data structures to reflect new bandwidth
self.compute()
return None
def cdf(self, ranks):
return np.interp(ranks, self._interp_x, self._interp_cdf)
def cdf_alpha(self, ranks):
"""
NOTE: because we force the cdf to go through (0,0) and (1,1), the error
estimates are messed up near the end points because I can't represent
that with a beta function. This turns out to affect the second-to-last
point as well because of how np.interp works (the NaN from the last
point gets mixed in)
"""
return np.interp(ranks, self._interp_x, self._interp_cdf_alpha)
def cdf_beta(self, ranks):
"""
NOTE: because we force the cdf to go through (0,0) and (1,1), the error
estimates are messed up near the end points because I can't represent
that with a beta function. This turns out to affect the second-to-last
point as well because of how np.interp works (the NaN from the last
point gets mixed in)
"""
return np.interp(ranks, self._interp_x, self._interp_cdf_beta)
def cdf_std(self, ranks):
return np.array(
[beta.std(self.cdf_alpha(rank), self.cdf_beta(rank)) for rank in ranks]
)
def cdf_var(self, ranks):
return np.array(
[beta.var(self.cdf_alpha(rank), self.cdf_beta(rank)) for rank in ranks]
)
def cdf_quantile(self, ranks, q):
"""
NOTE: because we force the cdf to go through (0,0) and (1,1), the error
estimates are messed up near the end points because I can't represent
that with a beta function. This turns out to affect the second-to-last
point as well because of how np.interp works (the NaN from the last
point gets mixed in)
"""
return np.array(
[beta.ppf(q, self.cdf_alpha(rank), self.cdf_beta(rank)) for rank in ranks]
)
def quantile(self, q):
return np.interp(q, self._interp_cdf, self._interp_x)
def pdf(self, ranks):
return np.exp(self.logpdf(ranks))
def pdf_alpha(self, ranks):
return np.interp(ranks, self._interp_x, self._interp_pdf_alpha)
def pdf_beta(self, ranks):
return np.interp(ranks, self._interp_x, self._interp_pdf_beta)
def pdf_std(self, ranks):
return np.array(
[beta.std(self.pdf_alpha(rank), self.pdf_beta(rank)) for rank in ranks]
)
def pdf_var(self, ranks):
return np.array(
[beta.var(self.pdf_alpha(rank), self.pdf_beta(rank)) for rank in ranks]
)
@property
def _pdf_scale(self):
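# the beta fits describe pdf * sqrt(2*pi) * b / 3, which is bounded in [0, 1];
# this scale factor maps beta-distributed samples back into pdf units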
return 3.0 / ((2 * np.pi) ** 0.5 * self.b)
def pdf_quantile(self, ranks, q):
scale = self._pdf_scale
return np.array(
[
beta.ppf(
q, self.pdf_alpha(rank), self.pdf_beta(rank), loc=0, scale=scale
)
for rank in ranks
]
)
def logpdf(self, ranks):
"""
NOTE: this returns log(E[pdf]), not E[log(pdf)]
"""
return np.interp(ranks, self._interp_x, self._interp_logpdf)
def logpdf_mean(self, ranks):
"""
NOTE: this returns E[log(pdf)] whereas logpdf returns log(E[pdf])
"""
scale = np.log(self._pdf_scale)
return (
np.array(
[
digamma(self.pdf_alpha(rank))
- digamma(self.pdf_alpha(rank) + self.pdf_beta(rank))
for rank in ranks
]
)
- scale
)
def logpdf_std(self, ranks):
return self.logpdf_var(ranks) ** 0.5
def logpdf_var(self, ranks):
# Var[log(pdf)] = Var[log(scale) + log(X)] = Var[log(X)] for X ~ Beta(alpha, beta),
# so the constant log(scale) offset drops out of the variance
return np.array(
[
polygamma(1, self.pdf_alpha(rank))
- polygamma(1, self.pdf_alpha(rank) + self.pdf_beta(rank))
for rank in ranks
]
)
def logpdf_quantile(self, ranks, q):
return np.log(self.pdf_quantile(ranks, q))
def coverage(self):
"""return ranks, CDF(ranks), and the empirical fraction of observations with rank <= each value"""
ranks = self.ranks
return (
ranks,
self.cdf(ranks),
np.array([np.sum(self._obs <= r) for r in ranks], dtype=float)
/ len(self._obs),
)
class CalibrationMap(object):
"""
a helper object that wraps 2 FixedBandwidth1DKDE objects, one for cleans and
one for glitches. builds and manages these for you based on input samples and
can then compute things like likelihood ratios, efficiency, fap, etc. Can even
generate a full ROC curve automatically.
"""
def __init__(
self,
dataset,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
num_points=DEFAULT_NUM_POINTS,
gch_num_points=DEFAULT_NUM_POINTS,
gch_b=DEFAULT_B,
cln_num_points=DEFAULT_NUM_POINTS,
cln_b=DEFAULT_B,
compute=True, # can be used to avoid computing loglike_cdf
clean_segs=None,
rate_estimation=DEFAULT_RATE_ESTIMATION,
min_livetime=DEFAULT_MIN_LIVETIME,
min_clean_livetime=DEFAULT_MIN_CLEAN_LIVETIME,
model_id=None,
**kwargs
):
# record provenance stuff
self._start = dataset.start
self._end = dataset.end
self._segs = segmentlist(dataset.segs) # make a copy
self._model_id = model_id
self._hash = None
# rate estimation
if rate_estimation == "livetime":
assert (
clean_segs is not None
), "need to provide clean segments if using rate estimation via livetime"
self.rate_estimation = rate_estimation
self._clean_segs = clean_segs
# health checks
self.min_livetime = min_livetime
self.min_clean_livetime = min_clean_livetime
# make a local copy of datasets to record minimal sets of data
self._gch_dataset, self._cln_dataset = dataset.vectors2classes()
self.compute_rates()
# set up KDE objects
self._gch_kde = FixedBandwidth1DKDE(
self._gch_dataset.ranks, num_points=gch_num_points, b=gch_b, **kwargs
)
self._cln_kde = FixedBandwidth1DKDE(
self._cln_dataset.ranks, num_points=cln_num_points, b=cln_b, **kwargs
)
self._ranks = np.linspace(0, 1, num_points)
# make sure that we have this stored in case we don't compute...
self._num_quantiles2dq(num_quantiles)
if compute:
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
@property
def ranks(self):
return self._ranks
@property
def start(self):
return min(self._gch_dataset.start, self._cln_dataset.start)
@start.setter
def start(self, new):
for dataset, kde in [
(self._gch_dataset, self._gch_kde),
(self._cln_dataset, self._cln_kde),
]:
dataset.start = new
dataset.end = max(new, dataset.end)
kde.remove_observations(dataset.filter().ranks)
self.compute_rates()
@property
def end(self):
return max(self._gch_dataset.end, self._cln_dataset.end)
@end.setter
def end(self, new):
for dataset, kde in [
(self._gch_dataset, self._gch_kde),
(self._cln_dataset, self._cln_kde),
]:
dataset.start = min(new, dataset.end)
dataset.end = new
kde.remove_observations(dataset.filter().ranks)
self.compute_rates()
@property
def segs(self):
return self._gch_dataset.segs | self._cln_dataset.segs
@segs.setter
def segs(self, segs):
if len(segs):
self.start = min(self.start, segs[0][0])
self.end = max(self.end, segs[-1][1])
@property
def hash(self):
"""the identifier used to locate this model."""
if self._hash is not None:
return self._hash
else:
return names.times_id2hash(self._start, self._end, self._model_id)
@hash.setter
def hash(self, new):
self._hash = new
@property
def model_id(self):
return self._model_id
def add_gch(
self,
dataset,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
clean_segs=None,
**auto_optimize_kwargs
):
if self.rate_estimation == "livetime" or self._clean_segs is not None:
assert clean_segs is not None, (
"need to provide clean segments if rate_estimation is 'livetime' or "
"the calibration map was instantiated with clean segments"
)
self._clean_segs |= clean_segs
self._gch_dataset += dataset
self._gch_kde.add_observations(dataset.ranks)
self.auto_optimize_gch(**auto_optimize_kwargs)
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
self.compute_rates()
def add_cln(
self,
dataset,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
clean_segs=None,
**auto_optimize_kwargs
):
if self.rate_estimation == "livetime" or self._clean_segs is not None:
assert clean_segs is not None, (
"need to provide clean segments if rate_estimation is 'livetime' or "
"the calibration map was instantiated with clean segments"
)
self._clean_segs |= clean_segs
self._cln_dataset += dataset
self._cln_kde.add_observations(dataset.ranks)
self.auto_optimize_cln(**auto_optimize_kwargs)
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
self.compute_rates()
def add(
self,
dataset,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
clean_segs=None,
**auto_optimize_kwargs
):
if self.rate_estimation == "livetime" or self._clean_segs is not None:
assert clean_segs is not None, (
"need to provide clean segments if rate_estimation is 'livetime' or "
"the calibration map was instantiated with clean segments"
)
self._clean_segs |= clean_segs
gch, cln = dataset.vectors2classes()
self._gch_dataset += gch
self._cln_dataset += cln
self._gch_kde.add_observations(gch.ranks)
self._cln_kde.add_observations(cln.ranks)
self.auto_optimize(**auto_optimize_kwargs)
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
self.compute_rates()
def remove_gch(
self,
dataset,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
**auto_optimize_kwargs
):
self._gch_dataset -= dataset
self._gch_kde.remove_observations(dataset.ranks)
if self._clean_segs: # enforce clean segments span dataset
self._clean_segs &= self.segs
self.auto_optimize_gch(**auto_optimize_kwargs)
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
self.compute_rates()
def remove_cln(
self,
dataset,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
**auto_optimize_kwargs
):
self._cln_dataset -= dataset
self._cln_kde.remove_observations(dataset.ranks)
if self._clean_segs: # enforce clean segments span dataset
self._clean_segs &= self.segs
self.auto_optimize_cln(**auto_optimize_kwargs)
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
self.compute_rates()
def remove(
self,
dataset,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
**auto_optimize_kwargs
):
gch, cln = dataset.vectors2classes()
self._gch_dataset -= dataset
self._cln_dataset -= dataset
self._gch_kde.remove_observations(gch.ranks)
self._cln_kde.remove_observations(cln.ranks)
if self._clean_segs: # enforce clean segments span dataset
self._clean_segs &= self.segs
self.auto_optimize(**auto_optimize_kwargs)
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
self.compute_rates()
def flush_gch(
self,
max_stride=DEFAULT_MAX_STRIDE,
max_samples=DEFAULT_MAX_SAMPLES,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
**auto_optimize_kwargs
):
self._gch_kde.remove_observations(
self._gch_dataset.flush(
max_stride=max_stride, max_samples=max_samples
).ranks
)
if self._clean_segs: # enforce clean segments span dataset
self._clean_segs &= self.segs
self.auto_optimize_gch(**auto_optimize_kwargs)
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
self.compute_rates()
def flush_cln(
self,
max_stride=DEFAULT_MAX_STRIDE,
max_samples=DEFAULT_MAX_SAMPLES,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
**auto_optimize_kwargs
):
self._cln_kde.remove_observations(
self._cln_dataset.flush(
max_stride=max_stride, max_samples=max_samples
).ranks
)
if self._clean_segs: # enforce clean segments span dataset
self._clean_segs &= self.segs
self.auto_optimize_cln(**auto_optimize_kwargs)
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
self.compute_rates()
def flush(
self,
max_stride=DEFAULT_MAX_STRIDE,
max_samples=DEFAULT_MAX_SAMPLES,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
**auto_optimize_kwargs
):
self._gch_kde.remove_observations(
self._gch_dataset.flush(
max_stride=max_stride, max_samples=max_samples
).ranks
)
self._cln_kde.remove_observations(
self._cln_dataset.flush(
max_stride=max_stride, max_samples=max_samples
).ranks
)
if self._clean_segs: # enforce clean segments span dataset
self._clean_segs &= self.segs
self.auto_optimize(**auto_optimize_kwargs)
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
self.compute_rates()
def add_and_flush_gch(
self,
dataset,
max_stride=DEFAULT_MAX_STRIDE,
max_samples=DEFAULT_MAX_SAMPLES,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
clean_segs=None,
**auto_optimize_kwargs
):
self._gch_dataset += dataset
self._gch_kde.add_observations(dataset.ranks)
if self.rate_estimation == "livetime" or self._clean_segs is not None:
assert clean_segs is not None, (
"need to provide clean segments if rate_estimation is 'livetime' or "
"the calibration map was instantiated with clean segments"
)
self._clean_segs |= clean_segs
self.flush_gch(
max_stride=max_stride,
max_samples=max_samples,
num_quantiles=num_quantiles,
num_mc=num_mc,
**auto_optimize_kwargs
)
def add_and_flush_cln(
self,
dataset,
max_stride=DEFAULT_MAX_STRIDE,
max_samples=DEFAULT_MAX_SAMPLES,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
clean_segs=None,
**auto_optimize_kwargs
):
self._cln_dataset += dataset
self._cln_kde.add_observations(dataset.ranks)
if self.rate_estimation == "livetime" or self._clean_segs is not None:
assert clean_segs is not None, (
"need to provide clean segments if rate_estimation is 'livetime' or "
"the calibration map was instantiated with clean segments"
)
self._clean_segs |= clean_segs
self.flush_cln(
max_stride=max_stride,
max_samples=max_samples,
num_quantiles=num_quantiles,
num_mc=num_mc,
**auto_optimize_kwargs
)
def add_and_flush(
self,
dataset,
max_stride=DEFAULT_MAX_STRIDE,
max_samples=DEFAULT_MAX_SAMPLES,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
clean_segs=None,
**auto_optimize_kwargs
):
gch, cln = dataset.vectors2classes()
self._gch_dataset += gch
self._cln_dataset += cln
self._gch_kde.add_observations(gch.ranks)
self._cln_kde.add_observations(cln.ranks)
if self.rate_estimation == "livetime" or self._clean_segs is not None:
assert clean_segs is not None, (
"need to provide clean segments if rate_estimation is 'livetime' or "
"the calibration map was instantiated with clean segments"
)
self._clean_segs |= clean_segs
self.flush(
max_stride=max_stride,
max_samples=max_samples,
num_quantiles=num_quantiles,
num_mc=num_mc,
**auto_optimize_kwargs
)
def compute_rates(self):
"""compute the approximate rate of gch and clean based on datasets"""
if self.rate_estimation == "samples": # an incredibly simple approximation...
self._gch_rate = (
1.0 * len(self._gch_dataset) / utils.livetime(self._gch_dataset.segs)
)
self._cln_rate = (
1.0 * len(self._cln_dataset) / utils.livetime(self._cln_dataset.segs)
)
elif self.rate_estimation == "livetime":
T = utils.livetime(self.segs)
Tc = utils.livetime(self._clean_segs)
self._cln_rate = 1.0 * (Tc / T)
self._gch_rate = 1.0 - self._cln_rate
else:
raise NotImplementedError(
"rate estimation {} not implemented".format(self.rate_estimation)
)
def _num_quantiles2dq(self, num_quantiles):
# we use num_quantiles+2 interpolation points (see compute_loglike_cdf)
# so we need to divide the unit interval into num_quantiles+1 "bins"
self._dq = 1.0 / (num_quantiles + 1)
def compute_loglike_cdf(
self, num_quantiles=DEFAULT_NUM_QUANTILES, num_mc=DEFAULT_NUM_MC
):
"""
compute CDF(loglike|rank) interpolation object via monte carlo
"""
self._num_quantiles2dq(num_quantiles)
self._interp_loglike_cdf = np.empty(
(len(self._ranks), num_quantiles + 2), dtype="float"
)
self._interp_loglike_cdf[:, 0] = -np.infty # set end points
self._interp_loglike_cdf[:, -1] = +np.infty
# used within the iteration to avoid repeatedly defining this
percentiles = np.linspace(0.0, 100, num_quantiles + 2)[1:-1]
alpha_g = self._gch_kde.pdf_alpha(self._ranks)
beta_g = self._gch_kde.pdf_beta(self._ranks)
scale_g = self._gch_kde._pdf_scale
alpha_c = self._cln_kde.pdf_alpha(self._ranks)
beta_c = self._cln_kde.pdf_beta(self._ranks)
scale_c = self._cln_kde._pdf_scale
dscale = np.log(scale_g) - np.log(scale_c)
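# dscale converts the log-ratio of the [0, 1]-bounded beta variates back into
# the log-ratio of the actual pdfs (each pdf = beta variate * its _pdf_scale)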
truth = np.empty(num_mc, dtype=bool)
samples = np.empty(num_mc, dtype=float)
# re-use these within loop to avoid re-allocating memory
gsamples = np.empty(num_mc, dtype=float)
csamples = np.empty(num_mc, dtype=float)
for i, (_, ag, bg, ac, bc) in enumerate(
zip(self._ranks, alpha_g, beta_g, alpha_c, beta_c)
):
gsamples[:] = beta.rvs(ag, bg, size=num_mc)
csamples[:] = beta.rvs(ac, bc, size=num_mc)
# catch all possible log(0) to avoid warnings
# only gsamples are zero
truth[:] = gsamples == 0
samples[truth] = -np.infty
# only csamples are zero
truth[:] = csamples == 0
samples[truth] = +np.infty
# both pdfs are zero
truth[:] = (gsamples == 0) * (csamples == 0)
samples[truth] = 0.0
# both pdfs are non-zero
truth[:] = (gsamples != 0) * (csamples != 0)
samples[truth] = np.log(gsamples[truth]) - np.log(csamples[truth]) + dscale
# the quantiles associated with monte carlo
self._interp_loglike_cdf[i, 1:-1] = [
np.percentile(samples, p) for p in percentiles
]
def _auto_optimize(
self,
kde,
dloglike=DEFAULT_DLOGLIKE,
minb=DEFAULT_MINB,
maxb=DEFAULT_MAXB,
tol=DEFAULT_TOL,
**kwargs
):
"""
determines whether loglike is likely to change enough to warrant
re-optimization. Specifically, we re-optimize only if
dloglike < |dloglike/db| * (b*tol)
= |dloglike/dlogb| * (1/b) * (b*tol)
= |kde.grad(kde.b)| * tol
the logic being that if the expected change in loglike across a
tolerance-sized step in the bandwidth exceeds dloglike, re-optimization is
worthwhile; otherwise the gain is too small to bother.
"""
grad = kde.grad(kde.b)
if dloglike < np.abs(grad) * tol:
# run optimization algorithm
if grad > 0:
kde.optimize(minb=max(minb, kde.b), maxb=maxb, tol=tol, **kwargs)
else:
kde.optimize(minb=minb, maxb=min(kde.b, maxb), tol=tol, **kwargs)
def auto_optimize_gch(self, **kwargs):
self._auto_optimize(self._gch_kde, **kwargs)
def auto_optimize_cln(self, **kwargs):
self._auto_optimize(self._cln_kde, **kwargs)
def auto_optimize(self, **kwargs):
"""
delegation to individual functions for gch and cln
"""
glitch_b_factor = kwargs.pop("glitch_b_factor", DEFAULT_GLITCH_B)
clean_b_factor = kwargs.pop("clean_b_factor", DEFAULT_CLEAN_B)
self.auto_optimize_gch(b_factor=glitch_b_factor, **kwargs)
self.auto_optimize_cln(b_factor=clean_b_factor, **kwargs)
def optimize_gch(self, **kwargs):
self._gch_kde.optimize(**kwargs)
def optimize_cln(self, **kwargs):
self._cln_kde.optimize(**kwargs)
def optimize(self, **kwargs):
"""
delegation to individual functions for gch and cln
"""
glitch_b_factor = kwargs.pop("glitch_b_factor", DEFAULT_GLITCH_B)
clean_b_factor = kwargs.pop("clean_b_factor", DEFAULT_CLEAN_B)
self.optimize_gch(b_factor=glitch_b_factor, **kwargs)
self.optimize_cln(b_factor=clean_b_factor, **kwargs)
def gch_logpdf(self, ranks):
return self._gch_kde.logpdf(ranks)
def gch_logpdf_quantile(self, ranks, q):
return self._gch_kde.logpdf_quantile(ranks, q)
def gch_pdf(self, ranks):
return self._gch_kde.pdf(ranks)
def gch_pdf_quantile(self, ranks, q):
return self._gch_kde.pdf_quantile(ranks, q)
def gch_cdf(self, ranks):
return self._gch_kde.cdf(ranks)
def gch_cdf_quantile(self, ranks, q):
return self._gch_kde.cdf_quantile(ranks, q)
def cln_logpdf(self, ranks):
return self._cln_kde.logpdf(ranks)
def cln_logpdf_quantile(self, ranks, q):
return self._cln_kde.logpdf_quantile(ranks, q)
def cln_pdf(self, ranks):
return self._cln_kde.pdf(ranks)
def cln_pdf_quantile(self, ranks, q):
return self._cln_kde.pdf_quantile(ranks, q)
def cln_cdf(self, ranks):
return self._cln_kde.cdf(ranks)
def cln_cdf_quantile(self, ranks, q):
return self._cln_kde.cdf_quantile(ranks, q)
def eff(self, ranks):
# gch_cdf integrates from small ranks to big ones, we want the opposite
return 1.0 - self.gch_cdf(ranks)
def eff_quantile2rank(self, e):
return self._gch_kde.quantile(1.0 - e)
def fap(self, ranks):
return 1.0 - self.cln_cdf(ranks)
def fap_quantile2rank(self, f):
return self._cln_kde.quantile(1.0 - f)
@property
def prior_gch(self):
"""an estimate of the prior that any given sample is a glitch, based on
the approximate rate derived from self._gch_dataset"""
return self._gch_rate / (self._gch_rate + self._cln_rate)
@property
def prior_cln(self):
"""an estimate of the prior that any given sample is a clean, based on
the approximate rate derived from self._cln_dataset"""
return 1.0 - self.prior_gch # by definition
@property
def prior_odds(self):
"""the prior odds that a sample is a glitch vs a clean
(self.prior_gch/self.prior_cln)"""
return self._gch_rate / self._cln_rate
def pglitch(self, ranks):
"""
compute p(glitch) at these ranks
pglitch = 1 / (1 + np.exp(-loglike(rank)) / prior_odds)
NOTE: this includes the prior_odds based on approximate rates of
glitches and cleans. If that is not desired, users can call loglike
directly and apply their own prior_odds
"""
return 1.0 / (1.0 + np.exp(-self.loglike(ranks)) / self.prior_odds)
def loglike(self, ranks):
"""
NOTE: returns log(E[p(r|g)]) - log(E[p(r|c)])
"""
return self._gch_kde.logpdf(ranks) - self._cln_kde.logpdf(ranks)
def loglike_mean(self, ranks):
"""
NOTE: returns E[log(p(r|g)) - log(p(r|c))]
"""
return self._gch_kde.logpdf_mean(ranks) - self._cln_kde.logpdf_mean(ranks)
def loglike_std(self, ranks):
return self.loglike_var(ranks) ** 0.5
def loglike_var(self, ranks):
return self._gch_kde.logpdf_var(ranks) + self._cln_kde.logpdf_var(ranks)
def loglike_quantile(self, ranks, q):
"""
interpolates between the requested quantile and the stored quantiles first,
then interpolates over the resulting function of rank to get the values at
the requested ranks. makes use of self._ranks and self._interp_loglike_cdf
to do this, with self._interp_loglike_cdf computed via monte-carlo
integration in self.compute_loglike_cdf()
NOTE: this function assumes "q" is a float, not an array
"""
# find indices in self._loglike_cdf corresponding to quantile q
# note, this may give issues near the boundaries of our interpolation
# objects where I've by-hand set quantiles to -infty, +infty
i = int(q / self._dq)
j = i + 1
# pull out relevant points from interpolation object
lower = self._interp_loglike_cdf[:, i]
upper = self._interp_loglike_cdf[:, j]
# interpolate between neighboring estimates of CDF(rank) to find
# quantiles at self._ranks
interp_q = np.array([i, j]) * self._dq
interp_quantiles = [
np.interp(q, interp_q, [lower[rankind], upper[rankind]])
for rankind in range(len(self._ranks))
]
# interpolate between self._ranks to find the values at requested ranks
return np.interp(ranks, self._ranks, interp_quantiles)
def roc(self, ranks=None):
"""
returns fap(ranks), eff(ranks)
"""
if ranks is None:
ranks = self.ranks
return self.fap(ranks), self.eff(ranks)
def is_healthy(self):
"""
Determine the health of the pipeline.
Based on the calculation of dfap/fap given in the map.
Similar results are shown in the calibration accuracy plot.
"""
if not utils.livetime(self.segs) >= self.min_livetime:
return False
if not utils.livetime(self._clean_segs) >= self.min_clean_livetime:
return False
ranks = self._cln_dataset.ranks
fap = self.fap(ranks)
# compare the nominal FAP to the observed (empirical) fraction of cleans
vals, counts = np.unique(fap, return_counts=True)
nom, frc = np.asarray(vals, dtype=float), np.asarray(counts, dtype=float)
order = nom.argsort()
nom = nom[order]
frc = np.cumsum(frc[order]) / len(ranks)
dfap = np.abs(((nom - frc) / frc) * 100)
return not np.any(dfap[nom > 1e-3] > 50)
# -------------------------------------------------
class DiscreteCalibrationMap(CalibrationMap):
"""
a helper object that wraps 2 Discrete1DKDE objects, one for cleans and
one for glitches.
builds and manages these for you based on input samples.
can then compute things like likelihood ratios, efficiency, fap, etc.
Can even generate a full ROC curve automatically.
"""
def __init__(
self,
dataset,
num_quantiles=DEFAULT_NUM_QUANTILES,
num_mc=DEFAULT_NUM_MC,
compute=True, # can be used to avoid computing loglike_cdf
clean_segs=None,
rate_estimation=DEFAULT_RATE_ESTIMATION,
min_livetime=DEFAULT_MIN_LIVETIME,
min_clean_livetime=DEFAULT_MIN_CLEAN_LIVETIME,
model_id=None,
**kwargs
):
# record provenance stuff
self._start = dataset.start
self._end = dataset.end
self._segs = segmentlist(dataset.segs) # make a copy
self._model_id = model_id
# rate estimation
if rate_estimation == "livetime":
assert (
clean_segs is not None
), "need to provide clean segments if using rate estimation via livetime"
self.rate_estimation = rate_estimation
self._clean_segs = clean_segs
# health checks
self.min_livetime = min_livetime
self.min_clean_livetime = min_clean_livetime
# make a local copy of datasets to record minimal sets of data
self._gch_dataset, self._cln_dataset = dataset.vectors2classes()
self.compute_rates()
# set up KDE objects
self._gch_kde = Discrete1DKDE(self._gch_dataset.ranks, **kwargs)
self._cln_kde = Discrete1DKDE(self._cln_dataset.ranks, **kwargs)
# make sure that we have this stored in case we don't compute...
self._num_quantiles2dq(num_quantiles)
if compute:
self.compute_loglike_cdf(num_quantiles=num_quantiles, num_mc=num_mc)
@property
def ranks(self):
return sorted(
set(list(self._gch_kde._count.keys()) + list(self._cln_kde._count.keys()))
)
def _auto_optimize(self, *args, **kwargs):
# don't do anything since there is nothing to optimize for the discrete map
pass
def loglike(self, ranks):
"""
NOTE: returns log(E[p(r|g)]) - log(E[p(r|c)])
"""
gch = self._gch_kde.logpdf(ranks)
cln = self._cln_kde.logpdf(ranks)
# do this to avoid subtracting -inf from -inf
ans = np.zeros_like(gch, dtype=float)
truth = np.logical_not((gch == -np.infty) * (cln == -np.infty))
ans[truth] = gch[truth] - cln[truth]
return ans
def compute_loglike_cdf(
self, num_quantiles=DEFAULT_NUM_QUANTILES, num_mc=DEFAULT_NUM_MC
):
"""
compute CDF(loglike|rank) interpolation object via monte carlo
"""
self._num_quantiles2dq(num_quantiles)
ranks = self.ranks
self._loglike_cdf = dict(
(rank, np.empty(num_quantiles + 2, dtype=float)) for rank in ranks
)
for rank in ranks:
self._loglike_cdf[rank][0] = -np.infty # set end points
self._loglike_cdf[rank][-1] = +np.infty
# used within the iteration to avoid repeatedly defining this
percentiles = np.linspace(0.0, 100, num_quantiles + 2)[1:-1]
alpha_g, beta_g = np.transpose(self._gch_kde._pdf_params(ranks))
alpha_c, beta_c = np.transpose(self._cln_kde._pdf_params(ranks))
truth = np.empty(num_mc, dtype=bool)
samples = np.empty(num_mc, dtype=float)
gsamples = np.empty(
num_mc, dtype=float
) # re-use these within loop to avoid re-allocating memory
csamples = np.empty(num_mc, dtype=float)
for rank, ag, bg, ac, bc in zip(ranks, alpha_g, beta_g, alpha_c, beta_c):
gsamples[:] = beta.rvs(ag, bg, size=num_mc)
csamples[:] = beta.rvs(ac, bc, size=num_mc)
# catch all possible log(0) to avoid warnings
# only gsamples are zero
truth[:] = gsamples == 0
samples[truth] = -np.infty
# only csamples are zero
truth[:] = csamples == 0
samples[truth] = +np.infty
# both pdfs are zero
truth[:] = (gsamples == 0) * (csamples == 0)
samples[truth] = 0.0
# both pdfs are non-zero
truth[:] = (gsamples != 0) * (csamples != 0)
samples[truth] = np.log(gsamples[truth]) - np.log(csamples[truth])
# the quantiles associated with monte carlo
self._loglike_cdf[rank][1:-1] = [
np.percentile(samples, p) for p in percentiles
]
def loglike_quantile(self, ranks, q):
"""
interpolates between the requested quantile and the stored quantiles at each
discrete rank. makes use of self._loglike_cdf to do this, with
self._loglike_cdf computed via monte-carlo integration in
self.compute_loglike_cdf()
NOTE: this function assumes "q" is a float, not an array
"""
qs = np.arange(0, 1 + self._dq, self._dq)
return np.array(
[
np.interp(q, qs, self._loglike_cdf[rank])
if rank in self._loglike_cdf
else 0
for rank in ranks
],
dtype=float,
)
class Discrete1DKDE(object):
"""
an object similar to FixedBandwidth1DKDE but assuming ranks only occur at
discrete values instead of being drawn from a continuous distribution
"""
def __init__(self, obs, **kwargs):
self._obs = np.array([], dtype=float)
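# per-rank multiplicities: maps each discrete rank to the number of times it
# has been observed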
self._count = defaultdict(int)
self.add_observations(obs) # sets up cumulative counter as well
@property
def obs(self):
return self._obs
@property
def ranks(self):
return sorted(self._count.keys())
def optimize(self, **kwargs):
pass # do nothing here because there is nothing to optimize
def add_observations(self, obs):
for rank in obs:
self._count[rank] += 1
self._compute_cum_count()
self._obs = np.concatenate((self._obs, obs))
def remove_observations(self, obs):
o = list(self.obs)
for rank in obs:
if self._count[rank] == 0:
raise ValueError(
"cannot remove rank=%.3e because it is not contained in KDE!" % rank
)
self._count[rank] -= 1
o.remove(rank)
self._compute_cum_count()
self._obs = np.array(o)
def _compute_cum_count(self):
self._cum_count = []
c = 0
for rank in sorted(self._count.keys()):
c += self._count[rank]
self._cum_count.append((rank, c))
self._cum_count = np.array(self._cum_count, dtype=float)
def flush(self, max_samples=DEFAULT_MAX_SAMPLES):
"""
remove the observations from the front of the list until the total
number of observations is <=max_samples.
"""
N = len(self.obs)
if max_samples == 0:
# don't bother to re-compute anything, just zero everything out
self._obs = np.array([], dtype=float)
self._count = defaultdict(int)
elif max_samples < N:
self.remove_observations(self.obs[: N - max_samples])
else:
pass
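# beta-distribution parameters for a binomial count c out of N observations
# under a uniform Beta(1, 1) prior: alpha = c + 1, beta = N - c + 1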
def _params(self, count, N):
if N:
return np.array([(c + 1, N - c + 1) for c in count], dtype=float)
else:
# uniform distribution
return np.array([(1, 1) for c in count], dtype=float)
def _cdf_params(self, ranks):
N = len(self._obs)
return self._params(self.cdf(ranks) * N, N)
def _pdf_params(self, ranks):
N = len(self._obs)
return self._params(self.pdf(ranks) * N, N)
def cdf(self, ranks):
if len(self._obs):
ans = []
for rank in ranks:
ind = self._cum_count[:, 0] <= rank
if np.any(ind):
ans.append(self._cum_count[ind, 1][-1]) # the biggest rank
else:
ans.append(0)
return np.array(ans, dtype=float) / len(self._obs) # normalize the counts
else:
# assume a uniform distribution, so the CDF is just how much we've
# integrated, which is the rank
return np.array(ranks, dtype=float)
def cdf_std(self, ranks):
return self.cdf_var(ranks) ** 0.5
def cdf_var(self, ranks):
c = self.cdf(ranks)
N = len(self._obs)
return c * (1 - c) / N
def cdf_quantile(self, ranks, q):
return np.array(
[beta.ppf(q, a, b) for a, b in self._cdf_params(ranks)], dtype=float
)
def quantile(self, q):
return np.interp(
q, self._cum_count[:, 1] / len(self._obs), self._cum_count[:, 0]
)
def pdf(self, ranks):
if len(self._obs):
return np.array(
[self._count.get(rank, 0) for rank in ranks], dtype=float
) / len(self._obs)
else:
# with no observations, fall back to 0.5 for every rank (the mean of
# the uniform Beta(1, 1) prior used in _params)
return np.ones_like(ranks, dtype=float) * 0.5
def pdf_std(self, ranks):
return self.pdf_var(ranks) ** 0.5
def pdf_var(self, ranks):
p = self.pdf(ranks)
N = len(self._obs)
return p * (1 - p) / N
def pdf_quantile(self, ranks, q):
return np.array(
[beta.ppf(q, a, b) for a, b in self._pdf_params(ranks)], dtype=float
)
def logpdf(self, ranks):
ans = -np.infty * np.ones_like(ranks, dtype=float)
pdf = self.pdf(ranks)
truth = pdf > 0
ans[truth] = np.log(pdf[truth])
return ans
def logpdf_mean(self, ranks):
return np.array(
[digamma(a) - digamma(a + b) for a, b in self._pdf_params(ranks)]
)
def logpdf_std(self, ranks):
return self.logpdf_var(ranks) ** 0.5
def logpdf_var(self, ranks):
return np.array(
[polygamma(1, a) - polygamma(1, a + b) for a, b in self._pdf_params(ranks)]
)
def logpdf_quantile(self, ranks, q):
return np.log(self.pdf_quantile(ranks, q))
def coverage(self):
"""return ranks, CDF(ranks), and the empirical fraction of observations with rank <= each value"""
ranks = self.ranks
return (
ranks,
self.cdf(ranks),
np.array([np.sum(self._obs <= r) for r in ranks], dtype=float)
/ len(self._obs),
)