Coverage for pesummary/utils/bounded_1d_kde.py: 43.6%
149 statements
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
« prev ^ index » next coverage.py v7.4.4, created at 2024-12-09 22:34 +0000
1# Licensed under an MIT style license -- see LICENSE.md
3import copy
4import numpy as np
5from scipy.stats import gaussian_kde as kde
6from scipy.ndimage import gaussian_filter1d
7from pesummary.utils.decorators import deprecation
8from pesummary.utils.utils import logger
10__author__ = [
11 "Charlie Hoy <charlie.hoy@ligo.org>",
12 "Michael Puerrer <michael.puerrer@ligo.org>"
13]
16def transform_logit(x, a=0., b=1.):
17 """
18 """
19 return np.log((x - a) / (b - x))
22def inverse_transform_logit(y, a=0., b=1.):
23 """
24 """
25 return (a + b * np.exp(y)) / (1 + np.exp(y))
28def dydx_logit(x, a=0., b=1.):
29 """
30 """
31 return (-a + b) / ((a - x) * (-b + x))
34def bounded_1d_kde(
35 pts, method="Reflection", xlow=None, xhigh=None, *args, **kwargs
36):
37 """Return a bounded 1d KDE
39 Parameters
40 ----------
41 pts: np.ndarray
42 The datapoints to estimate a bounded kde from
43 method: str, optional
44 Method you wish to use to handle the boundaries
45 xlow: float
46 The lower bound of the distribution
47 xhigh: float
48 The upper bound of the distribution
49 """
50 try:
51 return _kdes["{}BoundedKDE".format(method)](
52 pts, xlow=xlow, xhigh=xhigh, *args, **kwargs
53 )
54 except KeyError:
55 raise ValueError("Unknown method: {}".format(method))
58class BoundedKDE(kde):
59 """Base class to handle the BoundedKDE
61 Parameters
62 ----------
63 pts: np.ndarray
64 The datapoints to estimate a bounded kde from
65 xlow: float
66 The lower bound of the distribution
67 xhigh: float
68 The upper bound of the distribution
69 """
70 def __init__(self, pts, xlow=None, xhigh=None, *args, **kwargs):
71 pts = np.atleast_1d(pts)
72 if pts.ndim != 1:
73 raise TypeError("BoundedKDE can only be one-dimensional")
74 super(BoundedKDE, self).__init__(pts.T, *args, **kwargs)
75 self._xlow = xlow
76 self._xhigh = xhigh
78 @property
79 def xlow(self):
80 """The lower bound of the x domain
81 """
82 return self._xlow
84 @property
85 def xhigh(self):
86 """The upper bound of the x domain
87 """
88 return self._xhigh
91class TransformBoundedKDE(BoundedKDE):
92 """Represents a one-dimensional Gaussian kernel density estimator
93 for a probability distribution function that exists on a bounded
94 domain. The bounds are handled by transforming to a new parameter
95 space which is "unbounded" and then generating a KDE (including a Jacobian)
96 in bounded space
98 Parameters
99 ----------
100 pts: np.ndarray
101 The datapoints to estimate a bounded kde from
102 xlow: float
103 The lower bound of the distribution
104 xhigh: float
105 The upper bound of the distribution
106 transform: str/func, optional
107 The transform you wish to use. Default logit
108 inv_transform: func, optional
109 Inverse function of transform
110 dydx: func, optional
111 Derivateive of transform
112 N: int, optional
113 Number of points to use generating the KDE
114 smooth: float, optional
115 level of smoothing you wish to apply. Default 3
116 apply_smoothing: Bool, optional
117 Whether or not to apply smoothing. Default False
118 """
119 allowed = ["logit"]
121 def __init__(
122 self, pts, xlow=None, xhigh=None, transform="logit", inv_transform=None,
123 dydx=None, alpha=1.5, N=100, smooth=3, apply_smoothing=False,
124 weights=None, same_input=True, *args, **kwargs
125 ):
126 import pandas
128 self.inv_transform = inv_transform
129 self.dydx = dydx
130 self.transform = transform
131 self.same_input = same_input
132 if isinstance(pts, pandas.core.series.Series):
133 pts = np.array(pts)
134 _args = np.hstack(np.argwhere((pts > xlow) & (pts < xhigh)))
135 pts = pts[_args]
136 if weights is not None:
137 if isinstance(weights, pandas.core.series.Series):
138 weights = np.array(weights)
139 weights = weights[_args]
140 transformed_pts = self.transform(pts, xlow, xhigh)
141 super(TransformBoundedKDE, self).__init__(
142 transformed_pts, xlow=xlow, xhigh=xhigh, *args, **kwargs
143 )
144 self.alpha = alpha
145 self.N = N
146 self.smooth = smooth
147 self.apply_smoothing = apply_smoothing
149 @property
150 def transform(self):
151 return self._transform
153 @transform.setter
154 def transform(self, transform):
155 if isinstance(transform, str) and transform not in self.allowed:
156 raise ValueError(
157 "Please provide either a transform function or pick an "
158 "allowed transform from the list: {}".format(
159 ", ".join(self.allowed)
160 )
161 )
162 elif isinstance(transform, str):
163 self.inv_transform = _default_methods[
164 "inverse_transform_{}".format(transform)
165 ]
166 self.dydx = _default_methods["dydx_{}".format(transform)]
167 transform = _default_methods["transform_{}".format(transform)]
168 if not isinstance(transform, str):
169 if any(param is None for param in [self.inv_transform, self.dydx]):
170 raise ValueError(
171 "Please provide an inverse transformation and the "
172 "derivative of the transform"
173 )
174 self._transform = transform
176 def __call__(self, pts):
177 _original = copy.deepcopy(pts)
178 _args = np.argwhere((pts > self.xlow) & (pts < self.xhigh))
179 if len(_args) != len(np.atleast_1d(pts)):
180 logger.info(
181 "Removing {} samples as they are outside of the allowed "
182 "domain".format(len(np.atleast_1d(pts)) - len(_args))
183 )
184 if not len(_args):
185 return np.zeros_like(pts)
187 pts = np.hstack(pts[_args])
188 pts = self.transform(np.atleast_1d(pts), self.xlow, self.xhigh)
189 delta = np.max(pts) - np.min(pts)
190 ymin = np.min(pts) - ((self.alpha - 1.) / 2) * delta
191 ymax = np.max(pts) + ((self.alpha - 1.) / 2) * delta
192 y = np.linspace(ymin, ymax, self.N)
193 x = self.inv_transform(y, self.xlow, self.xhigh)
194 Y = self.evaluate(y) * np.abs(self.dydx(x, self.xlow, self.xhigh))
195 if self.apply_smoothing:
196 Y = gaussian_filter1d(Y, sigma=self.smooth)
197 if self.same_input:
198 from scipy.interpolate import interp1d
200 f = interp1d(x, Y)
201 _args = np.argwhere(
202 (_original > np.amin(x)) & (_original < np.amax(x))
203 )
204 _Y = f(_original[_args])
205 Y = np.zeros(len(_original))
206 Y[_args] = _Y
207 return Y
208 return x, Y
211class ReflectionBoundedKDE(BoundedKDE):
212 """Represents a one-dimensional Gaussian kernel density estimator
213 for a probability distribution function that exists on a bounded
214 domain. The bounds are treated as reflections
216 Parameters
217 ----------
218 pts: np.ndarray
219 The datapoints to estimate a bounded kde from
220 xlow: float
221 The lower bound of the distribution
222 xhigh: float
223 The upper bound of the distribution
224 """
225 def __init__(self, pts, xlow=None, xhigh=None, *args, **kwargs):
226 super(ReflectionBoundedKDE, self).__init__(
227 pts, xlow=xlow, xhigh=xhigh, *args, **kwargs
228 )
230 def evaluate(self, pts):
231 """Return an estimate of the density evaluated at the given
232 points
233 """
234 x = pts.T
235 pdf = super(ReflectionBoundedKDE, self).evaluate(pts.T)
236 if self.xlow is not None:
237 pdf += super(ReflectionBoundedKDE, self).evaluate(2 * self.xlow - x)
238 if self.xhigh is not None:
239 pdf += super(ReflectionBoundedKDE, self).evaluate(2 * self.xhigh - x)
240 return pdf
242 def __call__(self, pts):
243 pts = np.atleast_1d(pts)
244 out_of_bounds = np.zeros(pts.shape[0], dtype='bool')
246 if self.xlow is not None:
247 out_of_bounds[pts < self.xlow] = True
248 if self.xhigh is not None:
249 out_of_bounds[pts > self.xhigh] = True
251 results = self.evaluate(pts)
252 results[out_of_bounds] = 0.
253 return results
256class PeriodicBoundedKDE(ReflectionBoundedKDE):
257 """Represents a one-dimensional Gaussian kernel density estimator
258 for a probability distribution function that exists on a bounded
259 domain. The bounds are treated as periodic
261 Parameters
262 ----------
263 pts: np.ndarray
264 The datapoints to estimate a bounded kde from
265 xlow: float
266 The lower bound of the distribution
267 xhigh: float
268 The upper bound of the distribution
269 smooth: float, optional
270 level of smoothing you wish to apply. Default 3
271 apply_smoothing: Bool, optional
272 Whether or not to apply smoothing. Default False
273 """
274 def __init__(
275 self, pts, xlow=None, xhigh=None, apply_smoothing=False, smooth=3,
276 *args, **kwargs
277 ):
278 if all(_ is None for _ in [xlow, xhigh]):
279 raise ValueError(
280 "xlow and xhigh must both be provided if using the "
281 "periodic KDE"
282 )
283 self.shift = (xlow + xhigh) / 2
284 shifted_pts = self._shift(pts)
285 super(PeriodicBoundedKDE, self).__init__(
286 shifted_pts, xlow=xlow, xhigh=xhigh, *args, **kwargs
287 )
288 self.apply_smoothing = apply_smoothing
289 self.smooth = smooth
291 def _shift(self, pts):
292 return np.append(
293 pts[pts > self.shift] - self.shift,
294 pts[pts < self.shift] + self.shift
295 )
297 def __call__(self, pts):
298 pts = np.atleast_1d(pts)
299 shifted_pts = self._shift(pts)
300 results = self.evaluate(shifted_pts)
301 unshifted = np.zeros_like(pts)
302 unshifted[pts < self.shift] = results[pts > self.shift]
303 unshifted[pts > self.shift] = results[pts < self.shift]
304 if self.apply_smoothing:
305 unshifted = gaussian_filter1d(unshifted, sigma=self.smooth)
306 out_of_bounds = np.zeros(pts.shape[0], dtype='bool')
307 if self.xlow is not None:
308 out_of_bounds[pts < self.xlow] = True
309 if self.xhigh is not None:
310 out_of_bounds[pts > self.xhigh] = True
311 unshifted[out_of_bounds] = 0.
312 return unshifted
315class Bounded_1d_kde(ReflectionBoundedKDE):
316 @deprecation(
317 "The Bounded_1d_kde class has changed its name to ReflectionBoundedKDE. "
318 "Bounded_1d_kde may not be supported in future releases. Please update."
319 )
320 def __init__(self, *args, **kwargs):
321 return super(Bounded_1d_kde, self).__init__(*args, **kwargs)
324_kdes = {
325 "TransformBoundedKDE": TransformBoundedKDE,
326 "ReflectionBoundedKDE": ReflectionBoundedKDE,
327 "PeriodicBoundedKDE": PeriodicBoundedKDE,
328 "Bounded_1d_kde": Bounded_1d_kde
329}
331_default_methods = {
332 "transform_logit": transform_logit,
333 "inverse_transform_logit": inverse_transform_logit,
334 "dydx_logit": dydx_logit
335}