Coverage for pesummary/gw/file/formats/base_read.py: 80.9%

283 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-05-02 08:42 +0000

1# Licensed under an MIT style license -- see LICENSE.md 

2 

3import numpy as np 

4from pesummary.gw.file.standard_names import standard_names 

5from pesummary.core.file.formats.base_read import ( 

6 Read, SingleAnalysisRead, MultiAnalysisRead 

7) 

8from pesummary.utils.utils import logger 

9from pesummary.utils.samples_dict import SamplesDict 

10from pesummary.utils.decorators import open_config 

11from pesummary.gw.conversions import convert 

12 

13__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"] 

14 

15 

16def _translate_parameters(parameters, samples): 

17 """Translate parameters to a standard names 

18 

19 Parameters 

20 ---------- 

21 parameters: list 

22 list of parameters used in the analysis 

23 samples: list 

24 list of samples for each parameters 

25 """ 

26 path = ("https://git.ligo.org/lscsoft/pesummary/blob/master/pesummary/" 

27 "gw/file/standard_names.py") 

28 parameters_not_included = [ 

29 i for i in parameters if i not in standard_names.keys() 

30 ] 

31 if len(parameters_not_included) > 0: 

32 logger.debug( 

33 "PESummary does not have a 'standard name' for the following " 

34 "parameters: {}. This means that comparison plots between " 

35 "different codes may not show these parameters. If you want to " 

36 "assign a standard name for these parameters, please add an MR " 

37 "which edits the following file: {}. These parameters will be " 

38 "added to the result pages and meta file as is.".format( 

39 ", ".join(parameters_not_included), path 

40 ) 

41 ) 

42 standard_params = [i for i in parameters if i in standard_names.keys()] 

43 converted_params = [ 

44 standard_names[i] if i in standard_params else i for i in 

45 parameters 

46 ] 

47 return converted_params, samples 

48 

49 

50def _add_log_likelihood(parameters, samples): 

51 """Add zero log_likelihood samples to the posterior table 

52 

53 Parameters 

54 ---------- 

55 parameters: list 

56 list of parameters stored in the table 

57 samples: 2d list 

58 list of samples for each parameter. Columns correspond to a given 

59 parameter 

60 """ 

61 if "log_likelihood" not in parameters: 

62 parameters.append("log_likelihood") 

63 samples = np.vstack( 

64 [np.array(samples).T, np.zeros(len(samples))] 

65 ).T 

66 return parameters, samples 

67 

68 

69class GWRead(Read): 

70 """Base class to read in a results file 

71 

72 Parameters 

73 ---------- 

74 path_to_results_file: str 

75 path to the results file you wish to load 

76 remove_nan_likelihood_samples: Bool, optional 

77 if True, remove samples which have log_likelihood='nan'. Default True 

78 

79 Attributes 

80 ---------- 

81 parameters: list 

82 list of parameters stored in the result file 

83 converted_parameters: list 

84 list of parameters that have been derived from the sampled distributions 

85 samples: 2d list 

86 list of samples stored in the result file 

87 samples_dict: dict 

88 dictionary of samples stored in the result file keyed by parameters 

89 input_version: str 

90 version of the result file passed. 

91 extra_kwargs: dict 

92 dictionary of kwargs that were extracted from the result file 

93 converted_parameters: list 

94 list of parameters that have been added 

95 

96 Methods 

97 ------- 

98 to_dat: 

99 save the posterior samples to a .dat file 

100 to_latex_table: 

101 convert the posterior samples to a latex table 

102 generate_latex_macros: 

103 generate a set of latex macros for the stored posterior samples 

104 to_lalinference: 

105 convert the posterior samples to a lalinference result file 

106 generate_all_posterior_samples: 

107 generate all posterior distributions that may be derived from 

108 sampled distributions 

109 """ 

110 def __init__(self, path_to_results_file, **kwargs): 

111 super(GWRead, self).__init__(path_to_results_file, **kwargs) 

112 

113 @property 

114 def calibration_spline_posterior(self): 

115 return None 

116 

117 Read.attrs.update({"approximant": "approximant"}) 

118 

119 def load(self, function, _data=None, **kwargs): 

120 """Load a results file according to a given function 

121 

122 Parameters 

123 ---------- 

124 function: func 

125 callable function that will load in your results file 

126 """ 

127 data = _data 

128 if _data is None: 

129 data = self.load_from_function( 

130 function, self.path_to_results_file, **kwargs 

131 ) 

132 parameters, samples = self.translate_parameters( 

133 data["parameters"], data["samples"] 

134 ) 

135 _add_likelihood = kwargs.get("add_zero_likelihood", True) 

136 if not self.check_for_log_likelihood(parameters) and _add_likelihood: 

137 logger.warning( 

138 "Failed to find 'log_likelihood' in result file. Setting " 

139 "every sample to have log_likelihood 0" 

140 ) 

141 parameters, samples = self.add_log_likelihood(parameters, samples) 

142 data.update( 

143 { 

144 "parameters": parameters, "samples": samples, 

145 "injection": data["injection"] 

146 } 

147 ) 

148 super(GWRead, self).load(function, _data=data, **kwargs) 

149 if self.injection_parameters is not None: 

150 self.injection_parameters = self.convert_injection_parameters( 

151 self.injection_parameters, extra_kwargs=self.extra_kwargs, 

152 disable_convert=kwargs.get("disable_injection_conversion", False) 

153 ) 

154 if self.priors is not None and len(self.priors): 

155 if self.priors["samples"] != {}: 

156 priors = self.priors["samples"] 

157 self.priors["samples"] = self.convert_and_translate_prior_samples( 

158 priors, disable_convert=kwargs.get( 

159 "disable_prior_conversion", False 

160 ) 

161 ) 

162 

163 def convert_and_translate_prior_samples(self, priors, disable_convert=False): 

164 """ 

165 """ 

166 default_parameters = list(priors.keys()) 

167 default_samples = [ 

168 [priors[parameter][i] for parameter in default_parameters] for i 

169 in range(len(priors[default_parameters[0]])) 

170 ] 

171 parameters, samples = self.translate_parameters( 

172 default_parameters, default_samples 

173 ) 

174 if not disable_convert: 

175 return convert( 

176 parameters, samples, extra_kwargs=self.extra_kwargs 

177 ) 

178 return SamplesDict(parameters, samples) 

179 

180 def convert_injection_parameters( 

181 self, data, extra_kwargs={"sampler": {}, "meta_data": {}}, 

182 disable_convert=False, sampled_parameters=None, **kwargs 

183 ): 

184 """Apply the conversion module to the injection data 

185 

186 Parameters 

187 ---------- 

188 data: dict 

189 dictionary of injection data keyed by the parameter 

190 extra_kwargs: dict, optional 

191 optional kwargs to pass to the conversion module 

192 disable_convert: Bool, optional 

193 if True, do not convert injection parameters 

194 """ 

195 import math 

196 from pesummary.gw.file.injection import GWInjection 

197 kwargs.update({"extra_kwargs": extra_kwargs}) 

198 _data = data.copy() 

199 for key, item in data.items(): 

200 if math.isnan(np.atleast_1d(item)[0]): 

201 _ = _data.pop(key) 

202 if len(_data): 

203 converted = GWInjection( 

204 _data, conversion=not disable_convert, conversion_kwargs=kwargs 

205 ) 

206 _param = list(converted.keys())[0] 

207 _example = converted[_param] 

208 if not len(_example.shape): 

209 for key, item in converted.items(): 

210 converted[key] = [item] 

211 else: 

212 converted = _data 

213 for i in sampled_parameters: 

214 if i not in list(converted.keys()): 

215 converted[i] = float('nan') 

216 return converted 

217 

218 def write(self, package="core", **kwargs): 

219 """Save the data to file 

220 

221 Parameters 

222 ---------- 

223 package: str, optional 

224 package you wish to use when writing the data 

225 kwargs: dict, optional 

226 all additional kwargs are passed to the pesummary.io.write function 

227 """ 

228 return super(GWRead, self).write(package="gw", **kwargs) 

229 

230 def _grab_injection_parameters_from_file(self, path, **kwargs): 

231 """Extract data from an injection file 

232 

233 Parameters 

234 ---------- 

235 path: str 

236 path to injection file 

237 """ 

238 from pesummary.gw.file.injection import GWInjection 

239 return super(GWRead, self)._grab_injection_parameters_from_file( 

240 path, cls=GWInjection, **kwargs 

241 ) 

242 

243 def interpolate_calibration_spline_posterior(self, **kwargs): 

244 from pesummary.gw.file.calibration import Calibration 

245 from pesummary.utils.utils import iterator 

246 if self.calibration_spline_posterior is None: 

247 return 

248 total = [] 

249 log_frequencies, amplitudes, phases = self.calibration_spline_posterior 

250 keys = list(log_frequencies.keys()) 

251 _iterator = iterator( 

252 None, desc="Interpolating calibration posterior", logger=logger, 

253 tqdm=True, total=len(self.samples) * 2 * len(keys) 

254 ) 

255 with _iterator as pbar: 

256 for key in keys: 

257 total.append( 

258 Calibration.from_spline_posterior_samples( 

259 np.array(log_frequencies[key]), 

260 np.array(amplitudes[key]), np.array(phases[key]), 

261 pbar=pbar, **kwargs 

262 ) 

263 ) 

264 return total, log_frequencies.keys() 

265 

266 @staticmethod 

267 def translate_parameters(parameters, samples): 

268 """Translate parameters to a standard names 

269 

270 Parameters 

271 ---------- 

272 parameters: list 

273 list of parameters used in the analysis 

274 samples: list 

275 list of samples for each parameters 

276 """ 

277 return _translate_parameters(parameters, samples) 

278 

279 @staticmethod 

280 def _check_definition_of_inclination(parameters): 

281 """Check the definition of inclination given the other parameters 

282 

283 Parameters 

284 ---------- 

285 parameters: list 

286 list of parameters used in the study 

287 """ 

288 theta_jn = False 

289 spin_angles = ["tilt_1", "tilt_2", "a_1", "a_2"] 

290 names = [ 

291 standard_names[i] for i in parameters if i in standard_names.keys()] 

292 if all(i in names for i in spin_angles): 

293 theta_jn = True 

294 if theta_jn: 

295 if "theta_jn" not in names and "inclination" in parameters: 

296 logger.warning("Because the spin angles are in your list of " 

297 "parameters, the angle 'inclination' probably " 

298 "refers to 'theta_jn'. If this is a mistake, " 

299 "please change the definition of 'inclination' to " 

300 "'iota' in your results file") 

301 index = parameters.index("inclination") 

302 parameters[index] = "theta_jn" 

303 else: 

304 if "inclination" in parameters: 

305 index = parameters.index("inclination") 

306 parameters[index] = "iota" 

307 return parameters 

308 

309 def add_fixed_parameters_from_config_file(self, config_file): 

310 """Search the conifiguration file and add fixed parameters to the 

311 list of parameters and samples 

312 

313 Parameters 

314 ---------- 

315 config_file: str 

316 path to the configuration file 

317 """ 

318 self._add_fixed_parameters_from_config_file( 

319 config_file, self._add_fixed_parameters) 

320 

321 @staticmethod 

322 @open_config(index=2) 

323 def _add_fixed_parameters(parameters, samples, config_file): 

324 """Open a LALInference configuration file and add the fixed parameters 

325 to the list of parameters and samples 

326 

327 Parameters 

328 ---------- 

329 parameters: list 

330 list of existing parameters 

331 samples: list 

332 list of existing samples 

333 config_file: str 

334 path to the configuration file 

335 """ 

336 from pesummary.gw.file.standard_names import standard_names 

337 

338 config = config_file 

339 if not config.error: 

340 fixed_data = {} 

341 if "engine" in config.sections(): 

342 fixed_data = { 

343 key.split("fix-")[1]: item for key, item in 

344 config.items("engine") if "fix" in key} 

345 for i in fixed_data.keys(): 

346 fixed_parameter = i 

347 fixed_value = fixed_data[i] 

348 try: 

349 param = standard_names[fixed_parameter] 

350 if param in parameters: 

351 pass 

352 else: 

353 parameters.append(param) 

354 for num in range(len(samples)): 

355 samples[num].append(float(fixed_value)) 

356 except Exception: 

357 if fixed_parameter == "logdistance": 

358 if "luminosity_distance" not in parameters: 

359 parameters.append(standard_names["distance"]) 

360 for num in range(len(samples)): 

361 samples[num].append(float(fixed_value)) 

362 if fixed_parameter == "costheta_jn": 

363 if "theta_jn" not in parameters: 

364 parameters.append(standard_names["theta_jn"]) 

365 for num in range(len(samples)): 

366 samples[num].append(float(fixed_value)) 

367 return parameters, samples 

368 return parameters, samples 

369 

370 

371class GWSingleAnalysisRead(GWRead, SingleAnalysisRead): 

372 """Base class to read in a results file which contains a single analysis 

373 

374 Parameters 

375 ---------- 

376 path_to_results_file: str 

377 path to the results file you wish to load 

378 remove_nan_likelihood_samples: Bool, optional 

379 if True, remove samples which have log_likelihood='nan'. Default True 

380 

381 Attributes 

382 ---------- 

383 parameters: list 

384 list of parameters stored in the result file 

385 converted_parameters: list 

386 list of parameters that have been derived from the sampled distributions 

387 samples: 2d list 

388 list of samples stored in the result file 

389 samples_dict: dict 

390 dictionary of samples stored in the result file keyed by parameters 

391 input_version: str 

392 version of the result file passed. 

393 extra_kwargs: dict 

394 dictionary of kwargs that were extracted from the result file 

395 converted_parameters: list 

396 list of parameters that have been added 

397 

398 Methods 

399 ------- 

400 to_dat: 

401 save the posterior samples to a .dat file 

402 to_latex_table: 

403 convert the posterior samples to a latex table 

404 generate_latex_macros: 

405 generate a set of latex macros for the stored posterior samples 

406 to_lalinference: 

407 convert the posterior samples to a lalinference result file 

408 generate_all_posterior_samples: 

409 generate all posterior distributions that may be derived from 

410 sampled distributions 

411 """ 

412 def __init__(self, *args, **kwargs): 

413 super(GWSingleAnalysisRead, self).__init__(*args, **kwargs) 

414 

415 def check_for_log_likelihood(self, parameters): 

416 """Return True if 'log_likelihood' is in a list of sampled parameters 

417 

418 Parameters 

419 ---------- 

420 parameters: list 

421 list of sampled parameters 

422 """ 

423 if "log_likelihood" in parameters: 

424 return True 

425 return False 

426 

427 def add_log_likelihood(self, parameters, samples): 

428 """Add log_likelihood samples to a posterior table 

429 

430 Parameters 

431 ---------- 

432 parameters: list 

433 list of parameters stored in the table 

434 samples: 2d list 

435 list of samples for each parameter. Columns correspond to a given 

436 parameter 

437 """ 

438 return _add_log_likelihood(parameters, samples) 

439 

440 def generate_all_posterior_samples(self, **kwargs): 

441 """Generate all posterior samples via the conversion module 

442 

443 Parameters 

444 ---------- 

445 **kwargs: dict 

446 all kwargs passed to the conversion module 

447 """ 

448 if "no_conversion" in kwargs.keys(): 

449 no_conversion = kwargs.pop("no_conversion") 

450 else: 

451 no_conversion = False 

452 if not no_conversion: 

453 from pesummary.gw.conversions import convert 

454 

455 data = convert( 

456 self.parameters, self.samples, extra_kwargs=self.extra_kwargs, 

457 return_dict=False, **kwargs 

458 ) 

459 self.parameters = data[0] 

460 self.converted_parameters = self.parameters.added 

461 self.samples = data[1] 

462 if kwargs.get("return_kwargs", False): 

463 self.extra_kwargs = data[2] 

464 

465 def convert_injection_parameters(self, *args, **kwargs): 

466 return super(GWSingleAnalysisRead, self).convert_injection_parameters( 

467 *args, sampled_parameters=self.parameters, **kwargs 

468 ) 

469 

470 def to_lalinference(self, **kwargs): 

471 """Save the PESummary results file object to a lalinference hdf5 file 

472 

473 Parameters 

474 ---------- 

475 kwargs: dict 

476 all kwargs are passed to the pesummary.io.write.write function 

477 """ 

478 return self.write(file_format="lalinference", package="gw", **kwargs) 

479 

480 

481class GWMultiAnalysisRead(GWRead, MultiAnalysisRead): 

482 """Base class to read in a results file which contains multiple analyses 

483 

484 Parameters 

485 ---------- 

486 path_to_results_file: str 

487 path to the results file you wish to load 

488 remove_nan_likelihood_samples: Bool, optional 

489 if True, remove samples which have log_likelihood='nan'. Default True 

490 """ 

491 def __init__(self, *args, **kwargs): 

492 super(GWMultiAnalysisRead, self).__init__(*args, **kwargs) 

493 

494 def load(self, *args, **kwargs): 

495 super(GWMultiAnalysisRead, self).load(*args, **kwargs) 

496 if "psd" in self.data.keys(): 

497 from pesummary.gw.file.psd import PSDDict 

498 

499 try: 

500 self.psd = { 

501 label: PSDDict( 

502 {ifo: value for ifo, value in psd_data.items()} 

503 ) for label, psd_data in self.data["psd"].items() 

504 } 

505 except (KeyError, AttributeError): 

506 self.psd = self.data["psd"] 

507 if "calibration" in self.data.keys(): 

508 from pesummary.gw.file.calibration import Calibration 

509 

510 try: 

511 self.calibration = { 

512 label: { 

513 ifo: Calibration(value) for ifo, value in 

514 calibration_data.items() 

515 } for label, calibration_data in 

516 self.data["calibration"].items() 

517 } 

518 except (KeyError, AttributeError): 

519 self.calibration = self.data["calibration"] 

520 if "prior" in self.data.keys() and "calibration" in self.data["prior"].keys(): 

521 from pesummary.gw.file.calibration import CalibrationDict 

522 

523 try: 

524 self.priors["calibration"] = { 

525 label: CalibrationDict(calibration_data) for 

526 label, calibration_data in 

527 self.data["prior"]["calibration"].items() 

528 } 

529 except (KeyError, AttributeError): 

530 pass 

531 if "skymap" in self.data.keys(): 

532 from pesummary.gw.file.skymap import SkyMapDict, SkyMap 

533 

534 try: 

535 self.skymap = SkyMapDict({ 

536 label: SkyMap(skymap["data"], skymap["meta_data"]) 

537 for label, skymap in self.data["skymap"].items() 

538 }) 

539 except (KeyError, AttributeError): 

540 self.skymap = self.data["skymap"] 

541 if "gwdata" in self.data.keys(): 

542 try: 

543 from pesummary.gw.file.strain import StrainDataDict, StrainData 

544 mydict = {} 

545 for IFO, value in self.data["gwdata"].items(): 

546 channel = [ch for ch in value.keys() if "_attrs" not in ch][0] 

547 if "{}_attrs".format(channel) in value.keys(): 

548 _attrs = value["{}_attrs".format(channel)] 

549 else: 

550 _attrs = {} 

551 mydict[IFO] = StrainData(value[channel], **_attrs) 

552 self.gwdata = StrainDataDict(mydict) 

553 except (KeyError, AttributeError): 

554 pass 

555 

556 def convert_and_translate_prior_samples(self, priors, disable_convert=False): 

557 """ 

558 """ 

559 from pesummary.utils.samples_dict import MultiAnalysisSamplesDict 

560 

561 mydict = {} 

562 for num, label in enumerate(self.labels): 

563 if label in priors.keys() and len(priors[label]): 

564 default_parameters = list(priors[label].keys()) 

565 default_samples = np.array( 

566 [priors[label][_param] for _param in default_parameters] 

567 ).T 

568 parameters, samples = self.translate_parameters( 

569 [default_parameters], [default_samples] 

570 ) 

571 if not disable_convert: 

572 mydict[label] = convert( 

573 parameters[0], samples[0], extra_kwargs=self.extra_kwargs[num] 

574 ) 

575 else: 

576 mydict[label] = SamplesDict(parameters[0], samples[0]) 

577 else: 

578 mydict[label] = {} 

579 return MultiAnalysisSamplesDict(mydict) 

580 

581 def check_for_log_likelihood(self, parameters): 

582 if all("log_likelihood" in p for p in parameters): 

583 return True 

584 return False 

585 

586 @staticmethod 

587 def translate_parameters(parameters, samples): 

588 """Translate parameters to a standard names 

589 

590 Parameters 

591 ---------- 

592 parameters: list 

593 list of parameters used in the analysis 

594 samples: list 

595 list of samples for each parameters 

596 """ 

597 converted_params = [] 

598 for _parameters, _samples in zip(parameters, samples): 

599 converted_params.append( 

600 _translate_parameters(_parameters, _samples)[0] 

601 ) 

602 return converted_params, samples 

603 

604 def add_log_likelihood(self, parameters, samples): 

605 """ 

606 """ 

607 parameters_logl, samples_logl = [], [] 

608 for _parameters, _samples in zip(parameters, samples): 

609 pp, ss = _add_log_likelihood(_parameters, _samples) 

610 parameters_logl.append(pp) 

611 samples_logl.append(ss) 

612 return parameters_logl, samples_logl 

613 

614 def generate_all_posterior_samples(self, labels=None, **conversion_kwargs): 

615 if "no_conversion" in conversion_kwargs.keys(): 

616 no_conversion = conversion_kwargs.pop("no_conversion") 

617 else: 

618 no_conversion = False 

619 if no_conversion: 

620 return 

621 from pesummary.gw.conversions import convert 

622 

623 converted_params, converted_samples, converted_kwargs = [], [], [] 

624 _converted_params = [] 

625 for label, param, samples, kwargs in zip( 

626 self.labels, self.parameters, self.samples, self.extra_kwargs 

627 ): 

628 if labels is not None and label not in labels: 

629 converted_params.append(param) 

630 _converted_params.append([]) 

631 converted_samples.append(samples) 

632 if kwargs.get("return_kwargs", False): 

633 converted_kwargs.append(kwargs) 

634 continue 

635 if label in conversion_kwargs.keys(): 

636 _conversion_kwargs = conversion_kwargs[label] 

637 else: 

638 _conversion_kwargs = conversion_kwargs 

639 if _conversion_kwargs.get("evolve_spins", False): 

640 if not _conversion_kwargs.get("return_kwargs", False): 

641 _conversion_kwargs["return_kwargs"] = True 

642 data = convert( 

643 param, samples, extra_kwargs=kwargs, return_dict=False, 

644 **_conversion_kwargs 

645 ) 

646 converted_params.append(data[0]) 

647 _converted_params.append(data[0].added) 

648 converted_samples.append(data[1]) 

649 if kwargs.get("return_kwargs", False): 

650 converted_kwargs.append(data[2]) 

651 self.parameters = converted_params 

652 self.converted_parameters = _converted_params 

653 self.samples = converted_samples 

654 if converted_kwargs != []: 

655 self.extra_kwargs = { 

656 label: converted_kwargs[num] for num, label in enumerate( 

657 self.labels 

658 ) 

659 } 

660 

661 def convert_injection_parameters( 

662 self, data, extra_kwargs={"sampler": {}, "meta_data": {}}, 

663 disable_convert=False 

664 ): 

665 """Apply the conversion module to the injection data 

666 """ 

667 for num, label in enumerate(self.labels): 

668 _identifier = label 

669 if isinstance(data, dict): 

670 _data = data[label] 

671 else: 

672 _data = data[num] 

673 _identifier = num 

674 data[_identifier] = super( 

675 GWMultiAnalysisRead, self 

676 ).convert_injection_parameters( 

677 _data, extra_kwargs=extra_kwargs[num], 

678 disable_convert=disable_convert, 

679 sampled_parameters=self.parameters[num] 

680 ) 

681 return data