Coverage for pesummary/tests/base.py: 71.9%

306 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-11-05 13:38 +0000

1# Licensed under an MIT style license -- see LICENSE.md 

2 

3import os 

4import numpy as np 

5from pathlib import Path 

6from pesummary.gw.cli.inputs import WebpagePlusPlottingPlusMetaFileInput 

7from pesummary.core.cli.inputs import ( 

8 WebpagePlusPlottingPlusMetaFileInput as Input 

9) 

10 

# Author metadata for this test-support module
__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]

12 

13 

class Namespace(object):
    """Minimal attribute container: every keyword argument passed to the
    constructor becomes an instance attribute.
    """
    def __init__(self, **kwargs):
        for name, value in kwargs.items():
            setattr(self, name, value)

17 

18 

def namespace(args):
    """Generate a namespace for testing purposes

    Parameters
    ----------
    args: dict
        dictionary of arguments
    """
    base = Namespace()
    for key, value in args.items():
        setattr(base, key, value)
    return base

31 

32 

def gw_parameters():
    """Return the list of gravitational-wave parameter names the test suite
    expects a typical workflow to produce output for.

    The split into sublists below is purely for readability; the returned
    list and its ordering are unchanged.
    """
    group_one = [
        'mass_1', 'mass_2', 'a_1', 'a_2', 'tilt_1', 'tilt_2', 'phi_jl',
        'phi_12', 'psi', 'theta_jn', 'ra', 'dec', 'luminosity_distance',
        'geocent_time', 'log_likelihood',
    ]
    group_two = [
        'mass_ratio', 'total_mass', 'chirp_mass', 'symmetric_mass_ratio',
        'iota', 'spin_1x', 'spin_1y', 'spin_1z', 'spin_2x', 'spin_2y',
        'spin_2z', 'chi_p', 'chi_eff', 'cos_tilt_1', 'cos_tilt_2',
        'redshift', 'comoving_distance', 'mass_1_source', 'mass_2_source',
        'total_mass_source', 'chirp_mass_source', 'phi_1', 'phi_2',
        'cos_theta_jn', 'cos_iota',
    ]
    group_three = [
        'peak_luminosity_non_evolved', 'final_spin_non_evolved',
        'final_mass_non_evolved', 'final_mass_source_non_evolved',
        'radiated_energy_non_evolved', 'inverted_mass_ratio',
        'viewing_angle', 'chi_p_2spin', 'comoving_volume',
    ]
    return group_one + group_two + group_three

49 

50 

def get_list_of_files(
    gw=False, number=1, existing_plot=False, parameters=None, sections=None,
    outdir=".outdir", extra_gw_pages=True, remove_gw_pages=None
):
    """Return a sorted list of html files that should be generated from a
    typical workflow

    Parameters
    ----------
    gw: Bool, optional
        if True, expect GW parameter names, GW section names and the extra
        GW-only pages (classification, 2d histogram pages)
    number: int, optional
        number of analyses in the workflow; comparison pages are expected
        when more than one analysis is present
    existing_plot: Bool, optional
        if True, expect an 'Additional' page for each analysis
    parameters: list, optional
        parameter names to expect pages for; defaults to 17 letters plus
        'log_likelihood' (core) or the full GW parameter list
    sections: list, optional
        section names to expect '_all' summary pages for
    outdir: str, optional
        directory in which the html pages are expected
    extra_gw_pages: Bool, optional
        kept for backwards compatibility; currently unused here
    remove_gw_pages: list, optional
        GW page names (e.g. 'classification') that should not be expected
    """
    # avoid mutable default arguments: normalise the list arguments here
    if parameters is None:
        parameters = []
    if sections is None:
        sections = []
    if remove_gw_pages is None:
        remove_gw_pages = []
    if not gw and not len(parameters):
        import string

        parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
    elif not len(parameters):
        parameters = gw_parameters()
    label = "gw" if gw else "core"
    html = [
        "%s/html/error.html" % (outdir),
        "%s/html/Version.html" % (outdir),
        "%s/html/Logging.html" % (outdir),
        "%s/html/About.html" % (outdir),
        "%s/html/Downloads.html" % (outdir)]
    if gw and not len(sections):
        sections = [
            "spins", "spin_angles", "timings", "source", "remnant", "others",
            "masses", "location", "inclination", "energy"
        ]
    elif not len(sections):
        sections = ["A-D", "E-F", "I-L", "M-P", "Q-T"]
    if gw:
        # imported lazily so core-only workflows do not need pesummary.conf
        from pesummary.conf import gw_2d_plots
        # a 2d histogram section page is expected only when at least one of
        # its configured parameter pairs (identified by the anchor parameter
        # below) is fully present in `parameters`
        anchor_parameter = {
            "location": "luminosity_distance", "masses": "mass_1",
            "spins": "chi_p", "timings": "geocent_time"
        }
    for num in range(number):
        base = "%s/html/%s%s_%s%s" % (outdir, label, num, label, num)
        html.append(base + ".html")
        if gw and "classification" not in remove_gw_pages:
            html.append(base + "_Classification.html")
        for page in ["Corner", "Config", "Custom", "All", "Interactive_Corner"]:
            html.append("%s_%s.html" % (base, page))
        for section in sections:
            html.append("%s_%s_all.html" % (base, section))
        for parameter in parameters:
            html.append("%s_%s.html" % (base, parameter))
        if gw:
            # 2d histogram pages
            for section, anchor in anchor_parameter.items():
                pairs = [p for p in gw_2d_plots if anchor in p]
                if not any(all(q in parameters for q in pair) for pair in pairs):
                    continue
                html.append("%s_%s.html" % (base, section))
        if existing_plot:
            html.append(base + "_Additional.html")

    if number > 1:
        html.append("%s/html/Comparison.html" % (outdir))
        html.append("%s/html/Comparison_Custom.html" % (outdir))
        html.append("%s/html/Comparison_All.html" % (outdir))
        html.append("%s/html/Comparison_Interactive_Ridgeline.html" % (outdir))
        for parameter in parameters:
            if parameter != "classification":
                html.append("%s/html/Comparison_%s.html" % (outdir, parameter))
        for section in sections:
            html.append("%s/html/Comparison_%s_all.html" % (outdir, section))
    return sorted(html)

129 

130 

def get_list_of_plots(
    gw=False, number=1, mcmc=False, label=None, outdir=".outdir",
    comparison=True, psd=False, calibration=False, existing_plot=False,
    expert=False, waveform=False, parameters=None, extra_gw_plots=True,
    remove_gw_plots=None
):
    """Return a sorted list of plots that should be generated from a typical
    workflow

    Parameters
    ----------
    gw: Bool, optional
        if True, expect GW parameter names and the extra GW-only plots
        (2d posteriors, skymaps, classification plots)
    number: int, optional
        number of analyses; comparison plots are expected for more than one
    mcmc: Bool, optional
        if True, expect combined 1d posterior plots per parameter
    label: str, optional
        label prefix for per-analysis plots; defaults to 'core' or 'gw'
    outdir: str, optional
        directory in which the plots are expected
    comparison: Bool, optional
        if False, do not expect comparison plots even when number > 1
    psd/calibration/waveform: Bool, optional
        if True, expect the corresponding per-analysis plot(s)
    existing_plot: Bool, optional
        if True, expect the pre-existing 'test.png' plot
    expert: Bool, optional
        if True, expect the expert diagnostic plots per parameter
    parameters: list, optional
        parameter names to expect plots for
    extra_gw_plots: Bool, optional
        if False, skip the skymap/classification plots even when gw=True
    remove_gw_plots: list, optional
        GW plot names (e.g. 'classification') that should not be expected
    """
    # avoid mutable default arguments: normalise the list arguments here
    if parameters is None:
        parameters = []
    if remove_gw_plots is None:
        remove_gw_plots = []
    if not gw and not len(parameters):
        import string

        parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
    elif not len(parameters):
        parameters = gw_parameters()
    if label is None:
        label = "gw" if gw else "core"

    plots = []
    for num in range(number):
        prefix = "%s/plots/%s%s" % (outdir, label, num)
        for plot_type in ["sample_evolution", "autocorrelation", "1d_posterior", "cdf"]:
            for parameter in parameters:
                plots.append("%s_%s_%s.png" % (prefix, plot_type, parameter))
        if mcmc:
            for parameter in parameters:
                plots.append("%s_1d_posterior_%s_combined.png" % (prefix, parameter))
        if gw:
            # imported lazily so core-only workflows do not need
            # pesummary.conf
            from pesummary.conf import gw_2d_plots
            for pair in gw_2d_plots:
                if not all(p in parameters for p in pair):
                    continue
                plots.append("%s_2d_posterior_%s_%s.png" % (prefix, pair[0], pair[1]))
        if psd:
            plots.append("%s_psd_plot.png" % (prefix))
        if calibration:
            plots.append("%s_calibration_plot.png" % (prefix))
        if waveform:
            plots.append("%s_waveform.png" % (prefix))
            plots.append("%s_waveform_time_domain.png" % (prefix))
        if existing_plot:
            plots.append("%s/plots/test.png" % (outdir))
        if expert:
            for parameter in parameters:
                plots.append("%s_2d_contour_%s_log_likelihood.png" % (prefix, parameter))
                plots.append("%s_1d_posterior_%s_bootstrap.png" % (prefix, parameter))
                plots.append("%s_sample_evolution_%s_log_likelihood_colored.png" % (prefix, parameter))
    if number > 1 and comparison:
        for plot_type in ["1d_posterior", "boxplot", "cdf"]:
            for parameter in parameters:
                plots.append("%s/plots/combined_%s_%s.png" % (outdir, plot_type, parameter))
    if gw and extra_gw_plots:
        for num in range(number):
            plots.append("%s/plots/%s%s_skymap.png" % (outdir, label, num))
            if "classification" not in remove_gw_plots:
                plots.append("%s/plots/%s%s.pesummary.em_bright.png" % (outdir, label, num))
                plots.append("%s/plots/%s%s.pesummary.p_astro.png" % (outdir, label, num))
        if number > 1 and comparison:
            plots.append("%s/plots/combined_skymap.png" % (outdir))

    return sorted(plots)

194 

195 

def make_argparse(gw=True, extension="json", bilby=False, lalinference=False,
                  number=1, existing=False, disable_expert=True, outdir="./.outdir"):
    """Build a command line for a typical summary-pages run, write the
    matching result file, and return the parsed options together with the
    corresponding Input object.
    """
    cli = []
    if gw:
        from pesummary.gw.cli.parser import ArgumentParser
        cli += [
            "--gw",
            "--nsamples_for_skymap", "10",
            "--pastro_category_file", testing_dir + "/rates.yml",
            "--catch_terrestrial_probability_error",
        ]
    else:
        from pesummary.core.cli.parser import ArgumentParser
    parser = ArgumentParser()
    parser.add_all_known_options_to_parser()
    # write the result file the command line points at
    params, data = make_result_file(
        extension=extension, gw=gw, bilby=bilby, lalinference=lalinference,
        outdir=outdir
    )
    cli += [
        "--existing_webdir" if existing else "--webdir", outdir, "--samples"
    ]
    cli += ["%s/test.%s" % (outdir, extension)] * number
    cli.append("--labels")
    prefix = "gw" if gw else "core"
    if existing:
        cli.append(prefix + "1")
    else:
        cli += [prefix + str(i) for i in range(number)]
    cli.append("--config")
    cli += [testing_dir + "/example_config.ini"] * number
    if disable_expert:
        cli.append("--disable_expert")
    opts = parser.parse_args(cli)
    cls = WebpagePlusPlottingPlusMetaFileInput if gw else Input
    return opts, cls(opts)

248 

249 

def read_result_file(outdir="./.outdir", extension="json", bilby=False,
                     lalinference=False, pesummary=False):
    """Read a result file previously written by ``make_result_file`` and
    return its posterior samples as a dictionary.

    Parameters
    ----------
    outdir: str, optional
        directory containing the 'test.<extension>' result file
    extension: str, optional
        file extension of the result file
    bilby: Bool, optional
        if True, read the file with bilby
    lalinference: Bool, optional
        if True, read the file as a LALInference posterior file
    pesummary: Bool, optional
        if True, read the file as a PESummary metafile

    Returns
    -------
    dict
        mapping from parameter name to samples; empty when no reader matches
    """
    if bilby:
        from bilby.core.result import read_in_result

        if extension == "json":
            f = read_in_result(outdir + "/test.json")
        elif extension == "h5" or extension == "hdf5":
            f = read_in_result(outdir + "/test.hdf5")
        else:
            # previously this fell through to an unbound variable (NameError)
            raise ValueError(
                "Unsupported extension '%s' for a bilby result file" % (
                    extension
                )
            )
        posterior = f.posterior
        posterior = posterior.select_dtypes(include=[float, int])
        samples = {key: val for key, val in posterior.items()}
    elif lalinference:
        import h5py

        # context manager ensures the file handle is closed (it previously
        # leaked)
        with h5py.File(outdir + "/test.hdf5", "r") as f:
            posterior = f["lalinference"]["lalinference_nest"]["posterior_samples"]
            samples = {
                i: [j[num] for j in posterior] for num, i in enumerate(
                    posterior.dtype.names
                )
            }
    elif pesummary:
        from pesummary.gw.file.read import read

        if extension == "json":
            f = read(outdir + "/test.json")
        else:
            f = read(outdir + "/test.h5")
        data = f.samples_dict
        labels = f.labels
        samples = data[labels[0]]
    elif extension == "dat":
        import numpy as np

        data = np.genfromtxt(outdir + "/test.dat", names=True)
        samples = {
            i: [j[num] for j in data] for num, i in enumerate(data.dtype.names)
        }
    else:
        samples = {}
    return samples

295 

296 

def make_psd(outdir="./.outdir"):
    """Write a two-column (frequency, strain) psd file to ``outdir``/psd.dat.
    """
    frequencies = np.linspace(0, 1024, 1000)
    strains = np.random.uniform(10, 0.1, 1000)
    table = np.column_stack((frequencies, strains))
    np.savetxt(
        "{}/psd.dat".format(outdir), table, delimiter="\t",
        header="\t".join(["frequencies", "strain"])
    )

307 

308 

def make_calibration(outdir="./.outdir"):
    """Write a seven-column calibration envelope file to
    ``outdir``/calibration.dat (frequency plus six random columns).
    """
    frequencies = np.linspace(0, 1024, 1000)
    body = [np.random.uniform(10, 0.1, 1000) for _ in range(6)]
    table = np.column_stack([frequencies] + body)
    np.savetxt(
        "{}/calibration.dat".format(outdir), table, delimiter="\t",
        header="\t".join(["frequencies", "a", "b", "c", "d", "e", "f"])
    )

319 

320 

def make_injection_file(
    outdir="./.outdir", extension="json", return_filename=True,
    return_injection_dict=True
):
    """Write a single-sample injection file covering every GW parameter and
    return the filename and/or the injected values, as requested.
    """
    from pesummary.io import write

    filename = os.path.join(outdir, "injection.{}".format(extension))
    parameters = gw_parameters()
    # one random value per parameter, shaped (1, n_parameters)
    samples = np.array([[np.random.random()] for _ in range(len(parameters))]).T
    write(parameters, samples, filename=filename, file_format=extension)
    returned = []
    if return_filename:
        returned.append(filename)
    if return_injection_dict:
        returned.append(dict(zip(parameters, samples[0])))
    return returned

339 

340 

def make_result_file(outdir="./.outdir/", extension="json", gw=True, bilby=False,
                     lalinference=False, pesummary=False, pesummary_label="label",
                     config=None, psd=None, calibration=None, random_seed=None,
                     n_samples=1000):
    """Make a result file that can be read in by PESummary

    Parameters
    ----------
    outdir: str
        directory where you would like to store the result file
    extension: str
        the file extension of the result file
    gw: Bool
        if True, gw parameters will be used
    bilby: Bool, optional
        if True, write the file in bilby's result format
    lalinference: Bool, optional
        if True, write the file in LALInference's posterior_samples layout
    pesummary: Bool, optional
        if True, write the file in PESummary's metafile layout
    pesummary_label: str, optional
        analysis label used when pesummary=True
    config: optional
        config data stored in the metafile when pesummary=True
    psd: optional
        psd data stored in the metafile when pesummary=True
    calibration: optional
        calibration data stored in the metafile when pesummary=True
    random_seed: int, optional
        seed numpy's random number generator for reproducible samples
    n_samples: int, optional
        number of posterior samples to draw

    Returns
    -------
    parameters: list
        list of the 18 parameter names written to file
    data: np.ndarray
        array of posterior samples with shape (n_samples, 18)
    """
    if random_seed is not None:
        np.random.seed(random_seed)
    if outdir[-1] != "/":
        outdir += "/"
    data = np.array([np.random.random(18) for i in range(n_samples)])
    if gw:
        parameters = ["mass_1", "mass_2", "a_1", "a_2", "tilt_1", "tilt_2",
                      "phi_jl", "phi_12", "psi", "theta_jn", "ra", "dec",
                      "luminosity_distance", "geocent_time", "redshift",
                      "mass_1_source", "mass_2_source", "log_likelihood"]
        distance = np.random.random(n_samples) * 500
        mass_1 = np.random.random(n_samples) * 100
        q = np.random.random(n_samples) * 100
        a_1 = np.random.uniform(0, 0.99, n_samples)
        a_2 = np.random.uniform(0, 0.99, n_samples)
        data[:, 12] = distance
        data[:, 0] = mass_1
        data[:, 1] = mass_1 * q
        data[:, 2] = a_1
        data[:, 3] = a_2
        # source-frame masses: detector-frame mass divided by (1 + redshift);
        # column 14 holds the redshift. The previous code divided column 16
        # by (1 + mass_1_source) (column 15) -- an index typo, fixed here.
        data[:, 15] = mass_1 / (1 + data[:, 14])
        data[:, 16] = (mass_1 * q) / (1 + data[:, 14])
    else:
        import string

        parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
    if extension == "dat":
        np.savetxt(outdir + "test.dat", data, delimiter=" ",
                   header=" ".join(parameters), comments="")
    elif extension == "csv":
        np.savetxt(outdir + "test.csv", data, delimiter=",",
                   header=",".join(parameters), comments="")
    elif extension == "npy":
        from pesummary.utils.samples_dict import SamplesDict
        samples = SamplesDict(parameters, np.array(data).T).to_structured_array()
        np.save(outdir + "test.npy", samples)
    elif extension == "json" and not bilby and not pesummary and not lalinference:
        import json

        dictionary = {"NameOfCode": {"posterior_samples": {key:
                      [i[num] for i in data] for num, key in
                      enumerate(parameters)}}}
        with open(outdir + "test.json", "w") as f:
            json.dump(dictionary, f, indent=4, sort_keys=True)
    elif (extension == "hdf5" or extension == "h5") and not bilby and not pesummary and not lalinference:
        import h5py

        h5py_data = np.array(
            [tuple(i) for i in data], dtype=[tuple([i, 'float64']) for i in parameters])
        with h5py.File(outdir + "test.h5", "w") as f:
            f.create_group("NameOfCode")
            # NOTE(review): the dataset is created at the file root rather
            # than inside the 'NameOfCode' group -- this mirrors the original
            # behaviour; verify it matches what the readers expect
            f.create_dataset("posterior_samples", data=h5py_data)
    elif bilby and not pesummary and not lalinference:
        import bilby
        from bilby.core.result import Result
        from bilby.core.prior import PriorDict
        from pandas import DataFrame

        priors = PriorDict()
        priors.update({"%s" % (i): bilby.core.prior.Uniform(0.1, 0.5, 0) for i in parameters})
        posterior_data_frame = DataFrame(data, columns=parameters)
        injection_parameters = {par: 1. for par in parameters}
        bilby_object = Result(
            search_parameter_keys=parameters, samples=data,
            posterior=posterior_data_frame, label="test",
            injection_parameters=injection_parameters,
            priors=priors,
            log_bayes_factor=0.5, log_evidence_err=0.1, log_noise_evidence=0.1,
            log_evidence=0.2, version=["bilby=0.5.3:"],
            meta_data={"likelihood": {"time_marginalization": "True"}})
        if extension == "json":
            bilby_object.save_to_file(
                filename=outdir + "test.json", extension="json")
        elif extension == "hdf5" or extension == "h5":
            bilby_object.save_to_file(
                filename=outdir + "test.hdf5", extension="hdf5")
    elif lalinference and not bilby and not pesummary:
        import h5py

        h5py_data = np.array(
            [tuple(i) for i in data], dtype=[tuple([i, 'float64']) for i in parameters])
        with h5py.File(outdir + "test.hdf5", "w") as f:
            # do not reuse the name 'lalinference' (it shadows the kwarg)
            li_group = f.create_group("lalinference")
            nest = li_group.create_group("lalinference_nest")
            nest.create_dataset("posterior_samples", data=h5py_data)
    elif pesummary and not lalinference and not bilby:
        dictionary = {
            pesummary_label: {
                "posterior_samples": {
                    "parameter_names": parameters,
                    "samples": [list(i) for i in data]
                },
                "injection_data": {
                    "injection_values": [float("nan") for i in range(len(parameters))]
                },
                "version": ["No version information found"],
                "meta_data": {
                    "sampler": {"log_evidence": 0.5},
                    "meta_data": {}
                }
            },
            "version": {
                "pesummary": ["v0.1.7"]
            }
        }
        if config is not None:
            dictionary[pesummary_label]["config_file"] = config
        if psd is not None:
            # fixed: previously assigned the undefined name 'psds'
            dictionary[pesummary_label]["psds"] = psd
        if calibration is not None:
            dictionary[pesummary_label]["calibration_envelope"] = calibration

        if extension == "json":
            import json

            with open(outdir + "test.json", "w") as f:
                json.dump(dictionary, f, indent=4, sort_keys=True)
        elif extension == "hdf5" or extension == "h5":
            import h5py
            from pesummary.core.file.meta_file import recursively_save_dictionary_to_hdf5_file

            with h5py.File(outdir + "test.h5", "w") as f:
                recursively_save_dictionary_to_hdf5_file(
                    f, dictionary, extra_keys=list(dictionary.keys())
                )
    return parameters, data

487 

488 

# Paths used by the tests: the directory containing this module and its
# bundled data files.
_TESTS_PATH = Path(__file__).parent
testing_dir = str(_TESTS_PATH)
data_dir = str(_TESTS_PATH / "files")