Coverage for pesummary/utils/pdf.py: 71.2%

156 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-05-02 08:42 +0000

1# Licensed under an MIT style license -- see LICENSE.md 

2 

3from scipy.stats._distn_infrastructure import rv_continuous, rv_sample 

4from scipy.interpolate import interp1d, interp2d 

5import numpy as np 

6from pesummary.utils.array import Array 

7 

8__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"] 

9 

10 

11class InterpolatedPDF(rv_continuous): 

12 """Subclass of the scipy.stats._distn_infrastructure.rv_continous class. 

13 This class handles interpolated PDFs 

14 

15 Attributes 

16 ---------- 

17 interpolant: func 

18 the interpolant to use when evaluate probabilities 

19 

20 Methods 

21 ------- 

22 pdf: 

23 Evaluate the interpolant for a given input and return the PDF at that 

24 input 

25 """ 

26 def __new__(cls, *args, **kwargs): 

27 return super(InterpolatedPDF, cls).__new__(cls) 

28 

29 def __init__(self, *args, interpolant=None, dimensionality=1, **kwargs): 

30 self.interpolant = interpolant 

31 self.dimensionality = dimensionality 

32 if self.dimensionality > 2: 

33 import warnings 

34 warnings.warn( 

35 "The .rvs() method will currently only work for dimensionalities " 

36 "< 3" 

37 ) 

38 super(InterpolatedPDF, self).__init__(*args, **kwargs) 

39 

40 def _pdf(self, x): 

41 return self.interpolant(x) 

42 

43 def rvs(self, size=None, N=10**6): 

44 if size is None: 

45 return 

46 if self.dimensionality > 1: 

47 raise ValueError("method only valid for one dimensional data") 

48 _xrange = np.linspace( 

49 self.interpolant.x[0], self.interpolant.x[-1], N 

50 ) 

51 args = _xrange 

52 # if self.dimensionality > 1: 

53 # _yrange = np.linspace( 

54 # self.interpolant.y[0], self.interpolant.y[-1], N 

55 # ) 

56 # args = [args, _yrange] 

57 

58 probs = self.interpolant(args).flatten() 

59 inds = np.random.choice( 

60 np.arange(N**self.dimensionality), p=probs / np.sum(probs), 

61 size=size 

62 ) 

63 # if self.dimensionality > 1: 

64 # row = inds // 10 

65 # column = inds % 10 

66 # return np.array([_xrange[row], _yrange[columns]]).T 

67 return args[inds] 

68 

69 

70class DiscretePDF(rv_sample): 

71 """Subclass of the scipy.stats._distn_infrastructure.rv_sample class. This 

72 class handles discrete probabilities. 

73 

74 Parameters 

75 ---------- 

76 args: list, optional 

77 2d list of length 2. First element integers, second element probabilities 

78 corresponding to integers. 

79 values: list, optional 

80 2d list of length 2. First element integers, second element probabilities 

81 corresponding to integers. 

82 **kwargs: dict, optional 

83 all additional kwargs passed to the 

84 scipy.stats._distn_infrastructure.rv_sample class 

85 

86 Attributes 

87 ---------- 

88 x: np.ndarray 

89 array of integers with corresponding probabilities 

90 probs: np.ndarray 

91 array of probabilities corresponding to the array of integers 

92 

93 Methods 

94 ------- 

95 interpolate: 

96 interpolate the discrete probabilities and return a continuous 

97 InterpolatedPDF object 

98 percentile: 

99 calculate a percentile from the discrete PDF 

100 write: 

101 write the discrete PDF to file using the pesummary.io.write module 

102 

103 Examples 

104 -------- 

105 >>> from pesummary.utils.pdf import DiscretePDF 

106 >>> numbers = [1, 2, 3, 4] 

107 >>> distribution = [0.1, 0.2, 0.3, 0.4] 

108 >>> probs = DiscretePDF(numbers, distribution) 

109 >>> print(probs.x) 

110 [1, 2, 3, 4] 

111 >>> probs = DiscretePDF(values=[numbers, distribution]) 

112 >>> print(probs.x) 

113 [1, 2, 3, 4] 

114 """ 

115 def __new__(cls, *args, **kwargs): 

116 return super(DiscretePDF, cls).__new__(cls) 

117 

118 def __init__(self, *args, **kwargs): 

119 if len(args): 

120 try: 

121 self.x, self.probs = args 

122 except ValueError: 

123 self.x, self.probs = list(args)[0] 

124 kwargs["values"] = [self.x, self.probs] 

125 args = () 

126 else: 

127 self._values = kwargs.get("values", None) 

128 self.x = self._values[0] if self._values is not None else None 

129 self.probs = self._values[1] if self._values is not None else None 

130 super(DiscretePDF, self).__init__(*args, **kwargs) 

131 

132 def _pmf(self, x): 

133 _x = np.atleast_1d(x) 

134 if any(value not in self.x for value in _x): 

135 raise ValueError( 

136 "Unable to compute PMF for some of the provided values as " 

137 "provided probabilities are discrete. Either choose a value " 

138 "from {} or interpolate the data with " 

139 "`.interpolate().pdf({}).`".format(np.array(self.x), np.array(x)) 

140 ) 

141 return super(DiscretePDF, self)._pmf(x) 

142 

143 def rvs(self, *args, **kwargs): 

144 return Array(super(DiscretePDF, self).rvs(*args, **kwargs)) 

145 

146 def interpolate(self, interpolant=interp1d, include_bounds=True): 

147 """Interpolate the discrete pdf and return an InterpolatedPDF object 

148 

149 Parameters 

150 ---------- 

151 interpolant: func 

152 function to use to interpolate the discrete pdf 

153 include_bounds: Bool, optional 

154 if True, pass the upper and lower bounds to InterpolatedPDF 

155 """ 

156 kwargs = {} 

157 if include_bounds: 

158 kwargs.update({"a": self.x[0], "b": self.x[-1]}) 

159 return InterpolatedPDF( 

160 interpolant=interp1d(self.x, self.probs), **kwargs 

161 ) 

162 

163 def percentile(self, p): 

164 """Calculate a percentile from the discrete PDF 

165 

166 Parameters 

167 ---------- 

168 p: float, list 

169 percentile/list of percentiles to calculate 

170 """ 

171 from pesummary.utils.credible_interval import credible_interval 

172 return credible_interval(self.x, p, weights=self.probs) 

173 

174 def write(self, *args, **kwargs): 

175 """Write the discrete PDF to file using the pesummary.io.write module 

176 

177 Parameters 

178 ---------- 

179 *args: tuple 

180 all args passed to pesummary.io.write function 

181 **kwargs: dict, optional 

182 all kwargs passed to the pesummary.io.write function 

183 """ 

184 from pesummary.io import write 

185 if "dataset_name" not in kwargs.keys(): 

186 kwargs["dataset_name"] = "discrete_pdf" 

187 write( 

188 ["x", "PDF"], np.array([self.x, self.probs]).T, *args, **kwargs 

189 ) 

190 

191 

192class DiscretePDF2D(object): 

193 """Class to handle 2D discrete probabilities. 

194 

195 Parameters 

196 ---------- 

197 args: list, optional 

198 2d list of length 3. First element integers for the x axis, second 

199 element inters for the y axis and third element, the 2d joint 

200 probability density. 

201 

202 Attributes 

203 ---------- 

204 x: np.ndarray 

205 array of integers for the x axis 

206 y: np.ndarray 

207 array of integers for the y axis 

208 probs: np.ndarray 

209 2D array of probabilities for the x y join probability density 

210 

211 Methods 

212 ------- 

213 marginalize: 

214 marginalize the 2D joint probability distribution to obtain the 

215 discrete probability density for x and y. Returns the probabilities as 

216 as a DiscretePDF2Dplus1D object 

217 level_from_confidence: 

218 return the level to use for plt.contour for a given confidence. 

219 minimum_encompassing_contour_level: 

220 return the minimum contour level that encompasses a specific point 

221 

222 Examples 

223 -------- 

224 >>> from pesummary.utils.pdf import DiscretePDF2D 

225 >>> x = [1., 2., 3.] 

226 >>> y = [0.1, 0.2, 0.3] 

227 >>> probs = [ 

228 ... [1./9, 1./9, 1./9], 

229 ... [1./9, 1./9, 1./9], 

230 ... [1./9, 1./9, 1./9] 

231 ... ] 

232 >>> pdf = DiscretePDF2D(x, y, probs) 

233 >>> pdf.x 

234 [1.0, 2.0, 3.0] 

235 >>> pdf.y 

236 [0.1, 0.2, 0.3] 

237 >>> pdf.probs 

238 array([[0.11111111, 0.11111111, 0.11111111], 

239 [0.11111111, 0.11111111, 0.11111111], 

240 [0.11111111, 0.11111111, 0.11111111]]) 

241 """ 

242 def __init__(self, x, y, probability, **kwargs): 

243 self.x = x 

244 self.y = y 

245 self.dx = np.mean(np.diff(x)) 

246 self.dy = np.mean(np.diff(y)) 

247 self.probs = np.array(probability) 

248 if not self.probs.ndim == 2: 

249 raise ValueError("Please provide a 2d array of probabilities") 

250 if not np.isclose(np.sum(self.probs), 1.): 

251 raise ValueError("Probabilities do not sum to 1") 

252 

253 def marginalize(self): 

254 """Marginalize the 2d probability distribution and return a 

255 DiscretePDF2Dplus1D object containing the probability distribution 

256 for x, y and xy 

257 """ 

258 probs_x = np.sum(self.probs, axis=0) * self.dy 

259 probs_x /= np.sum(probs_x) 

260 probs_y = np.sum(self.probs, axis=1) * self.dx 

261 probs_y /= np.sum(probs_y) 

262 return DiscretePDF2Dplus1D( 

263 self.x, self.y, [probs_x, probs_y, self.probs] 

264 ) 

265 

266 def interpolate(self, interpolant=interp2d): 

267 """Interpolate the discrete pdf and return an InterpolatedPDF object 

268 

269 Parameters 

270 ---------- 

271 interpolant: func 

272 function to use to interpolate the discrete pdf 

273 include_bounds: Bool, optional 

274 if True, pass the upper and lower bounds to InterpolatedPDF 

275 """ 

276 return InterpolatedPDF( 

277 interpolant=interp2d(self.x, self.y, self.probs), 

278 dimensionality=2 

279 ) 

280 

281 def sort(self, descending=True): 

282 """Flatten and sort the stored probabilities 

283 

284 Parameters 

285 ---------- 

286 descending: Bool, optional 

287 if True, sort the probabilities in descending order 

288 """ 

289 _sorted = np.sort(self.probs.flatten()) 

290 if descending: 

291 return _sorted[::-1] 

292 return _sorted 

293 

294 def cdf(self, normalize=True): 

295 """Return the cumulative distribution function 

296 

297 Parameters 

298 ---------- 

299 normalize: Bool, optional 

300 if True, normalize the cumulative distribution function. Default 

301 True 

302 """ 

303 cumsum = np.cumsum(self.sort()) 

304 if normalize: 

305 cumsum /= np.sum(self.probs) 

306 return cumsum 

307 

308 def level_from_confidence(self, confidence): 

309 """Return the level to use for plt.contour for a given confidence. 

310 Confidence must be less than 1. 

311 

312 Parameters 

313 ---------- 

314 confidence: float/list 

315 float/list of confidences 

316 """ 

317 confidence = np.atleast_1d(confidence) 

318 if any(_c > 1 for _c in confidence): 

319 raise ValueError("confidence must be less than 1") 

320 _sorted = self.sort() 

321 idx = interp1d( 

322 self.cdf(), np.arange(len(_sorted)), bounds_error=False, 

323 fill_value=len(_sorted) 

324 )(confidence) 

325 level = interp1d( 

326 np.arange(len(_sorted)), _sorted, bounds_error=False, fill_value=0. 

327 )(idx) 

328 try: 

329 return sorted(level) 

330 except TypeError: 

331 return level 

332 

333 def minimum_encompassing_contour_level(self, x, y, interpolate=False): 

334 """Return the minimum encompassing contour level that encompasses a 

335 specific point 

336 

337 Parameters 

338 ---------- 

339 point: tuple 

340 the point you wish to find the minimum encompassing contour for 

341 """ 

342 _sorted = self.sort() 

343 if interpolate: 

344 _interp = self.interpolate() 

345 _idx = _interp.interpolant(x, y) 

346 else: 

347 _x = min( 

348 range(len(self.x)), key=lambda i: abs(self.x[i] - x) 

349 ) 

350 _y = min( 

351 range(len(self.y)), key=lambda i: abs(self.y[i] - y) 

352 ) 

353 _idx = [self.probs[_x, _y]] 

354 idx = interp1d( 

355 _sorted[::-1], np.arange(len(_sorted))[::-1], bounds_error=False, 

356 fill_value=len(_sorted) 

357 )(_idx) 

358 level = interp1d( 

359 np.arange(len(_sorted)), self.cdf(), bounds_error=False, fill_value=1. 

360 )(idx) 

361 return level 

362 

363 def write(self, *args, include_1d=False, **kwargs): 

364 """Write the discrete PDF to file using the pesummary.io.write module 

365 

366 Parameters 

367 ---------- 

368 *args: tuple 

369 all args passed to pesummary.io.write function 

370 include_1d: Bool, optional 

371 if True, save the 1D marginalized as well as the 2D PDF to file 

372 **kwargs: dict, optional 

373 all kwargs passed to the pesummary.io.write function 

374 """ 

375 from pesummary.io import write 

376 if not include_1d: 

377 if "dataset_name" not in kwargs.keys(): 

378 kwargs["dataset_name"] = "discrete_pdf" 

379 X, Y = np.meshgrid(self.x, self.y) 

380 write( 

381 ["x", "y", "PDF"], 

382 np.array([X.ravel(), Y.ravel(), self.probs.flatten()]).T, *args, 

383 **kwargs 

384 ) 

385 else: 

386 _pdf = self.marginalize() 

387 _pdf.write(*args, **kwargs) 

388 

389 

390class DiscretePDF2Dplus1D(object): 

391 """Class to handle 2D discrete probabilities alongside 1D discrete 

392 probability distributions. 

393 

394 Parameters 

395 ---------- 

396 args: list, optional 

397 3d list of length 3. First element integers for the x axis, second 

398 element inters for the y axis and third element, a list containing 

399 the probability distribution for x, y and the 2d join probability 

400 distribution xy. 

401 

402 Attributes 

403 ---------- 

404 x: np.ndarray 

405 array of integers for the x axis 

406 y: np.ndarray 

407 array of integers for the y axis 

408 probs: np.ndarray 

409 3D array of probabilities for the x axis, y axis and the xy joint 

410 probability density 

411 probs_x: DiscretePDF 

412 the probability density function for the x axis stored as a 

413 DiscretePDF object 

414 probs_y: DiscretePDF 

415 the probability density function for the y axis stored as a 

416 DiscretePDF object 

417 probs_xy: DiscretePDF2D 

418 the joint probability density function for the x and y axes stored as 

419 DiscretePDF2D object 

420 

421 Methods 

422 ------- 

423 write: 

424 write the discrete PDF to file using the pesummary.io.write module 

425 """ 

426 def __init__(self, x, y, probabilities, **kwargs): 

427 self.x = x 

428 self.y = y 

429 self.probs = [np.array(_p) for _p in probabilities] 

430 if len(self.probs) != 3: 

431 raise ValueError( 

432 "Please provide a tuple of length 3. Probabilities for x " 

433 "y and xy" 

434 ) 

435 if not any(_p.ndim == 2 for _p in self.probs): 

436 raise ValueError("Please provide the probabilities for xy") 

437 if not len([_p for _p in self.probs if _p.ndim == 1]) == 2: 

438 raise ValueError( 

439 "2 probabilities array must be 1 dimensional and 1 2 dimensional" 

440 ) 

441 _x = 0. 

442 for num, _p in enumerate(self.probs): 

443 if _p.ndim == 1 and _x == 0: 

444 self.probs_x = DiscretePDF(self.x, _p) 

445 self.probs[num] = self.probs_x 

446 _x = 1. 

447 elif _p.ndim == 1: 

448 self.probs_y = DiscretePDF(self.y, _p) 

449 self.probs[num] = self.probs_y 

450 else: 

451 self.probs_xy = DiscretePDF2D(self.x, self.y, _p) 

452 self.probs[num] = self.probs_xy 

453 

454 def write(self, *args, **kwargs): 

455 """Write the 1D and 2D discrete PDF to file using the pesummary.io.write 

456 module 

457 

458 Parameters 

459 ---------- 

460 *args: tuple 

461 all args passed to pesummary.io.write function 

462 **kwargs: dict, optional 

463 all kwargs passed to the pesummary.io.write function 

464 """ 

465 if "filename" in kwargs.keys() and not isinstance(kwargs["filename"], dict): 

466 raise ValueError( 

467 "Please provide filenames as a dictionary with keys '1d' and " 

468 "'2d'" 

469 ) 

470 elif "filename" in kwargs.keys(): 

471 if not all(k in ["1d", "2d"] for k in kwargs["filename"].keys()): 

472 raise ValueError("Filename must be keyed by '1d' and/or '2d'") 

473 else: 

474 _format = "dat" if "file_format" not in kwargs.keys() else kwargs[ 

475 "file_format" 

476 ] 

477 _default = "pesummary_{}_pdf.%s" % (_format) 

478 kwargs["filename"] = { 

479 "1d": _default.format("1d"), "2d": _default.format("2d") 

480 } 

481 _filenames = kwargs.pop("filename") 

482 self.probs_x.write(*args, filename=_filenames["1d"], **kwargs) 

483 self.probs_xy.write(*args, filename=_filenames["2d"], **kwargs)