Coverage for pesummary/core/file/read.py: 92.2%

116 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-05-02 08:42 +0000

1# Licensed under an MIT style license -- see LICENSE.md 

2 

3from pesummary.core.file.formats.base_read import Read 

4from pesummary.core.file.formats.bilby import Bilby, bilby_version 

5from pesummary.core.file.formats.default import Default 

6from pesummary.core.file.formats.pesummary import PESummary, PESummaryDeprecated 

7from pesummary.utils.utils import logger 

8import os 

9 

10__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"] 

11 

12 

def is_bilby_hdf5_file(path):
    """Determine if the results file is a bilby hdf5 results file

    Parameters
    ----------
    path: str
        path to the results file

    Returns
    -------
    bool
        the outcome of `_check_bilby_version` applied to the opened file
    """
    import h5py

    # the context manager guarantees the handle is released, even when the
    # version check exits through an exception
    with h5py.File(path, "r") as f:
        return _check_bilby_version(f)

26 

27 

def is_bilby_json_file(path):
    """Determine if the results file is a bilby json results file

    The file is parsed as json and the parsed contents are handed to
    `_check_bilby_version`.

    Parameters
    ----------
    path: str
        path to the results file
    """
    import json

    with open(path, "r") as handle:
        contents = json.loads(handle.read())
    return _check_bilby_version(contents)

40 

41 

def _check_bilby_version(data):
    """Identify if 'bilby' is in the version information extracted from the
    file

    Any failure while extracting or inspecting the version is treated as
    "not a bilby file" and False is returned.

    Parameters
    ----------
    data: iterable
        contents of the result file, opened with either h5py or json libraries
    """
    try:
        found = bilby_version(data=data)
        # the version may be stored as bytes; compare against text
        text = found.decode("utf-8") if isinstance(found, bytes) else found
        return "bilby" in text
    except Exception:
        return False

60 

61 

def _is_pesummary_hdf5_file(path, check_function):
    """Determine if the results file is a pesummary hdf5 file

    Parameters
    ----------
    path: str
        path to the results file
    check_function: func
        function used to check the result file. It is called with the opened
        h5py file object as its only argument

    Returns
    -------
    the value returned by `check_function`
    """
    import h5py

    # `check_function` is supplied by the caller and may raise; the context
    # manager ensures the file is closed in that case too
    with h5py.File(path, "r") as f:
        return check_function(f)

77 

78 

def is_pesummary_hdf5_file_deprecated(path):
    """Determine if the results file is a deprecated pesummary hdf5 file

    Delegates to `_is_pesummary_hdf5_file`, using
    `_check_pesummary_file_deprecated` as the check.

    Parameters
    ----------
    path: str
        path to the results file
    """
    return _is_pesummary_hdf5_file(
        path, check_function=_check_pesummary_file_deprecated
    )

88 

89 

def is_pesummary_hdf5_file(path):
    """Determine if the results file is a pesummary hdf5 file

    Delegates to `_is_pesummary_hdf5_file`, using `_check_pesummary_file` as
    the check.

    Parameters
    ----------
    path: str
        path to the results file
    """
    return _is_pesummary_hdf5_file(path, check_function=_check_pesummary_file)

99 

100 

101def _is_pesummary_json_file(path, check_function): 

102 """Determine if the result file is a pesummary json file 

103 

104 Parameters 

105 ---------- 

106 path: str 

107 path to the results file 

108 check_function: func 

109 function used to check the result file 

110 """ 

111 import json 

112 with open(path, "r") as f: 

113 data = json.load(f) 

114 return check_function(data) 

115 

116 

def is_pesummary_json_file(path):
    """Determine if the results file is a pesummary json file

    Delegates to `_is_pesummary_json_file`, using `_check_pesummary_file` as
    the check.

    Parameters
    ----------
    path: str
        path to results file
    """
    return _is_pesummary_json_file(path, check_function=_check_pesummary_file)

126 

127 

def is_pesummary_json_file_deprecated(path):
    """Determine if the results file is a deprecated pesummary json file

    Delegates to `_is_pesummary_json_file`, using
    `_check_pesummary_file_deprecated` as the check.

    Parameters
    ----------
    path: str
        path to results file
    """
    return _is_pesummary_json_file(
        path, check_function=_check_pesummary_file_deprecated
    )

137 

138 

139def _check_pesummary_file_deprecated(f): 

140 """Check the contents of a dictionary to see if it is a deprecated pesummary 

141 dictionary 

142 

143 Parameters 

144 ---------- 

145 f: dict 

146 dictionary of the contents of the file 

147 """ 

148 if "posterior_samples" in f.keys(): 

149 try: 

150 import collections 

151 

152 labels = f["posterior_samples"].keys() 

153 if isinstance(labels, collections.abc.KeysView): 

154 _label = list(labels)[0] 

155 try: 

156 _param = list(f["posterior_samples"][_label].keys())[0] 

157 samples = f["posterior_samples"][_label][_param] 

158 return True 

159 except Exception: 

160 return False 

161 else: 

162 return False 

163 except Exception: 

164 return False 

165 else: 

166 return False 

167 

168 

169def _check_pesummary_file(f): 

170 """Check the contents of a dictionary to see if it is a pesummary dictionary 

171 

172 Parameters 

173 ---------- 

174 f: dict 

175 dictionary of the contents of the file 

176 """ 

177 labels = f.keys() 

178 if "version" not in labels: 

179 return False 

180 try: 

181 if all( 

182 "posterior_samples" in f[label].keys() for label in labels if 

183 label != "version" and label != "history" and label != "strain" 

184 ): 

185 return True 

186 elif all( 

187 "mcmc_chains" in f[label].keys() for label in labels if 

188 label != "version" and label != "history" and label != "strain" 

189 ): 

190 return True 

191 else: 

192 return False 

193 except Exception: 

194 return False 

195 

196 

# Checks and load functions for hdf5 result files. Each key is a function which
# takes the path to the file and returns True or False, each value is the load
# function to use when the check passes.
CORE_HDF5_LOAD = {
    is_bilby_hdf5_file: Bilby.load_file,
    is_pesummary_hdf5_file: PESummary.load_file,
    is_pesummary_hdf5_file_deprecated: PESummaryDeprecated.load_file,
}

# Checks and load functions for json result files, same layout as above.
CORE_JSON_LOAD = {
    is_bilby_json_file: Bilby.load_file,
    is_pesummary_json_file: PESummary.load_file,
    is_pesummary_json_file_deprecated: PESummaryDeprecated.load_file,
}

# Load function stored under the "default" key.
CORE_DEFAULT_LOAD = {
    "default": Default.load_file,
}

# Generic file format names.
DEFAULT_FORMATS = ["default", "dat", "json", "hdf5", "h5", "txt"]

214 

215 

def _read(path, load_options, default=CORE_DEFAULT_LOAD, **load_kwargs):
    """Try and load a result file according to multiple options

    Each check in `load_options` is tried in turn. The first load function
    whose check passes and which loads without error provides the return
    value. An ImportError during loading falls straight back to the default
    load; any other error is logged and the next option is tried. If no
    option succeeds the default load is used.

    Parameters
    ----------
    path: str
        path to results file
    load_options: dict
        dictionary of checks and loading functions
    default: dict, optional
        dictionary whose "default" entry is the fallback load function
    **load_kwargs: dict, optional
        all additional kwargs are passed to the load function
    """
    for is_match, loader in load_options.items():
        if not is_match(path):
            continue
        try:
            return loader(path, **load_kwargs)
        except ImportError as exc:
            logger.warning(
                "Failed due to import error: {}. Using default load".format(
                    exc
                )
            )
            return default["default"](path, **load_kwargs)
        except Exception as exc:
            logger.info(
                "Failed to read in {} with the {} class because {}".format(
                    path, loader, exc
                )
            )
    if len(load_options) > 0:
        attempted = ", ".join(
            [check.__name__ for check in load_options.keys()]
        )
        logger.warning(
            "Using the default load because {} failed the following checks: {}".format(
                path, attempted
            )
        )
    return default["default"](path, **load_kwargs)

251 

252 

253def _file_format(file_format, options): 

254 """Return a dictionary of load options. If a file format is specified, then return 

255 a dictionary containing only that one load option 

256 

257 Parameters 

258 ---------- 

259 file_format: str 

260 the file format you wish to use. If None, all possible load options will be 

261 returned 

262 options: dict 

263 dictionary of possible load options. Key is a function which returns True 

264 or False depending on whether the input file belongs to that class of objects, 

265 value is the load function 

266 """ 

267 if file_format is None: 

268 return options 

269 elif any(file_format.lower() in key for key in DEFAULT_FORMATS): 

270 reduced = {} 

271 elif not any(file_format in key.__name__ for key in options.keys()): 

272 raise ValueError( 

273 "Unrecognised file format. The format must be a substring of any" 

274 "of the following: {}".format( 

275 ", ".join([key.__name__ for key in options.keys()] + DEFAULT_FORMATS) 

276 ) 

277 ) 

278 else: 

279 reduced = { 

280 key: value for key, value in options.items() if file_format in key.__name__ 

281 } 

282 return reduced 

283 

284 

def read(
    path, HDF5_LOAD=CORE_HDF5_LOAD, JSON_LOAD=CORE_JSON_LOAD,
    DEFAULT=CORE_DEFAULT_LOAD, file_format=None, **kwargs
):
    """Read in a results file.

    Parameters
    ----------
    path: str
        path to results file
    HDF5_LOAD: dict, optional
        dictionary of checks and load functions to try for files with a
        "hdf5", "h5" or "hdf" extension
    JSON_LOAD: dict, optional
        dictionary of checks and load functions to try for files with a
        "json" extension
    DEFAULT: dict, optional
        dictionary whose "default" entry is the load function used for all
        other extensions, and as the fallback for the above
    file_format: str
        the file format you wish to use. Default None. If None, the read
        function loops through all possible options
    **kwargs: dict, optional
        all additional kwargs are passed directly to the load_file class method
    """
    path = os.path.expanduser(path)
    extension = Read.extension_from_path(path)

    if extension == "gz":
        from pesummary.utils.utils import unzip
        path = unzip(path)
        # re-evaluate the extension so the unzipped file is dispatched on its
        # own format; keeping the stale "gz" value skipped the hdf5/json checks
        # NOTE(review): str() guards against unzip returning a path-like
        # object rather than a str -- confirm against unzip's return type
        extension = Read.extension_from_path(str(path))
    if extension in ["hdf5", "h5", "hdf"]:
        options = _file_format(file_format, HDF5_LOAD)
        return _read(path, options, default=DEFAULT, **kwargs)
    elif extension == "json":
        options = _file_format(file_format, JSON_LOAD)
        return _read(path, options, default=DEFAULT, **kwargs)
    else:
        return DEFAULT["default"](path, file_format=file_format, **kwargs)

313 else: 

314 return DEFAULT["default"](path, file_format=file_format, **kwargs)