Coverage for pesummary/gw/file/calibration.py: 63.3%

79 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2025-11-05 13:38 +0000

1# Licensed under an MIT style license -- see LICENSE.md 

2 

3import numpy as np 

4from scipy.interpolate import interp1d 

5from pesummary import conf 

6from pesummary.utils.utils import check_file_exists_and_rename, logger 

7from pesummary.utils.dict import Dict 

8 

9__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"] 

10 

11 

12def _apply_calibration_correction(array=None, amplitude=None, phase=None, type="data"): 

13 """Apply the calibration correction based on the calibration type. If 

14 type='data', the amplitude is inverted and phase multiplied by -1. If 

15 type='template', the amplitude and phase is left unchanged. 

16 

17 Parameters 

18 ---------- 

19 array: np.ndarray, optional 

20 an opened calibration envelope file (with e.g. np.genfromtxt). Columns must 

21 be "Frequency", "Median Mag", "Phase (Rad)", "-1 Sigma Mag", "-1 Sigma Phase", 

22 "+1 Sigma Mag", "+1 Sigma Phase" 

23 amplitude: np.ndarray, optional 

24 array of only calibration amplitudes 

25 phase: np.ndarray, optional 

26 array of only calibration phases 

27 type: str, optional 

28 type of calibration data. Must be either 'data' or 'template' 

29 """ 

30 if type not in ["data", "template"]: 

31 raise ValueError(f"Unknown calibration type: {type}") 

32 if all(_ is None for _ in [array, amplitude, phase]): 

33 raise ValueError( 

34 "Please provide either an opened calibration file, or amplitude and phase " 

35 "corrections" 

36 ) 

37 elif array is None and any(_ is None for _ in [amplitude, phase]): 

38 raise ValueError( 

39 "Please provide data for both the amplitude and phase corrections" 

40 ) 

41 elif array is not None and any(_ is not None for _ in [amplitude, phase]): 

42 logger.warning( 

43 "An opened calibration file and amplitude/phase corrections are provided. " 

44 "Using opened calibration file" 

45 ) 

46 if array is not None: 

47 amplitude = np.array([array.T[1], array.T[3], array.T[5]]) 

48 phase = np.array([array.T[2], array.T[4], array.T[6]]) 

49 if type == "data": 

50 if amplitude is not None: 

51 amplitude = 1. / amplitude 

52 if phase is not None: 

53 phase *= -1 

54 return amplitude, phase 

55 

56 

57def _spline_angle_xform(delta_psi): 

58 """Returns the angle in degrees corresponding to the spline 

59 calibration parameters delta_psi. Code taken from lalinference.bayespputils 

60 

61 Parameters 

62 ---------- 

63 delta_psi: array 

64 calibration phase uncertainty 

65 """ 

66 rotation = (2.0 + 1.0j * delta_psi) / (2.0 - 1.0j * delta_psi) 

67 return 180.0 / np.pi * np.arctan2(np.imag(rotation), np.real(rotation)) 

68 

69 

70def _interpolate_spline_model( 

71 frequencies, data, interpolated_frequencies, nfreqs=100, xform=None, 

72 level=0.9, pbar=None 

73): 

74 """Interpolate calibration posterior estimates for a spline model in log 

75 space. Code based upon same function in lalinference.bayespputils 

76 

77 Parameters 

78 ---------- 

79 frequencies: array 

80 The spline control points 

81 data: ndarray 

82 Array of posterior samples at each spline control point 

83 interpolated_frequencies: array 

84 Array of frequencies you wish to evaluate the interpolant for 

85 nfreqs: int, optional 

86 Number of points to evaluate the interpolates spline. Default 100 

87 xform: func, optional 

88 Function to transform the spline 

89 """ 

90 interpolated_data = np.zeros((np.asarray(data).shape[0], nfreqs)) 

91 for num, samp in enumerate(data): 

92 interp = interp1d( 

93 frequencies, samp, kind="cubic", fill_value=0., bounds_error=False 

94 )(interpolated_frequencies) 

95 if xform is not None: 

96 interp = xform(interp) 

97 interpolated_data[num] = interp 

98 if pbar is not None: 

99 pbar.update(1) 

100 

101 mean = np.mean(interpolated_data, axis=0) 

102 lower = np.quantile(interpolated_data, (1 - level) / 2., axis=0) 

103 upper = np.quantile(interpolated_data, (1 + level) / 2., axis=0) 

104 return mean, lower, upper 

105 

106 

107def interpolate_calibration_posterior_from_samples( 

108 log_frequencies, amplitudes, phases, nfreqs=100, level=0.9, **kwargs 

109): 

110 """Interpolate calibration posterior estimates for a spline model in log 

111 space and return the amplitude and phase uncertainties. Code based upon same 

112 function in lalinference.bayespputils 

113 

114 Parameters 

115 ---------- 

116 log_frequencies: array 

117 The spline control points. 

118 amplitudes: ndarray 

119 Array of amplitude posterior samples at each of the spline control 

120 points 

121 phases: ndarray 

122 Array of phase posterior samples at each of the spline control points 

123 nfreqs: int, optional 

124 Number of points to evaluate the interpolates spline. Default 100 

125 **kwargs: dict 

126 All kwargs passed to _interpolate_spline_model 

127 """ 

128 frequencies = np.exp(log_frequencies) 

129 interpolated_frequencies = np.logspace( 

130 np.min(log_frequencies), np.max(log_frequencies), nfreqs, base=np.e 

131 ) 

132 amp_mean, amp_lower, amp_upper = ( 

133 1 + np.array( 

134 _interpolate_spline_model( 

135 log_frequencies, np.column_stack(amplitudes), 

136 np.log(interpolated_frequencies), nfreqs=nfreqs, xform=None, 

137 level=level, **kwargs 

138 ) 

139 ) 

140 ) 

141 phase_mean, phase_lower, phase_upper = np.array( 

142 _interpolate_spline_model( 

143 log_frequencies, np.column_stack(phases), 

144 np.log(interpolated_frequencies), nfreqs=nfreqs, 

145 xform=_spline_angle_xform, level=level, **kwargs 

146 ) 

147 ) * (np.pi / 180) 

148 return np.column_stack( 

149 [ 

150 interpolated_frequencies, amp_mean, phase_mean, amp_lower, 

151 phase_lower, amp_upper, phase_upper 

152 ] 

153 ) 

154 

155 

156class CalibrationDict(Dict): 

157 """Class to handle a dictionary of calibration data 

158 

159 Parameters 

160 ---------- 

161 detectors: list 

162 list of detectors 

163 data: nd list 

164 list of calibration samples for each detector. Each of the columns 

165 should represent Frequency, Median Mag, Phase (Rad), -1 Sigma Mag, 

166 -1 Sigma Phase, +1 Sigma Mag, +1 Sigma Phase 

167 

168 Attributes 

169 ---------- 

170 detectors: list 

171 list of detectors stored in the dictionary 

172 

173 Methods 

174 ------- 

175 plot: 

176 Generate a plot based on the calibration samples stored 

177 """ 

178 def __init__(self, *args): 

179 _columns = [ 

180 "frequencies", "magnitude", "phase", "magnitude_lower", 

181 "phase_lower", "magnitude_upper", "phase_upper" 

182 ] 

183 super(CalibrationDict, self).__init__( 

184 *args, value_class=Calibration, value_columns=_columns, 

185 deconstruct_complex_columns=False 

186 ) 

187 

188 @property 

189 def detectors(self): 

190 return list(self.keys()) 

191 

192 

193class Calibration(np.ndarray): 

194 """Class to handle Calibration data 

195 """ 

196 def __new__(cls, input_array): 

197 obj = np.asarray(input_array).view(cls) 

198 if obj.shape[1] != 7: 

199 raise ValueError( 

200 "Invalid input data. See the docs for instructions" 

201 ) 

202 return obj 

203 

204 @classmethod 

205 def read(cls, path_to_file, IFO=None, type="data", **kwargs): 

206 """Read in a file and initialize the Calibration class 

207 

208 Parameters 

209 ---------- 

210 path_to_file: str 

211 the path to the file you wish to load 

212 IFO: str, optional 

213 name of the IFO which relates to the input file 

214 type: str, optional 

215 the calibration definition. This can either be 'data' or 'template' 

216 and determines whether the calibration is applied to the data 

217 or the template. Default: 'data' 

218 **kwargs: dict 

219 all kwargs are passed to the np.genfromtxt method 

220 """ 

221 try: 

222 f = np.genfromtxt(path_to_file, **kwargs) 

223 amplitudes, phases = _apply_calibration_correction(array=f, type=type) 

224 data = np.array([ 

225 f.T[0], amplitudes[0], phases[0], amplitudes[1], phases[1], 

226 amplitudes[2], phases[2] 

227 ]).T 

228 return cls(data) 

229 except Exception: 

230 raise 

231 

232 @classmethod 

233 def from_spline_posterior_samples( 

234 cls, log_frequencies, amplitudes, phases, **kwargs 

235 ): 

236 """Interpolate calibration posterior estimates for a spline model in log 

237 space and initialize the Calibration class 

238 

239 Parameters 

240 ---------- 

241 """ 

242 samples = interpolate_calibration_posterior_from_samples( 

243 log_frequencies, amplitudes, phases, level=0.68, nfreqs=300, 

244 **kwargs 

245 ) 

246 return cls(samples) 

247 

248 def save_to_file(self, file_name, comments="#", delimiter=conf.delimiter): 

249 """Save the calibration data to file 

250 

251 Parameters 

252 ---------- 

253 file_name: str 

254 name of the file name that you wish to use 

255 comments: str, optional 

256 String that will be prepended to the header and footer strings, to 

257 mark them as comments. Default is '#'. 

258 delimiter: str, optional 

259 String or character separating columns. 

260 """ 

261 check_file_exists_and_rename(file_name) 

262 header = [ 

263 "Frequency", "Median Mag", "Phase (Rad)", "-1 Sigma Mag", 

264 "-1 Sigma Phase", "+1 Sigma Mag", "+1 Sigma Phase" 

265 ] 

266 np.savetxt( 

267 file_name, self, delimiter=delimiter, comments=comments, 

268 header=delimiter.join(header) 

269 ) 

270 

271 def __array_finalize__(self, obj): 

272 if obj is None: 

273 return