Coverage for pesummary/utils/bounded_1d_kde.py: 75.4%

122 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-05-02 08:42 +0000

1# Licensed under an MIT style license -- see LICENSE.md 

2 

3import copy 

4import numpy as np 

5from scipy.stats import gaussian_kde as kde 

6from scipy.ndimage import gaussian_filter1d 

7from pesummary.utils.decorators import deprecation 

8from pesummary.utils.utils import logger 

9 

10__author__ = [ 

11 "Charlie Hoy <charlie.hoy@ligo.org>", 

12 "Michael Puerrer <michael.puerrer@ligo.org>" 

13] 

14 

15 

16def transform_logit(x, a=0., b=1.): 

17 """ 

18 """ 

19 return np.log((x - a) / (b - x)) 

20 

21 

22def inverse_transform_logit(y, a=0., b=1.): 

23 """ 

24 """ 

25 return (a + b * np.exp(y)) / (1 + np.exp(y)) 

26 

27 

28def dydx_logit(x, a=0., b=1.): 

29 """ 

30 """ 

31 return (-a + b) / ((a - x) * (-b + x)) 

32 

33 

34def bounded_1d_kde( 

35 pts, method="Reflection", xlow=None, xhigh=None, *args, **kwargs 

36): 

37 """Return a bounded 1d KDE 

38 

39 Parameters 

40 ---------- 

41 pts: np.ndarray 

42 The datapoints to estimate a bounded kde from 

43 method: str, optional 

44 Method you wish to use to handle the boundaries 

45 xlow: float 

46 The lower bound of the distribution 

47 xhigh: float 

48 The upper bound of the distribution 

49 """ 

50 try: 

51 return _kdes["{}BoundedKDE".format(method)]( 

52 pts, xlow=xlow, xhigh=xhigh, *args, **kwargs 

53 ) 

54 except KeyError: 

55 raise ValueError("Unknown method: {}".format(method)) 

56 

57 

58class BoundedKDE(kde): 

59 """Base class to handle the BoundedKDE 

60 

61 Parameters 

62 ---------- 

63 pts: np.ndarray 

64 The datapoints to estimate a bounded kde from 

65 xlow: float 

66 The lower bound of the distribution 

67 xhigh: float 

68 The upper bound of the distribution 

69 """ 

70 def __init__(self, pts, xlow=None, xhigh=None, *args, **kwargs): 

71 pts = np.atleast_1d(pts) 

72 if pts.ndim != 1: 

73 raise TypeError("BoundedKDE can only be one-dimensional") 

74 super(BoundedKDE, self).__init__(pts.T, *args, **kwargs) 

75 self._xlow = xlow 

76 self._xhigh = xhigh 

77 

78 @property 

79 def xlow(self): 

80 """The lower bound of the x domain 

81 """ 

82 return self._xlow 

83 

84 @property 

85 def xhigh(self): 

86 """The upper bound of the x domain 

87 """ 

88 return self._xhigh 

89 

90 

91class TransformBoundedKDE(BoundedKDE): 

92 """Represents a one-dimensional Gaussian kernel density estimator 

93 for a probability distribution function that exists on a bounded 

94 domain. The bounds are handled by transforming to a new parameter 

95 space which is "unbounded" and then generating a KDE (including a Jacobian) 

96 in bounded space 

97 

98 Parameters 

99 ---------- 

100 pts: np.ndarray 

101 The datapoints to estimate a bounded kde from 

102 xlow: float 

103 The lower bound of the distribution 

104 xhigh: float 

105 The upper bound of the distribution 

106 transform: str/func, optional 

107 The transform you wish to use. Default logit 

108 inv_transform: func, optional 

109 Inverse function of transform 

110 dydx: func, optional 

111 Derivateive of transform 

112 N: int, optional 

113 Number of points to use generating the KDE 

114 smooth: float, optional 

115 level of smoothing you wish to apply. Default 3 

116 apply_smoothing: Bool, optional 

117 Whether or not to apply smoothing. Default False 

118 """ 

119 allowed = ["logit"] 

120 

121 def __init__( 

122 self, pts, xlow=None, xhigh=None, transform="logit", inv_transform=None, 

123 dydx=None, alpha=1.5, N=100, smooth=3, apply_smoothing=False, 

124 weights=None, same_input=True, *args, **kwargs 

125 ): 

126 import pandas 

127 

128 self.inv_transform = inv_transform 

129 self.dydx = dydx 

130 self.transform = transform 

131 self.same_input = same_input 

132 if isinstance(pts, pandas.core.series.Series): 

133 pts = np.array(pts) 

134 _args = np.hstack(np.argwhere((pts > xlow) & (pts < xhigh))) 

135 pts = pts[_args] 

136 if weights is not None: 

137 if isinstance(weights, pandas.core.series.Series): 

138 weights = np.array(weights) 

139 weights = weights[_args] 

140 transformed_pts = self.transform(pts, xlow, xhigh) 

141 super(TransformBoundedKDE, self).__init__( 

142 transformed_pts, xlow=xlow, xhigh=xhigh, *args, **kwargs 

143 ) 

144 self.alpha = alpha 

145 self.N = N 

146 self.smooth = smooth 

147 self.apply_smoothing = apply_smoothing 

148 

149 @property 

150 def transform(self): 

151 return self._transform 

152 

153 @transform.setter 

154 def transform(self, transform): 

155 if isinstance(transform, str) and transform not in self.allowed: 

156 raise ValueError( 

157 "Please provide either a transform function or pick an " 

158 "allowed transform from the list: {}".format( 

159 ", ".join(self.allowed) 

160 ) 

161 ) 

162 elif isinstance(transform, str): 

163 self.inv_transform = _default_methods[ 

164 "inverse_transform_{}".format(transform) 

165 ] 

166 self.dydx = _default_methods["dydx_{}".format(transform)] 

167 transform = _default_methods["transform_{}".format(transform)] 

168 if not isinstance(transform, str): 

169 if any(param is None for param in [self.inv_transform, self.dydx]): 

170 raise ValueError( 

171 "Please provide an inverse transformation and the " 

172 "derivative of the transform" 

173 ) 

174 self._transform = transform 

175 

176 def __call__(self, pts): 

177 _original = copy.deepcopy(pts) 

178 _args = np.argwhere((pts > self.xlow) & (pts < self.xhigh)) 

179 if len(_args) != len(np.atleast_1d(pts)): 

180 logger.info( 

181 "Removing {} samples as they are outside of the allowed " 

182 "domain".format(len(np.atleast_1d(pts)) - len(_args)) 

183 ) 

184 if not len(_args): 

185 return np.zeros_like(pts) 

186 

187 pts = np.hstack(pts[_args]) 

188 pts = self.transform(np.atleast_1d(pts), self.xlow, self.xhigh) 

189 delta = np.max(pts) - np.min(pts) 

190 ymin = np.min(pts) - ((self.alpha - 1.) / 2) * delta 

191 ymax = np.max(pts) + ((self.alpha - 1.) / 2) * delta 

192 y = np.linspace(ymin, ymax, self.N) 

193 x = self.inv_transform(y, self.xlow, self.xhigh) 

194 Y = self.evaluate(y) * np.abs(self.dydx(x, self.xlow, self.xhigh)) 

195 if self.apply_smoothing: 

196 Y = gaussian_filter1d(Y, sigma=self.smooth) 

197 if self.same_input: 

198 from scipy.interpolate import interp1d 

199 

200 f = interp1d(x, Y) 

201 _args = np.argwhere( 

202 (_original > np.amin(x)) & (_original < np.amax(x)) 

203 ) 

204 _Y = f(_original[_args]) 

205 Y = np.zeros(len(_original)) 

206 Y[_args] = _Y 

207 return Y 

208 return x, Y 

209 

210 

211class ReflectionBoundedKDE(BoundedKDE): 

212 """Represents a one-dimensional Gaussian kernel density estimator 

213 for a probability distribution function that exists on a bounded 

214 domain. The bounds are treated as reflections 

215 

216 Parameters 

217 ---------- 

218 pts: np.ndarray 

219 The datapoints to estimate a bounded kde from 

220 xlow: float 

221 The lower bound of the distribution 

222 xhigh: float 

223 The upper bound of the distribution 

224 """ 

225 def __init__(self, pts, xlow=None, xhigh=None, *args, **kwargs): 

226 super(ReflectionBoundedKDE, self).__init__( 

227 pts, xlow=xlow, xhigh=xhigh, *args, **kwargs 

228 ) 

229 

230 def evaluate(self, pts): 

231 """Return an estimate of the density evaluated at the given 

232 points 

233 """ 

234 x = pts.T 

235 pdf = super(ReflectionBoundedKDE, self).evaluate(pts.T) 

236 if self.xlow is not None: 

237 pdf += super(ReflectionBoundedKDE, self).evaluate(2 * self.xlow - x) 

238 if self.xhigh is not None: 

239 pdf += super(ReflectionBoundedKDE, self).evaluate(2 * self.xhigh - x) 

240 return pdf 

241 

242 def __call__(self, pts): 

243 pts = np.atleast_1d(pts) 

244 out_of_bounds = np.zeros(pts.shape[0], dtype='bool') 

245 

246 if self.xlow is not None: 

247 out_of_bounds[pts < self.xlow] = True 

248 if self.xhigh is not None: 

249 out_of_bounds[pts > self.xhigh] = True 

250 

251 results = self.evaluate(pts) 

252 results[out_of_bounds] = 0. 

253 return results 

254 

255 

256class Bounded_1d_kde(ReflectionBoundedKDE): 

257 @deprecation( 

258 "The Bounded_1d_kde class has changed its name to ReflectionBoundedKDE. " 

259 "Bounded_1d_kde may not be supported in future releases. Please update." 

260 ) 

261 def __init__(self, *args, **kwargs): 

262 return super(Bounded_1d_kde, self).__init__(*args, **kwargs) 

263 

264 

265_kdes = { 

266 "TransformBoundedKDE": TransformBoundedKDE, 

267 "ReflectionBoundedKDE": ReflectionBoundedKDE, 

268 "Bounded_1d_kde": Bounded_1d_kde 

269} 

270 

271_default_methods = { 

272 "transform_logit": transform_logit, 

273 "inverse_transform_logit": inverse_transform_logit, 

274 "dydx_logit": dydx_logit 

275}