Coverage for pesummary/tests/base.py: 87.1%

295 statements  


# Licensed under an MIT style license -- see LICENSE.md

import os
import numpy as np
from pathlib import Path
from pesummary.gw.cli.inputs import WebpagePlusPlottingPlusMetaFileInput
from pesummary.core.cli.inputs import (
    WebpagePlusPlottingPlusMetaFileInput as Input
)

__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"]


class Namespace(object):
    def __init__(self, **kwargs):
        self.__dict__.update(kwargs)


def namespace(args):
    """Generate a namespace for testing purposes

    Parameters
    ----------
    args: dict
        dictionary of arguments
    """
    base = Namespace()
    for key in list(args.keys()):
        setattr(base, key, args[key])
    return base
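
# Illustrative sketch (hypothetical helper, keys are placeholders): ``namespace``
# turns a plain dictionary of options into an object with attribute access.
def _example_namespace_usage():
    opts = namespace({"webdir": ".outdir", "labels": ["gw0"]})
    assert opts.webdir == ".outdir"
    assert opts.labels == ["gw0"]
    return opts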

def gw_parameters():
    parameters = [
        'mass_1', 'mass_2', 'a_1', 'a_2', 'tilt_1', 'tilt_2', 'phi_jl',
        'phi_12', 'psi', 'theta_jn', 'ra', 'dec', 'luminosity_distance',
        'geocent_time', 'log_likelihood', 'mass_ratio', 'total_mass',
        'chirp_mass', 'symmetric_mass_ratio', 'iota', 'spin_1x', 'spin_1y',
        'spin_1z', 'spin_2x', 'spin_2y', 'spin_2z', 'chi_p', 'chi_eff',
        'cos_tilt_1', 'cos_tilt_2', 'redshift', 'comoving_distance',
        'mass_1_source', 'mass_2_source', 'total_mass_source',
        'chirp_mass_source', 'phi_1', 'phi_2', 'cos_theta_jn', 'cos_iota',
        'peak_luminosity_non_evolved', 'final_spin_non_evolved',
        'final_mass_non_evolved', 'final_mass_source_non_evolved',
        'radiated_energy_non_evolved', 'inverted_mass_ratio',
        'viewing_angle', 'chi_p_2spin'
    ]
    return parameters

def get_list_of_files(
    gw=False, number=1, existing_plot=False, parameters=[], sections=[],
    outdir=".outdir", extra_gw_pages=True
):
    """Return a list of files that should be generated from a typical workflow
    """
    from pesummary.conf import gw_2d_plots
    if not gw and not len(parameters):
        import string
        parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
    elif not len(parameters):
        parameters = gw_parameters()
    if not gw:
        label = "core"
    else:
        label = "gw"
    html = [
        "%s/html/error.html" % (outdir),
        "%s/html/Version.html" % (outdir),
        "%s/html/Logging.html" % (outdir),
        "%s/html/About.html" % (outdir),
        "%s/html/Downloads.html" % (outdir)]
    if gw and not len(sections):
        sections = [
            "spins", "spin_angles", "timings", "source", "remnant", "others",
            "masses", "location", "inclination", "energy"
        ]
    elif not len(sections):
        sections = ["A-D", "E-F", "I-L", "M-P", "Q-T"]
    for num in range(number):
        html.append("%s/html/%s%s_%s%s.html" % (outdir, label, num, label, num))
        if gw and extra_gw_pages:
            html.append("%s/html/%s%s_%s%s_Classification.html" % (outdir, label, num, label, num))
        html.append("%s/html/%s%s_%s%s_Corner.html" % (outdir, label, num, label, num))
        html.append("%s/html/%s%s_%s%s_Config.html" % (outdir, label, num, label, num))
        html.append("%s/html/%s%s_%s%s_Custom.html" % (outdir, label, num, label, num))
        html.append("%s/html/%s%s_%s%s_All.html" % (outdir, label, num, label, num))
        html.append("%s/html/%s%s_%s%s_Interactive_Corner.html" % (
            outdir, label, num, label, num
        ))
        for section in sections:
            html.append("%s/html/%s%s_%s%s_%s_all.html" % (outdir, label, num, label, num, section))
        for j in parameters:
            html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, j))
        if gw:
            # 2d histogram pages
            for section in ["location", "masses", "spins"]:
                if section == "location":
                    pairs = [p for p in gw_2d_plots if "luminosity_distance" in p]
                    if not any(all(p in parameters for p in _) for _ in pairs):
                        continue
                elif section == "masses":
                    pairs = [p for p in gw_2d_plots if "mass_1" in p]
                    if not any(all(p in parameters for p in _) for _ in pairs):
                        continue
                elif section == "spins":
                    pairs = [p for p in gw_2d_plots if "chi_p" in p]
                    if not any(all(p in parameters for p in _) for _ in pairs):
                        continue
                html.append("%s/html/%s%s_%s%s_%s.html" % (outdir, label, num, label, num, section))
        if existing_plot:
            html.append("%s/html/%s%s_%s%s_Additional.html" % (outdir, label, num, label, num))

    if number > 1:
        html.append("%s/html/Comparison.html" % (outdir))
        html.append("%s/html/Comparison_Custom.html" % (outdir))
        html.append("%s/html/Comparison_All.html" % (outdir))
        html.append("%s/html/Comparison_Interactive_Ridgeline.html" % (outdir))
        for j in parameters:
            if j != "classification":
                html.append("%s/html/Comparison_%s.html" % (outdir, j))
        for section in sections:
            html.append("%s/html/Comparison_%s_all.html" % (outdir, section))
    return sorted(html)
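
# Illustrative sketch (hypothetical helper): a test could compare the webpages a
# workflow actually produced against the expected list; the directory and glob
# pattern are placeholders.
def _example_check_webpages(outdir=".outdir"):
    import glob

    expected = get_list_of_files(gw=False, number=1, outdir=outdir)
    produced = sorted(glob.glob("%s/html/*.html" % (outdir)))
    return set(expected) - set(produced)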

def get_list_of_plots(
    gw=False, number=1, mcmc=False, label=None, outdir=".outdir",
    comparison=True, psd=False, calibration=False, existing_plot=False,
    expert=False, parameters=[], extra_gw_plots=True
):
    """Return a list of plots that should be generated from a typical workflow
    """
    from pesummary.conf import gw_2d_plots
    if not gw and not len(parameters):
        import string

        parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
    elif not len(parameters):
        parameters = gw_parameters()
    if not gw and label is None:
        label = "core"
    elif label is None:
        label = "gw"

    plots = []
    for num in range(number):
        for i in ["sample_evolution", "autocorrelation", "1d_posterior", "cdf"]:
            for j in parameters:
                plots.append("%s/plots/%s%s_%s_%s.png" % (outdir, label, num, i, j))
        if mcmc:
            for j in parameters:
                plots.append("%s/plots/%s%s_1d_posterior_%s_combined.png" % (outdir, label, num, j))
        if gw:
            for pair in gw_2d_plots:
                if not all(p in parameters for p in pair):
                    continue
                plots.append("%s/plots/%s%s_2d_posterior_%s_%s.png" % (
                    outdir, label, num, pair[0], pair[1]
                ))
        if psd:
            plots.append("%s/plots/%s%s_psd_plot.png" % (outdir, label, num))
        if calibration:
            plots.append("%s/plots/%s%s_calibration_plot.png" % (outdir, label, num))
        if existing_plot:
            plots.append("%s/plots/test.png" % (outdir))
        if expert:
            for j in parameters:
                plots.append("%s/plots/%s%s_2d_contour_%s_log_likelihood.png" % (outdir, label, num, j))
                plots.append("%s/plots/%s%s_1d_posterior_%s_bootstrap.png" % (outdir, label, num, j))
                plots.append("%s/plots/%s%s_sample_evolution_%s_log_likelihood_colored.png" % (outdir, label, num, j))
    if number > 1 and comparison:
        for i in ["1d_posterior", "boxplot", "cdf"]:
            for j in parameters:
                plots.append("%s/plots/combined_%s_%s.png" % (outdir, i, j))
    if gw and extra_gw_plots:
        for num in range(number):
            plots.append("%s/plots/%s%s_skymap.png" % (outdir, label, num))
            plots.append("%s/plots/%s%s_default_pepredicates.png" % (outdir, label, num))
            plots.append("%s/plots/%s%s_default_pepredicates_bar.png" % (outdir, label, num))
            plots.append("%s/plots/%s%s_population_pepredicates.png" % (outdir, label, num))
            plots.append("%s/plots/%s%s_population_pepredicates_bar.png" % (outdir, label, num))
        if number > 1 and comparison:
            plots.append("%s/plots/combined_skymap.png" % (outdir))

    return sorted(plots)
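
# Illustrative sketch (hypothetical helper): the plot list can be checked in the
# same way as the webpage list; the keyword values below are examples only.
def _example_check_plots(outdir=".outdir"):
    import glob

    expected = get_list_of_plots(gw=False, number=2, comparison=True, outdir=outdir)
    produced = sorted(glob.glob("%s/plots/*.png" % (outdir)))
    return set(expected) - set(produced)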

def make_argparse(gw=True, extension="json", bilby=False, lalinference=False,
                  number=1, existing=False, disable_expert=True, outdir="./.outdir"):
    """Build a set of parsed command line options and the corresponding input
    object for testing purposes
    """
    default_args = []
    if gw:
        from pesummary.gw.cli.parser import ArgumentParser
        default_args.append("--gw")
        default_args.append("--nsamples_for_skymap")
        default_args.append("10")
    else:
        from pesummary.core.cli.parser import ArgumentParser
    parser = ArgumentParser()
    parser.add_all_known_options_to_parser()
    params, data = make_result_file(
        extension=extension, gw=gw, bilby=bilby, lalinference=lalinference,
        outdir=outdir
    )
    if not existing:
        default_args.append("--webdir")
    else:
        default_args.append("--existing_webdir")
    default_args.append(outdir)
    default_args.append("--samples")
    for i in range(number):
        default_args.append("%s/test.%s" % (outdir, extension))
    default_args.append("--labels")
    if not existing:
        for i in range(number):
            if not gw:
                default_args.append("core%s" % (i))
            else:
                default_args.append("gw%s" % (i))
    else:
        if not gw:
            default_args.append("core1")
        else:
            default_args.append("gw1")
    default_args.append("--config")
    for i in range(number):
        default_args.append(testing_dir + "/example_config.ini")
    if disable_expert:
        default_args.append("--disable_expert")
    opts = parser.parse_args(default_args)
    if gw:
        func = WebpagePlusPlottingPlusMetaFileInput
    else:
        func = Input
    return opts, func(opts)
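
# Illustrative sketch (hypothetical helper): build the parsed command line
# options and the corresponding input object for a single core run. The check
# below assumes the parser stores ``--webdir`` as ``opts.webdir``; the keyword
# values are examples only.
def _example_make_argparse_usage():
    opts, inputs = make_argparse(gw=False, extension="json", number=1)
    assert opts.webdir == "./.outdir"
    return opts, inputs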

def read_result_file(outdir="./.outdir", extension="json", bilby=False,
                     lalinference=False, pesummary=False):
    """Read in the samples stored in a result file generated by
    `make_result_file`
    """
    if bilby:
        from bilby.core.result import read_in_result

        if extension == "json":
            f = read_in_result(outdir + "/test.json")
        elif extension == "h5" or extension == "hdf5":
            f = read_in_result(outdir + "/test.h5")
        posterior = f.posterior
        posterior = posterior.select_dtypes(include=[float, int])
        samples = {key: val for key, val in posterior.items()}
    elif lalinference:
        import h5py

        f = h5py.File(outdir + "/test.hdf5", "r")

        posterior = f["lalinference"]["lalinference_nest"]["posterior_samples"]
        samples = {
            i: [j[num] for j in posterior] for num, i in enumerate(
                posterior.dtype.names
            )
        }
    elif pesummary:
        from pesummary.gw.file.read import read

        if extension == "json":
            f = read(outdir + "/test.json")
        else:
            f = read(outdir + "/test.h5")
        data = f.samples_dict
        labels = f.labels
        samples = data[labels[0]]
    elif extension == "dat":
        import numpy as np

        data = np.genfromtxt(outdir + "/test.dat", names=True)
        samples = {
            i: [j[num] for j in data] for num, i in enumerate(data.dtype.names)
        }
    else:
        samples = {}
    return samples
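
# Illustrative sketch (hypothetical helper): write a plain "dat" result file
# with ``make_result_file`` and read the samples straight back with
# ``read_result_file``. The output directory is a placeholder.
def _example_dat_round_trip(outdir="./.outdir"):
    parameters, data = make_result_file(extension="dat", gw=False, outdir=outdir)
    samples = read_result_file(outdir=outdir, extension="dat")
    assert sorted(samples.keys()) == sorted(parameters)
    return samples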

def make_psd(outdir="./.outdir"):
    """Make a psd file
    """
    frequencies = np.linspace(0, 1024, 1000)
    strains = np.random.uniform(10, 0.1, 1000)
    data = np.vstack([frequencies, strains]).T
    np.savetxt(
        "{}/psd.dat".format(outdir), data, delimiter="\t",
        header="\t".join(["frequencies", "strain"])
    )


def make_calibration(outdir="./.outdir"):
    """Make a calibration file
    """
    frequencies = np.linspace(0, 1024, 1000)
    columns = [np.random.uniform(10, 0.1, 1000) for _ in range(6)]
    data = np.vstack([frequencies] + columns).T
    np.savetxt(
        "{}/calibration.dat".format(outdir), data, delimiter="\t",
        header="\t".join(["frequencies", "a", "b", "c", "d", "e", "f"])
    )
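
# Illustrative sketch (hypothetical helper): generate the PSD and calibration
# files and load the PSD back with numpy to confirm its two-column layout.
def _example_make_psd_and_calibration(outdir="./.outdir"):
    make_psd(outdir=outdir)
    make_calibration(outdir=outdir)
    psd = np.genfromtxt("{}/psd.dat".format(outdir))
    assert psd.shape == (1000, 2)
    return psd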

def make_injection_file(
    outdir="./.outdir", extension="json", return_filename=True,
    return_injection_dict=True
):
    """Make an injection file that can be read in by PESummary
    """
    from pesummary.io import write

    filename = os.path.join(outdir, "injection.{}".format(extension))
    parameters = gw_parameters()
    samples = np.array([[np.random.random()] for i in range(len(parameters))]).T
    write(parameters, samples, filename=filename, file_format=extension)
    args = []
    if return_filename:
        args.append(filename)
    if return_injection_dict:
        args.append({param: samples[0][num] for num, param in enumerate(parameters)})
    return args
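
# Illustrative sketch (hypothetical helper): create an injection file and
# inspect the returned filename and injection dictionary. The extension is an
# example value.
def _example_make_injection_file(outdir="./.outdir"):
    filename, injection = make_injection_file(
        outdir=outdir, extension="json", return_filename=True,
        return_injection_dict=True
    )
    assert sorted(injection.keys()) == sorted(gw_parameters())
    return filename, injection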

def make_result_file(outdir="./.outdir/", extension="json", gw=True, bilby=False,
                     lalinference=False, pesummary=False, pesummary_label="label",
                     config=None, psd=None, calibration=None, random_seed=None,
                     n_samples=1000):
    """Make a result file that can be read in by PESummary

    Parameters
    ----------
    outdir: str
        directory where you would like to store the result file
    extension: str
        the file extension of the result file
    gw: Bool
        if True, gw parameters will be used
    """
    if random_seed is not None:
        np.random.seed(random_seed)
    print(extension, gw, bilby, lalinference, pesummary)
    if outdir[-1] != "/":
        outdir += "/"
    data = np.array([np.random.random(18) for i in range(n_samples)])
    if gw:
        parameters = ["mass_1", "mass_2", "a_1", "a_2", "tilt_1", "tilt_2",
                      "phi_jl", "phi_12", "psi", "theta_jn", "ra", "dec",
                      "luminosity_distance", "geocent_time", "redshift",
                      "mass_1_source", "mass_2_source", "log_likelihood"]
        distance = np.random.random(n_samples) * 500
        mass_1 = np.random.random(n_samples) * 100
        q = np.random.random(n_samples) * 100
        a_1 = np.random.uniform(0, 0.99, n_samples)
        a_2 = np.random.uniform(0, 0.99, n_samples)
        for num, i in enumerate(data):
            data[num][12] = distance[num]
            data[num][0] = mass_1[num]
            data[num][1] = mass_1[num] * q[num]
            data[num][2] = a_1[num]
            data[num][3] = a_2[num]
    else:
        import string

        parameters = list(string.ascii_lowercase)[:17] + ["log_likelihood"]
    if extension == "dat":
        np.savetxt(outdir + "test.dat", data, delimiter=" ",
                   header=" ".join(parameters), comments="")
    elif extension == "csv":
        np.savetxt(outdir + "test.csv", data, delimiter=",",
                   header=",".join(parameters), comments="")
    elif extension == "npy":
        from pesummary.utils.samples_dict import SamplesDict
        samples = SamplesDict(parameters, np.array(data).T).to_structured_array()
        np.save(outdir + "test.npy", samples)
    elif extension == "json" and not bilby and not pesummary and not lalinference:
        import json

        dictionary = {"NameOfCode": {"posterior_samples": {key:
                      [i[num] for i in data] for num, key in
                      enumerate(parameters)}}}
        with open(outdir + "test.json", "w") as f:
            json.dump(dictionary, f, indent=4, sort_keys=True)
    elif (extension == "hdf5" or extension == "h5") and not bilby and not pesummary and not lalinference:
        import h5py

        h5py_data = np.array(
            [tuple(i) for i in data], dtype=[tuple([i, 'float64']) for i in parameters])
        f = h5py.File(outdir + "test.h5", "w")
        name = f.create_group("NameOfCode")
        samples = f.create_dataset("posterior_samples", data=h5py_data)
        f.close()
    elif bilby and not pesummary and not lalinference:
        import bilby
        from bilby.core.result import Result
        from bilby.core.prior import PriorDict
        from pandas import DataFrame

        priors = PriorDict()
        priors.update({"%s" % (i): bilby.core.prior.Uniform(0.1, 0.5, 0) for i in parameters})
        posterior_data_frame = DataFrame(data, columns=parameters)
        injection_parameters = {par: 1. for par in parameters}
        bilby_object = Result(
            search_parameter_keys=parameters, samples=data,
            posterior=posterior_data_frame, label="test",
            injection_parameters=injection_parameters,
            priors=priors,
            log_bayes_factor=0.5, log_evidence_err=0.1, log_noise_evidence=0.1,
            log_evidence=0.2, version=["bilby=0.5.3:"],
            meta_data={"likelihood": {"time_marginalization": "True"}})
        if extension == "json":
            bilby_object.save_to_file(
                filename=outdir + "test.json", extension="json")
        elif extension == "hdf5" or extension == "h5":
            bilby_object.save_to_file(
                filename=outdir + "test.h5", extension="hdf5")
    elif lalinference and not bilby and not pesummary:
        import h5py

        h5py_data = np.array(
            [tuple(i) for i in data], dtype=[tuple([i, 'float64']) for i in parameters])
        f = h5py.File(outdir + "test.hdf5", "w")
        lalinference = f.create_group("lalinference")
        nest = lalinference.create_group("lalinference_nest")
        samples = nest.create_dataset("posterior_samples", data=h5py_data)
        f.close()
    elif pesummary and not lalinference and not bilby:
        dictionary = {
            pesummary_label: {
                "posterior_samples": {
                    "parameter_names": parameters,
                    "samples": [list(i) for i in data]
                },
                "injection_data": {
                    "injection_values": [float("nan") for i in range(len(parameters))]
                },
                "version": ["No version information found"],
                "meta_data": {
                    "sampler": {"log_evidence": 0.5},
                    "meta_data": {}
                }
            },
            "version": {
                "pesummary": ["v0.1.7"]
            }
        }
        if config is not None:
            dictionary[pesummary_label]["config_file"] = config
        if psd is not None:
            dictionary[pesummary_label]["psds"] = psd
        if calibration is not None:
            dictionary[pesummary_label]["calibration_envelope"] = calibration

        if extension == "json":
            import json

            with open(outdir + "test.json", "w") as f:
                json.dump(dictionary, f, indent=4, sort_keys=True)
        elif extension == "hdf5" or extension == "h5":
            import h5py
            from pesummary.core.file.meta_file import recursively_save_dictionary_to_hdf5_file

            f = h5py.File(outdir + "test.h5", "w")
            recursively_save_dictionary_to_hdf5_file(
                f, dictionary, extra_keys=list(dictionary.keys())
            )
            f.close()
    return parameters, data
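
# Illustrative sketch (hypothetical helper): create a reproducible GW result
# file and check the shape of the returned samples. The seed and sample count
# are example values.
def _example_make_result_file(outdir="./.outdir"):
    parameters, data = make_result_file(
        outdir=outdir, extension="json", gw=True, random_seed=123,
        n_samples=100
    )
    assert data.shape == (100, len(parameters))
    return parameters, data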

testing_dir = str(Path(__file__).parent)
data_dir = str(Path(__file__).parent / "files")