Coverage for pesummary/gw/file/calibration.py: 62.0%

79 statements  

« prev     ^ index     » next       coverage.py v7.4.4, created at 2024-12-09 22:34 +0000

1# Licensed under an MIT style license -- see LICENSE.md 

2 

3import numpy as np 

4from scipy.interpolate import interp1d 

5from pesummary import conf 

6from pesummary.utils.utils import check_file_exists_and_rename, logger 

7from pesummary.utils.dict import Dict 

8 

9__author__ = ["Charlie Hoy <charlie.hoy@ligo.org>"] 

10 

11 

12def _apply_calibration_correction(array=None, amplitude=None, phase=None, type="data"): 

13 """Apply the calibration correction based on the calibration type. If 

14 type='data', the amplitude is inverted and phase multiplied by -1. If 

15 type='template', the amplitude and phase is left unchanged. 

16 

17 Parameters 

18 ---------- 

19 array: np.ndarray, optional 

20 an opened calibration envelope file (with e.g. np.genfromtxt). Columns must 

21 be "Frequency", "Median Mag", "Phase (Rad)", "-1 Sigma Mag", "-1 Sigma Phase", 

22 "+1 Sigma Mag", "+1 Sigma Phase" 

23 amplitude: np.ndarray, optional 

24 array of only calibration amplitudes 

25 phase: np.ndarray, optional 

26 array of only calibration phases 

27 type: str, optional 

28 type of calibration data. Must be either 'data' or 'template' 

29 """ 

30 if type not in ["data", "template"]: 

31 raise ValueError(f"Unknown calibration type: {type}") 

32 if all(_ is None for _ in [array, amplitude, phase]): 

33 raise ValueError( 

34 "Please provide either an opened calibration file, or amplitude and phase " 

35 "corrections" 

36 ) 

37 elif array is None and any(_ is None for _ in [amplitude, phase]): 

38 raise ValueError( 

39 "Please provide data for both the amplitude and phase corrections" 

40 ) 

41 elif array is not None and any(_ is not None for _ in [amplitude, phase]): 

42 logger.warning( 

43 "An opened calibration file and amplitude/phase corrections are provided. " 

44 "Using opened calibration file" 

45 ) 

46 if array is not None: 

47 amplitude = np.array([array.T[1], array.T[3], array.T[5]]) 

48 phase = np.array([array.T[2], array.T[4], array.T[6]]) 

49 if type == "data": 

50 if amplitude is not None: 

51 amplitude = 1. / amplitude 

52 if phase is not None: 

53 phase *= -1 

54 return amplitude, phase 

55 

56 

57def _spline_angle_xform(delta_psi): 

58 """Returns the angle in degrees corresponding to the spline 

59 calibration parameters delta_psi. Code taken from lalinference.bayespputils 

60 

61 Parameters 

62 ---------- 

63 delta_psi: array 

64 calibration phase uncertainty 

65 """ 

66 rotation = (2.0 + 1.0j * delta_psi) / (2.0 - 1.0j * delta_psi) 

67 return 180.0 / np.pi * np.arctan2(np.imag(rotation), np.real(rotation)) 

68 

69 

70def _interpolate_spline_model( 

71 frequencies, data, interpolated_frequencies, nfreqs=100, xform=None, 

72 level=0.9, pbar=None 

73): 

74 """Interpolate calibration posterior estimates for a spline model in log 

75 space. Code based upon same function in lalinference.bayespputils 

76 

77 Parameters 

78 ---------- 

79 frequencies: array 

80 The spline control points 

81 data: ndarray 

82 Array of posterior samples at each spline control point 

83 interpolated_frequencies: array 

84 Array of frequencies you wish to evaluate the interpolant for 

85 nfreqs: int, optional 

86 Number of points to evaluate the interpolates spline. Default 100 

87 xform: func, optional 

88 Function to transform the spline 

89 """ 

90 interpolated_data = np.zeros((np.asarray(data).shape[0], nfreqs)) 

91 for num, samp in enumerate(data): 

92 interp = interp1d( 

93 frequencies, samp, kind="cubic", fill_value=0., bounds_error=False 

94 )(interpolated_frequencies) 

95 if xform is not None: 

96 interp = xform(interp) 

97 interpolated_data[num] = interp 

98 if pbar is not None: 

99 pbar.update(1) 

100 

101 mean = np.mean(interpolated_data, axis=0) 

102 lower = np.quantile(interpolated_data, (1 - level) / 2., axis=0) 

103 upper = np.quantile(interpolated_data, (1 + level) / 2., axis=0) 

104 return mean, lower, upper 

105 

106 

107def interpolate_calibration_posterior_from_samples( 

108 log_frequencies, amplitudes, phases, nfreqs=100, level=0.9, **kwargs 

109): 

110 """Interpolate calibration posterior estimates for a spline model in log 

111 space and return the amplitude and phase uncertainties. Code based upon same 

112 function in lalinference.bayespputils 

113 

114 Parameters 

115 ---------- 

116 log_frequencies: array 

117 The spline control points. 

118 amplitudes: ndarray 

119 Array of amplitude posterior samples at each of the spline control 

120 points 

121 phases: ndarray 

122 Array of phase posterior samples at each of the spline control points 

123 nfreqs: int, optional 

124 Number of points to evaluate the interpolates spline. Default 100 

125 **kwargs: dict 

126 All kwargs passed to _interpolate_spline_model 

127 """ 

128 frequencies = np.exp(log_frequencies) 

129 interpolated_frequencies = np.logspace( 

130 np.min(log_frequencies), np.max(log_frequencies), nfreqs, base=np.e 

131 ) 

132 amp_mean, amp_lower, amp_upper = ( 

133 1 + np.array( 

134 _interpolate_spline_model( 

135 log_frequencies, np.column_stack(amplitudes), 

136 np.log(interpolated_frequencies), nfreqs=nfreqs, xform=None, 

137 level=level, **kwargs 

138 ) 

139 ) 

140 ) 

141 phase_mean, phase_lower, phase_upper = np.array( 

142 _interpolate_spline_model( 

143 log_frequencies, np.column_stack(phases), 

144 np.log(interpolated_frequencies), nfreqs=nfreqs, 

145 xform=_spline_angle_xform, level=level, **kwargs 

146 ) 

147 ) * (np.pi / 180) 

148 return np.column_stack( 

149 [ 

150 interpolated_frequencies, amp_mean, phase_mean, amp_lower, 

151 phase_lower, amp_upper, phase_upper 

152 ] 

153 ) 

154 

155 

156class CalibrationDict(Dict): 

157 """Class to handle a dictionary of calibration data 

158 

159 Parameters 

160 ---------- 

161 detectors: list 

162 list of detectors 

163 data: nd list 

164 list of calibration samples for each detector. Each of the columns 

165 should represent Frequency, Median Mag, Phase (Rad), -1 Sigma Mag, 

166 -1 Sigma Phase, +1 Sigma Mag, +1 Sigma Phase 

167 

168 Attributes 

169 ---------- 

170 detectors: list 

171 list of detectors stored in the dictionary 

172 

173 Methods 

174 ------- 

175 plot: 

176 Generate a plot based on the calibration samples stored 

177 """ 

178 def __init__(self, *args): 

179 _columns = [ 

180 "frequencies", "magnitude", "phase", "magnitude_lower", 

181 "phase_lower", "magnitude_upper", "phase_upper" 

182 ] 

183 super(CalibrationDict, self).__init__( 

184 *args, value_class=Calibration, value_columns=_columns 

185 ) 

186 

187 @property 

188 def detectors(self): 

189 return list(self.keys()) 

190 

191 

192class Calibration(np.ndarray): 

193 """Class to handle Calibration data 

194 """ 

195 def __new__(cls, input_array): 

196 obj = np.asarray(input_array).view(cls) 

197 if obj.shape[1] != 7: 

198 raise ValueError( 

199 "Invalid input data. See the docs for instructions" 

200 ) 

201 return obj 

202 

203 @classmethod 

204 def read(cls, path_to_file, IFO=None, type="data", **kwargs): 

205 """Read in a file and initialize the Calibration class 

206 

207 Parameters 

208 ---------- 

209 path_to_file: str 

210 the path to the file you wish to load 

211 IFO: str, optional 

212 name of the IFO which relates to the input file 

213 type: str, optional 

214 the calibration definition. This can either be 'data' or 'template' 

215 and determines whether the calibration is applied to the data 

216 or the template. Default: 'data' 

217 **kwargs: dict 

218 all kwargs are passed to the np.genfromtxt method 

219 """ 

220 try: 

221 f = np.genfromtxt(path_to_file, **kwargs) 

222 amplitudes, phases = _apply_calibration_correction(array=f, type=type) 

223 data = np.array([ 

224 f.T[0], amplitudes[0], phases[0], amplitudes[1], phases[1], 

225 amplitudes[2], phases[2] 

226 ]).T 

227 return cls(data) 

228 except Exception: 

229 raise 

230 

231 @classmethod 

232 def from_spline_posterior_samples( 

233 cls, log_frequencies, amplitudes, phases, **kwargs 

234 ): 

235 """Interpolate calibration posterior estimates for a spline model in log 

236 space and initialize the Calibration class 

237 

238 Parameters 

239 ---------- 

240 """ 

241 samples = interpolate_calibration_posterior_from_samples( 

242 log_frequencies, amplitudes, phases, level=0.68, nfreqs=300, 

243 **kwargs 

244 ) 

245 return cls(samples) 

246 

247 def save_to_file(self, file_name, comments="#", delimiter=conf.delimiter): 

248 """Save the calibration data to file 

249 

250 Parameters 

251 ---------- 

252 file_name: str 

253 name of the file name that you wish to use 

254 comments: str, optional 

255 String that will be prepended to the header and footer strings, to 

256 mark them as comments. Default is '#'. 

257 delimiter: str, optional 

258 String or character separating columns. 

259 """ 

260 check_file_exists_and_rename(file_name) 

261 header = [ 

262 "Frequency", "Median Mag", "Phase (Rad)", "-1 Sigma Mag", 

263 "-1 Sigma Phase", "+1 Sigma Mag", "+1 Sigma Phase" 

264 ] 

265 np.savetxt( 

266 file_name, self, delimiter=delimiter, comments=comments, 

267 header=delimiter.join(header) 

268 ) 

269 

270 def __array_finalize__(self, obj): 

271 if obj is None: 

272 return