Coverage for pesummary/tests/base.py: 73.3%

322 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2026-01-15 17:49 +0000

1# Licensed under an MIT style license -- see LICENSE.md 

2 

3import os 

4import numpy as np 

5from pathlib import Path 

6from pesummary.gw.cli.inputs import WebpagePlusPlottingPlusMetaFileInput 

7from pesummary.core.cli.inputs import ( 

8 WebpagePlusPlottingPlusMetaFileInput as Input 

9) 

10 

11__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"] 

12 

13 

14class Namespace(object): 

15 def __init__(self, **kwargs): 

16 self.__dict__.update(kwargs) 

17 

18 

19def namespace(args): 

20 """Generate a namespace for testing purposes 

21 

22 Parameters 

23 ---------- 

24 args: dict 

25 dictionary of arguments 

26 """ 

27 base = Namespace() 

28 for key in list(args.keys()): 

29 setattr(base, key, args[key]) 

30 return base 

31 

32 

33def gw_parameters(): 

34 parameters = [ 

35 'mass_1', 'mass_2', 'a_1', 'a_2', 'tilt_1', 'tilt_2', 'phi_jl', 

36 'phi_12', 'psi', 'theta_jn', 'ra', 'dec', 'luminosity_distance', 

37 'geocent_time', 'log_likelihood', 'mass_ratio', 'total_mass', 

38 'chirp_mass', 'symmetric_mass_ratio', 'iota', 'spin_1x', 'spin_1y', 

39 'spin_1z', 'spin_2x', 'spin_2y', 'spin_2z', 'chi_p', 'chi_eff', 

40 'cos_tilt_1', 'cos_tilt_2', 'redshift', 'comoving_distance', 

41 'mass_1_source', 'mass_2_source', 'total_mass_source', 

42 'chirp_mass_source', 'phi_1', 'phi_2', 'cos_theta_jn', 'cos_iota', 

43 'peak_luminosity_non_evolved', 'final_spin_non_evolved', 

44 'final_mass_non_evolved', 'final_mass_source_non_evolved', 

45 'radiated_energy_non_evolved', 'inverted_mass_ratio', 

46 'viewing_angle', 'chi_p_2spin', 'comoving_volume' 

47 ] 

48 return parameters 

49 

50 

51def get_list_of_files( 

52 gw=False, number=1, existing_plot=False, parameters=[], sections=[], 

53 outdir=".outdir", extra_gw_pages=True, remove_gw_pages=[] 

54): 

55 """Return a list of files that should be generated from a typical workflow 

56 """ 

57 from pesummary.conf import gw_2d_plots 

58 if not gw and not len(parameters): 

59 import string 

60 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"] 

61 elif not len(parameters): 

62 parameters = gw_parameters() 

63 if not gw: 

64 label = "core" 

65 else: 

66 label = "gw" 

67 html = [ 

68 "%s/html/error.html" % (outdir), 

69 "%s/html/Version.html" % (outdir), 

70 "%s/html/Logging.html" % (outdir), 

71 "%s/html/About.html" % (outdir), 

72 "%s/html/Downloads.html" % (outdir)] 

73 if gw and not len(sections): 

74 sections = [ 

75 "spins", "spin_angles", "timings", "source", "remnant", "others", 

76 "masses", "location", "inclination", "energy" 

77 ] 

78 elif not len(sections): 

79 sections = ["A-D", "E-F", "I-L", "M-P", "Q-T"] 

80 for num in range(number): 

81 html.append("%s/html/%s%s_%s%s.html" % (outdir, label, num, label, num)) 

82 if gw and "classification" not in remove_gw_pages: 

83 html.append("%s/html/%s%s_%s%s_Classification.html" % (outdir, label, num, label, num)) 

84 html.append("%s/html/%s%s_%s%s_Corner.html" % (outdir, label, num, label, num)) 

85 html.append("%s/html/%s%s_%s%s_Config.html" % (outdir, label, num, label, num)) 

86 html.append("%s/html/%s%s_%s%s_Custom.html" % (outdir, label, num, label, num)) 

87 html.append("%s/html/%s%s_%s%s_All.html" % (outdir, label, num, label, num)) 

88 html.append("%s/html/%s%s_%s%s_Interactive_Corner.html" % ( 

89 outdir, label, num, label, num 

90 )) 

91 for section in sections: 

92 html.append("%s/html/%s%s_%s%s_%s_all.html" % (outdir, label, num, label, num, section)) 

93 for j in parameters: 

94 html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, j)) 

95 if gw: 

96 # 2d histogram pages 

97 for section in ["location", "masses", "spins", "timings"]: 

98 if section == "location": 

99 pairs = [p for p in gw_2d_plots if "luminosity_distance" in p] 

100 if not any(all(p in parameters for p in _) for _ in pairs): 

101 continue 

102 elif section == "masses": 

103 pairs = [p for p in gw_2d_plots if "mass_1" in p] 

104 if not any(all(p in parameters for p in _) for _ in pairs): 

105 continue 

106 elif section == "spins": 

107 pairs = [p for p in gw_2d_plots if "chi_p" in p] 

108 if not any(all(p in parameters for p in _) for _ in pairs): 

109 continue 

110 elif section == "timings": 

111 pairs = [p for p in gw_2d_plots if "geocent_time" in p] 

112 if not any(all(p in parameters for p in _) for _ in pairs): 

113 continue 

114 html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, section)) 

115 if existing_plot: 

116 html.append("%s/html/%s%s_%s%s_Additional.html" % (outdir, label, num, label, num)) 

117 

118 if number > 1: 

119 html.append("%s/html/Comparison.html" % (outdir)) 

120 html.append("%s/html/Comparison_Custom.html" % (outdir)) 

121 html.append("%s/html/Comparison_All.html" % (outdir)) 

122 html.append("%s/html/Comparison_Interactive_Ridgeline.html" % (outdir)) 

123 for j in parameters: 

124 if j != "classification": 

125 html.append("%s/html/Comparison_%s.html" % (outdir, j)) 

126 for section in sections: 

127 html.append("%s/html/Comparison_%s_all.html" % (outdir, section)) 

128 return sorted(html) 

129 

130 

131def get_list_of_plots( 

132 gw=False, number=1, mcmc=False, label=None, outdir=".outdir", 

133 comparison=True, psd=False, calibration=False, existing_plot=False, 

134 expert=False, waveform=False, parameters=[], extra_gw_plots=True, 

135 remove_gw_plots=[] 

136): 

137 """Return a list of plots that should be generated from a typical workflow 

138 """ 

139 from pesummary.conf import gw_2d_plots 

140 if not gw and not len(parameters): 

141 import string 

142 

143 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"] 

144 elif not len(parameters): 

145 parameters = gw_parameters() 

146 if not gw and label is None: 

147 label = "core" 

148 elif label is None: 

149 label = "gw" 

150 

151 plots = [] 

152 for num in range(number): 

153 for i in ["sample_evolution", "autocorrelation", "1d_posterior", "cdf"]: 

154 for j in parameters: 

155 plots.append("%s/plots/%s%s_%s_%s.png" % (outdir, label, num, i, j)) 

156 if mcmc: 

157 for j in parameters: 

158 plots.append("%s/plots/%s%s_1d_posterior_%s_combined.png" % (outdir, label, num, j)) 

159 if gw: 

160 for pair in gw_2d_plots: 

161 if not all(p in parameters for p in pair): 

162 continue 

163 plots.append("%s/plots/%s%s_2d_posterior_%s_%s.png" % ( 

164 outdir, label, num, pair[0], pair[1] 

165 )) 

166 if psd: 

167 plots.append("%s/plots/%s%s_psd_plot.png" % (outdir, label, num)) 

168 if calibration: 

169 plots.append("%s/plots/%s%s_calibration_plot.png" % (outdir, label, num)) 

170 if waveform: 

171 plots.append("%s/plots/%s%s_waveform.png" % (outdir, label, num)) 

172 plots.append("%s/plots/%s%s_waveform_time_domain.png" % (outdir, label, num)) 

173 if existing_plot: 

174 plots.append("%s/plots/test.png" % (outdir)) 

175 if expert: 

176 for j in parameters: 

177 plots.append("%s/plots/%s%s_2d_contour_%s_log_likelihood.png" % (outdir, label, num, j)) 

178 plots.append("%s/plots/%s%s_1d_posterior_%s_bootstrap.png" % (outdir, label, num, j)) 

179 plots.append("%s/plots/%s%s_sample_evolution_%s_log_likelihood_colored.png" % (outdir, label, num, j)) 

180 if number > 1 and comparison: 

181 for i in ["1d_posterior", "boxplot", "cdf"]: 

182 for j in parameters: 

183 plots.append("%s/plots/combined_%s_%s.png" % (outdir, i, j)) 

184 if gw and extra_gw_plots: 

185 for num in range(number): 

186 plots.append("%s/plots/%s%s_skymap.png" % (outdir, label, num)) 

187 if "classification" not in remove_gw_plots: 

188 plots.append("%s/plots/%s%s.pesummary.em_bright.png" % (outdir, label, num)) 

189 plots.append("%s/plots/%s%s.pesummary.p_astro.png" % (outdir, label, num)) 

190 if number > 1 and comparison: 

191 plots.append("%s/plots/combined_skymap.png" % (outdir)) 

192 

193 return sorted(plots) 

194 

195 

196def make_argparse(gw=True, extension="json", bilby=False, lalinference=False, dingo=False, 

197 number=1, existing=False, disable_expert=True, outdir="./.outdir"): 

198 """ 

199 """ 

200 default_args = [] 

201 if gw: 

202 from pesummary.gw.cli.parser import ArgumentParser 

203 default_args.append("--gw") 

204 default_args.append("--nsamples_for_skymap") 

205 default_args.append("10") 

206 default_args.append("--pastro_category_file") 

207 default_args.append(testing_dir + "/rates.yml") 

208 default_args.append("--catch_terrestrial_probability_error") 

209 else: 

210 from pesummary.core.cli.parser import ArgumentParser 

211 parser = ArgumentParser() 

212 parser.add_all_known_options_to_parser() 

213 params, data = make_result_file( 

214 extension=extension, gw=gw, bilby=bilby, lalinference=lalinference, 

215 outdir=outdir 

216 ) 

217 if not existing: 

218 default_args.append("--webdir") 

219 else: 

220 default_args.append("--existing_webdir") 

221 default_args.append(outdir) 

222 default_args.append("--samples") 

223 for i in range(number): 

224 default_args.append("%s/test.%s" % (outdir, extension)) 

225 default_args.append("--labels") 

226 if not existing: 

227 for i in range(number): 

228 if not gw: 

229 default_args.append("core%s" % (i)) 

230 else: 

231 default_args.append("gw%s" % (i)) 

232 else: 

233 if not gw: 

234 default_args.append("core1") 

235 else: 

236 default_args.append("gw1") 

237 default_args.append("--config") 

238 for i in range(number): 

239 default_args.append(testing_dir + "/example_config.ini") 

240 if disable_expert: 

241 default_args.append("--disable_expert") 

242 opts = parser.parse_args(default_args) 

243 if gw: 

244 func = WebpagePlusPlottingPlusMetaFileInput 

245 else: 

246 func = Input 

247 return opts, func(opts) 

248 

249 

250def read_result_file(outdir="./.outdir", extension="json", bilby=False, 

251 lalinference=False, pesummary=False): 

252 """ 

253 """ 

254 if bilby: 

255 from bilby.core.result import read_in_result 

256 

257 if extension == "json": 

258 f = read_in_result(outdir + "/test.json") 

259 elif extension == "h5" or extension == "hdf5": 

260 f = read_in_result(outdir + "/test.hdf5") 

261 posterior = f.posterior 

262 posterior = posterior.select_dtypes(include=[float, int]) 

263 samples = {key: val for key, val in posterior.items()} 

264 elif lalinference: 

265 import h5py 

266 

267 f = h5py.File(outdir + "/test.hdf5", "r") 

268 

269 posterior = f["lalinference"]["lalinference_nest"]["posterior_samples"] 

270 samples = { 

271 i: [j[num] for j in posterior] for num, i in enumerate( 

272 posterior.dtype.names 

273 ) 

274 } 

275 elif pesummary: 

276 from pesummary.gw.file.read import read 

277 

278 if extension == "json": 

279 f = read(outdir + "/test.json") 

280 else: 

281 f = read(outdir + "/test.h5") 

282 data = f.samples_dict 

283 labels = f.labels 

284 samples = data[labels[0]] 

285 elif extension == "dat": 

286 import numpy as np 

287 

288 data = np.genfromtxt(outdir + "/test.dat", names=True) 

289 samples = { 

290 i: [j[num] for j in data] for num, i in enumerate(data.dtype.names) 

291 } 

292 else: 

293 samples = {} 

294 return samples 

295 

296 

297def make_psd(outdir="./.outdir"): 

298 """Make a psd file 

299 """ 

300 frequencies = np.linspace(0, 1024, 1000) 

301 strains = np.random.uniform(10, 0.1, 1000) 

302 data = np.vstack([frequencies, strains]).T 

303 np.savetxt( 

304 "{}/psd.dat".format(outdir), data, delimiter="\t", 

305 header="\t".join(["frequencies", "strain"]) 

306 ) 

307 

308 

309def make_calibration(outdir="./.outdir"): 

310 """Make a calibration file 

311 """ 

312 frequencies = np.linspace(0, 1024, 1000) 

313 columns = [np.random.uniform(10, 0.1, 1000) for _ in range(6)] 

314 data = np.vstack([frequencies] + columns).T 

315 np.savetxt( 

316 "{}/calibration.dat".format(outdir), data, delimiter="\t", 

317 header="\t".join(["frequencies", "a", "b", "c", "d", "e", "f"]) 

318 ) 

319 

320 

321def make_injection_file( 

322 outdir="./.outdir", extension="json", return_filename=True, 

323 return_injection_dict=True 

324): 

325 """ 

326 """ 

327 from pesummary.io import write 

328 

329 filename = os.path.join(outdir, "injection.{}".format(extension)) 

330 parameters = gw_parameters() 

331 samples = np.array([[np.random.random()] for i in range(len(parameters))]).T 

332 write(parameters, samples, filename=filename, file_format=extension) 

333 args = [] 

334 if return_filename: 

335 args.append(filename) 

336 if return_injection_dict: 

337 args.append({param: samples[0][num] for num, param in enumerate(parameters)}) 

338 return args 

339 

340 

341def make_result_file(outdir="./.outdir/", extension="json", gw=True, bilby=False, dingo=False, 

342 lalinference=False, pesummary=False, pesummary_label="label", 

343 config=None, psd=None, calibration=None, random_seed=None, 

344 n_samples=1000): 

345 """Make a result file that can be read in by PESummary 

346 

347 Parameters 

348 ---------- 

349 outdir: str 

350 directory where you would like to store the result file 

351 extension: str 

352 the file extension of the result file 

353 gw: Bool 

354 if True, gw parameters will be used 

355 """ 

356 if random_seed is not None: 

357 np.random.seed(random_seed) 

358 if outdir[-1] != "/": 

359 outdir += "/" 

360 data = np.array([np.random.random(18) for i in range(n_samples)]) 

361 if gw: 

362 parameters = ["mass_1", "mass_2", "a_1", "a_2", "tilt_1", "tilt_2", 

363 "phi_jl", "phi_12", "psi", "theta_jn", "ra", "dec", 

364 "luminosity_distance", "geocent_time", "redshift", 

365 "mass_1_source", "mass_2_source", "log_likelihood"] 

366 distance = np.random.random(n_samples) * 500 

367 mass_1 = np.random.random(n_samples) * 100 

368 q = np.random.random(n_samples) * 100 

369 a_1 = np.random.uniform(0, 0.99, n_samples) 

370 a_2 = np.random.uniform(0, 0.99, n_samples) 

371 for num, i in enumerate(data): 

372 data[num][12] = distance[num] 

373 data[num][0] = mass_1[num] 

374 data[num][1] = mass_1[num] * q[num] 

375 data[num][2] = a_1[num] 

376 data[num][3] = a_2[num] 

377 data[num][15] = mass_1[num] / (1 + data[num][14]) 

378 data[num][16] = (mass_1[num] * q[num]) / (1 + data[num][15]) 

379 else: 

380 import string 

381 

382 parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"] 

383 

384 if extension == "dat": 

385 np.savetxt( 

386 outdir + "test.dat", 

387 data, 

388 delimiter=" ", 

389 header=" ".join(parameters), 

390 comments="", 

391 ) 

392 elif extension == "csv": 

393 np.savetxt( 

394 outdir + "test.csv", 

395 data, 

396 delimiter=",", 

397 header=",".join(parameters), 

398 comments="", 

399 ) 

400 elif extension == "npy": 

401 from pesummary.utils.samples_dict import SamplesDict 

402 

403 samples = SamplesDict(parameters, np.array(data).T).to_structured_array() 

404 np.save(outdir + "test.npy", samples) 

405 elif ( 

406 extension == "json" 

407 and not bilby 

408 and not pesummary 

409 and not lalinference 

410 and not dingo 

411 ): 

412 import json 

413 

414 dictionary = { 

415 "NameOfCode": { 

416 "posterior_samples": { 

417 key: [i[num] for i in data] for num, key in enumerate(parameters) 

418 } 

419 } 

420 } 

421 with open(outdir + "test.json", "w") as f: 

422 json.dump(dictionary, f, indent=4, sort_keys=True) 

423 elif ( 

424 (extension == "hdf5" or extension == "h5") 

425 and not bilby 

426 and not pesummary 

427 and not lalinference 

428 and not dingo 

429 ): 

430 import h5py 

431 

432 h5py_data = np.array( 

433 [tuple(i) for i in data], dtype=[tuple([i, "float64"]) for i in parameters] 

434 ) 

435 f = h5py.File(outdir + "test.h5", "w") 

436 name = f.create_group("NameOfCode") 

437 samples = f.create_dataset("posterior_samples", data=h5py_data) 

438 f.close() 

439 elif bilby and not pesummary and not lalinference and not dingo: 

440 import bilby 

441 from bilby.core.result import Result 

442 from bilby.core.prior import PriorDict 

443 from pandas import DataFrame 

444 

445 priors = PriorDict() 

446 priors.update({"%s" % (i): bilby.core.prior.Uniform(0.1, 0.5, 0) for i in parameters}) 

447 posterior_data_frame = DataFrame(data, columns=parameters) 

448 injection_parameters = {par: 1. for par in parameters} 

449 bilby_object = Result( 

450 search_parameter_keys=parameters, samples=data, 

451 posterior=posterior_data_frame, label="test", 

452 injection_parameters=injection_parameters, 

453 priors=priors, 

454 log_bayes_factor=0.5, log_evidence_err=0.1, log_noise_evidence=0.1, 

455 log_evidence=0.2, version=["bilby=0.5.3:"], 

456 meta_data={"likelihood": {"time_marginalization": "True"}}) 

457 if extension == "json": 

458 bilby_object.save_to_file( 

459 filename=outdir + "test.json", extension="json") 

460 elif extension == "hdf5" or extension == "h5": 

461 bilby_object.save_to_file( 

462 filename=outdir + "test.hdf5", extension="hdf5") 

463 elif dingo and not pesummary and not lalinference and not bilby: 

464 import bilby 

465 import dingo 

466 from pandas import DataFrame 

467 from dingo.gw.result import Result 

468 

469 intrinsic_parameters = [ 

470 "mass_1", 

471 "mass_2", 

472 "a_1", 

473 "a_2", 

474 "tilt_1", 

475 "tilt_2", 

476 "phi_jl", 

477 "phi_12", 

478 "log_likelihood", 

479 "redshift", 

480 "mass_1_source", 

481 "mass_2_source", 

482 ] 

483 extrinsic_parameters = [ 

484 "psi", 

485 "theta_jn", 

486 "ra", 

487 "dec", 

488 "luminosity_distance", 

489 "geocent_time", 

490 ] 

491 assert set(parameters) == set(intrinsic_parameters + extrinsic_parameters) 

492 settings = { 

493 "train_settings": { 

494 "data": { 

495 "unconditional": False, 

496 "extrinsic_prior": { 

497 "%s" % (i): "bilby.core.prior.Uniform(0.1, 0.5, 0)" 

498 for i in extrinsic_parameters 

499 }, 

500 "window": {"type": "tukey", "f_s": 4096, "T": 8.0, "roll_off": 0.4}, 

501 "ref_time": 1126259462.0, 

502 }, 

503 }, 

504 "dataset_settings": { 

505 "intrinsic_prior": { 

506 "%s" % (i): "bilby.core.prior.Uniform(0.1, 0.5, 0)" 

507 for i in intrinsic_parameters 

508 }, 

509 "domain": { 

510 "type": "FrequencyDomain", 

511 "f_min": 20.0, 

512 "f_max": 1024.0, 

513 "delta_f": 0.125, 

514 }, 

515 "waveform_generator": {"spin_conversion_phase": None, "f_ref": 20.0}, 

516 }, 

517 } 

518 

519 weights = np.random.random(n_samples) 

520 tmp_parameters = parameters + ["weights"] 

521 tmp_data = np.c_[data, weights] 

522 posterior_data_frame = DataFrame(tmp_data, columns=tmp_parameters) 

523 

524 dictionary = { 

525 "samples": posterior_data_frame, 

526 "context": None, 

527 "event_metadata": None, 

528 "importance_sampling_metadata": None, 

529 "log_evidence": None, 

530 "log_noise_evidence": None, 

531 "settings": settings, 

532 } 

533 dingo_object = Result(file_name=None, dictionary=dictionary) 

534 if extension == "hdf5" or extension == "h5": 

535 dingo_object.to_file(file_name=outdir + f"test.{extension}") 

536 

537 elif lalinference and not bilby and not pesummary and not dingo: 

538 import h5py 

539 

540 h5py_data = np.array( 

541 [tuple(i) for i in data], dtype=[tuple([i, "float64"]) for i in parameters] 

542 ) 

543 f = h5py.File(outdir + "test.hdf5", "w") 

544 lalinference = f.create_group("lalinference") 

545 nest = lalinference.create_group("lalinference_nest") 

546 samples = nest.create_dataset("posterior_samples", data=h5py_data) 

547 f.close() 

548 elif pesummary and not lalinference and not bilby and not dingo: 

549 dictionary = { 

550 pesummary_label: { 

551 "posterior_samples": { 

552 "parameter_names": parameters, 

553 "samples": [list(i) for i in data], 

554 }, 

555 "injection_data": { 

556 "injection_values": [float("nan") for i in range(len(parameters))] 

557 }, 

558 "version": ["No version information found"], 

559 "meta_data": {"sampler": {"log_evidence": 0.5}, "meta_data": {}}, 

560 }, 

561 "version": {"pesummary": ["v0.1.7"]}, 

562 } 

563 if config is not None: 

564 dictionary[pesummary_label]["config_file"] = config 

565 if psd is not None: 

566 dictionary[pesummary_label]["psds"] = psds 

567 if calibration is not None: 

568 dictionary[pesummary_label]["calibration_envelope"] = calibration 

569 

570 if extension == "json": 

571 import json 

572 

573 with open(outdir + "test.json", "w") as f: 

574 json.dump(dictionary, f, indent=4, sort_keys=True) 

575 elif extension == "hdf5" or extension == "h5": 

576 import h5py 

577 from pesummary.core.file.meta_file import recursively_save_dictionary_to_hdf5_file 

578 

579 f = h5py.File(outdir + "test.h5", "w") 

580 recursively_save_dictionary_to_hdf5_file( 

581 f, dictionary, extra_keys=list(dictionary.keys()) 

582 ) 

583 f.close() 

584 return parameters, data 

585 

586 

587testing_dir = str(Path(__file__).parent) 

588data_dir = str(Path(__file__).parent / "files")