import itertools
import warnings
from astropy import units
import matplotlib
from matplotlib import pyplot as plt
from matplotlib import gridspec
from matplotlib.ticker import NullFormatter, StrMethodFormatter, SymmetricalLogLocator
import numpy as np
from scipy.constants import golden_ratio
import scipy.interpolate
from gwpy.plot import Plot
from gwpy.segments import DataQualityFlag
from gwpy.timeseries import TimeSeries
from ligo.segments import segment, segmentlist
from . import utils
from . import calibration
# set plotting backend
plt.switch_backend("Agg")
matplotlib.rcParams.update(
{
"font.size": 9.0,
"axes.titlesize": 10.0,
"axes.labelsize": 8.0,
"xtick.labelsize": 8.0,
"ytick.labelsize": 8.0,
"legend.fontsize": 8.0,
"font.family": ["sans-serif"],
"font.sans-serif": [
"FreeSans",
"Helvetica Neue",
"Helvetica",
"Arial",
]
+ matplotlib.rcParams["font.sans-serif"],
"figure.dpi": 300,
"savefig.dpi": 300,
"text.usetex": False,
"path.simplify": True,
"agg.path.chunksize": 50000,
}
)
DEFAULT_COLORMAP = "copper_r"
COLORS = "b r g c m k".split()
LIGHT_COLORS = "tab:blue tab:red tab:green tab:cyan tab:pruple tab:grey".split()
DEFAULT_ALPHA = 0.75
DEFAULT_SHADE_ALPHA = 0.25
DEFAULT_FONTSIZE = 14
PGLITCH_LABEL = r"$p(\mathrm{glitch}|\mathrm{aux})$"
LOGLIKE_LABEL = r"$\ln \Lambda^\mathrm{glitch}_\mathrm{clean}$"
EFF_FAP_LABEL = "Efficiency/FAP"
FAP_LABEL = "FAP"
RANK_LABEL = "Rank"
QTRANSFORM_LABEL = "Frequency"
CUM_GLITCH_LIKE_LABEL = r"$P(\mathrm{aux}|\mathrm{glitch})$"
GLITCH_LIKE_LABEL = r"$p(\mathrm{aux}|\mathrm{glitch})$"
CUM_CLEAN_LIKE_LABEL = r"$P(\mathrm{aux}|\mathrm{clean})$"
CLEAN_LIKE_LABEL = r"$p(\mathrm{aux}|\mathrm{clean})$"
def color_generator():
return itertools.cycle(COLORS)
def light_color_generator():
return itertools.cycle(LIGHT_COLORS)
[docs]def histogram(nicknames, series, start, end, t0, legend=True, fig=None):
"""plot the series information in a standard histogram way"""
if fig is None:
fig = plt.figure()
ax_loglike = plt.subplot(2, 1, 1)
ax_fap = plt.subplot(2, 1, 2)
# plot
get_color = color_generator()
for nickname in nicknames:
color = next(get_color)
kwargs = dict(alpha=DEFAULT_ALPHA, color=color, label=nickname)
loglike_series = []
fap_series = []
for s in series[nickname]:
times = s.times
# check that the times are relevant for the actual requested stretch
truth = (start <= times) * (times < end)
if not np.any(truth):
continue
times = times[truth] - t0
loglike_series.extend(s.loglike.series[truth])
fap_series.extend(s.fap.series[truth])
fap_series = np.array(fap_series)
bins_fap = np.geomspace(np.min(fap_series[fap_series > 0]), 1, 100)
ax_fap.hist(fap_series, bins=bins_fap, **kwargs)
bins_loglike_pos = np.linspace(0, np.max(loglike_series), 100)
bin_width = np.max(loglike_series) / 100
num_neg_bins = np.ceil(-1 * np.min(loglike_series) / bin_width)
bins_loglike_neg = np.arange(-num_neg_bins, 0) * bin_width
bins_loglike = np.append(bins_loglike_neg, bins_loglike_pos)
ax_loglike.hist(loglike_series, bins=bins_loglike, **kwargs)
# remove label from kwargs
kwargs.pop("label", None)
# Decorate
ax_loglike.set_xlabel(f"{LOGLIKE_LABEL}")
ax_fap.set_xlabel(f"{FAP_LABEL}")
ax_loglike.set_ylabel("Counts")
ax_fap.set_ylabel("Counts")
ax_loglike.set_yscale("log")
ax_fap.set_yscale("log")
ax_fap.set_xscale("log")
ax_fap.set_xlim(1, min(fap_series[fap_series > 0]))
plt.subplots_adjust(hspace=0.60, wspace=0.10)
if legend:
plt.legend(loc="best")
return fig
[docs]def timeseries(
nicknames,
series,
start,
end,
t0,
legend=True,
gch_gps=None,
segs=None,
strain=None,
frange=(0, np.inf),
):
"""plot the series information in a standard way"""
if strain is not None:
fig = Plot(figsize=(8, 8))
gs = gridspec.GridSpec(3, 1, height_ratios=[2, 1, 1])
ax_q = fig.add_subplot(gs[0])
ax_loglike = fig.add_subplot(gs[1], sharex=ax_q)
ax_fap = fig.add_subplot(gs[2], sharex=ax_q)
else:
fig = Plot(figsize=(8, 8 / golden_ratio))
gs = gridspec.GridSpec(2, 1)
ax_loglike = fig.add_subplot(gs[0])
ax_fap = fig.add_subplot(gs[1], sharex=ax_loglike)
# plot
if strain is not None:
# create the q-transform
strain.times = strain.times - (t0 * units.s)
qscan = strain.q_transform(
qrange=(4, 64),
frange=frange,
tres=0.000208, # type: ignore
logf=True,
)
# plot this on top of other plots
ax_q.pcolormesh(qscan, vmin=0, vmax=25)
ax_q.set_yscale("log")
ax_q.set_xlim(start - t0, end - t0)
ax_q.set_ylabel("Frequency [Hz]")
ax_q.colorbar(label="Normalized energy", clim=(0, 25))
get_color = color_generator()
for nickname in nicknames:
color = next(get_color)
kwargs = dict(
alpha=DEFAULT_ALPHA,
color=color,
label=nickname,
drawstyle="steps-mid",
)
for s in series[nickname]:
times = s.times
# check that the times are relevant for the actual requested stretch
truth = (start <= times) * (times < end)
if not np.any(truth):
continue
times = times[truth] - t0
ax_loglike.plot(times, s.loglike.series[truth], **kwargs)
ax_fap.plot(times, s.fap.series[truth], **kwargs)
# remove label from kwargs
kwargs.pop("label", None)
# decorate
ax_loglike.set_ylabel(LOGLIKE_LABEL)
ax_fap.set_ylabel(FAP_LABEL)
# set ticks for y-axis to avoid missing ticks
ax_loglike.set_yscale("symlog")
locmin = SymmetricalLogLocator(
linthresh=1,
base=10,
subs=(0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9),
)
ax_loglike.yaxis.set_minor_locator(locmin)
ax_loglike.yaxis.set_minor_formatter(NullFormatter())
ax_fap.set_yscale("log")
xlim = (start - t0, end - t0)
ax_fap.set_xlim(xlim)
ax_loglike.set_xlim(xlim)
if legend:
ax_fap.legend(loc="best")
# annotate
ylims = dict((ax, ax.get_ylim()) for ax in [ax_loglike, ax_fap])
# add annotation gps
if gch_gps:
gch_gps = np.array(gch_gps)
gch_gps = gch_gps[
(start <= gch_gps) * (gch_gps < end)
] # filter by only the times that will show up
gch_gps = gch_gps - t0
verts = np.empty((2, len(gch_gps)), dtype=float)
gch_gps = [gch_gps] * 2
for ax, (ymin, ymax) in ylims.items():
verts[0, :] = ymin
verts[1, :] = ymax
ax.plot(
gch_gps,
verts,
color="grey",
linestyle="dashed",
alpha=DEFAULT_SHADE_ALPHA,
)
ax.set_ylim(ymin=ymin, ymax=ymax)
# shade the inactive segments
if segs is not None:
for s, e in segmentlist([segment(start, end)]) - segs:
for ax, (ymin, ymax) in ylims.items():
ax.fill_between(
[s - t0, e - t0],
[ymin] * 2,
[ymax] * 2,
color="grey",
alpha=DEFAULT_SHADE_ALPHA,
)
ax.set_ylim(ymin=ymin, ymax=ymax)
# add IDQ_OK segment plot at bottom
for nickname in nicknames:
idq_ok_flag = DataQualityFlag(name=f"IDQ_OK_{nickname}")
for s in series[nickname]:
ok_series = TimeSeries(s.ok.series, t0=s.t0 - t0, dt=s.dt)
idq_ok_flag |= (ok_series == 1).to_dqflag()
seg_ax = fig.add_segments_bar(idq_ok_flag, ax=ax_fap)
seg_ax.set_epoch(0)
time_unit = seg_ax.xaxis.get_transform().get_unit_name()
seg_ax.set_xlabel(f"Time [{time_unit}] relative to {t0:.3f}")
# precision issues can sometimes cause bad looking (and overlapping)
# ticks, round to 2 decimal places to mitigate this
seg_ax.xaxis.set_major_formatter(StrMethodFormatter("{x:,.2f}"))
# only keep outward-facing x-axis label
for ax in fig.axes:
try:
ax.label_outer()
except Exception:
pass
plt.subplots_adjust(hspace=0.1, wspace=0.1)
return fig
[docs]def dataset_corner(nicknames, datasets):
"""generate a corner plot showing the correlations between ranks"""
# get the set of all gps times present
times = set()
for nickname in nicknames:
times = times.union(set(datasets[nickname].times))
times = np.array(sorted(times), dtype=float)
# construct an array representing the data set
data = np.empty((len(nicknames), len(times)), dtype=float)
data[:] = np.nan
for j, nickname in enumerate(nicknames):
for v in datasets[nickname]:
data[j][v.time == times] = v.rank
# make a simple corner plot
return _corner(
np.transpose(data),
[nickname + " " + RANK_LABEL for nickname in nicknames],
linestyle="none",
marker=".",
)
def _corner(data, labels, color=None, linestyle="none", marker=".", fig=None):
"""construct a basic corner plot"""
if fig is None:
fig = plt.figure()
else:
plt.figure(fig.number)
Nsamp, N = data.shape
alpha = max(1.0 / Nsamp, 0.10)
ranges = []
for i in range(N):
m = np.min(data[:, i])
M = np.max(data[:, i])
d = 0.05 * (M - m)
ranges.append((m - d, M + d))
num_bins = max(10, int(Nsamp**0.5)) + 1
bins = [np.linspace(m, M, num_bins) for m, M in ranges]
for row in range(N):
for col in range(row + 1):
ax = plt.subplot(N, N, row * N + col + 1)
if col == row:
if color is None:
ax.hist(data[:, col], bins=bins[col], histtype="step")
else:
ax.hist(data[:, col], color=color, histtype="step")
else:
if color is None:
ax.plot(
data[:, col],
data[:, row],
linestyle=linestyle,
marker=marker,
alpha=alpha,
)
else:
ax.plot(
data[:, col],
data[:, row],
color=color,
linestyle=linestyle,
marker=marker,
alpha=alpha,
)
ax.set_ylim(ranges[row])
ax.set_xlim(ranges[col])
if (col == 0) and (row != 0):
ax.set_ylabel(labels[row])
else:
plt.setp(ax.get_yticklabels(), visible=False)
if row == N - 1:
ax.set_xlabel(labels[col])
else:
plt.setp(ax.get_xticklabels(), visible=False)
plt.subplots_adjust(hspace=0.05, wspace=0.05)
return fig
[docs]def roc(nicknames, series, datasets, fig=None, annotate_auc=False, rank_roc=False):
"""plot the ROC curves in a standard way"""
if fig is None:
fig = plt.figure()
ax = fig.gca()
get_color = color_generator()
xmin = 1e-3
for nickname in nicknames:
dataset = datasets[nickname]
# actually plot data
if len(dataset):
xmin = min(xmin, 0.1 / len(dataset))
# directly from ranks in evaluated datasets
color = next(get_color)
# plot based on loglike assigned by timeseries
if series is not None:
# update vectors to reflect the loglike they were assigned
dset = dataset.copy()
ranks = _evaluate_series_at_times(
series[nickname], dset.times, "loglike", default=0
)
dset.evaluate(ranks)
_dataset2roc(
ax,
dset,
color=color,
label=nickname + ": " + LOGLIKE_LABEL,
linestyle="solid",
alpha=DEFAULT_ALPHA,
shade_alpha=DEFAULT_SHADE_ALPHA,
marker=None,
annotate_auc=annotate_auc,
)
if series is None or rank_roc:
_dataset2roc(
ax,
dataset,
color=color,
label=nickname + ": " + RANK_LABEL,
linestyle="dashed",
alpha=DEFAULT_ALPHA,
shade_alpha=DEFAULT_SHADE_ALPHA,
marker=".",
)
# plot a fair coin's flip
line = np.logspace(np.log10(xmin), 0, 1001)
ax.plot(line, line, color="k", linestyle="dashed", alpha=DEFAULT_SHADE_ALPHA)
# decorate
ax.set_xlabel("FAP")
ax.set_ylabel("Efficiency")
ax.set_xscale("log")
ax.set_ylim(ymin=-0.01, ymax=1.01)
ax.set_xlim(xmin=xmin, xmax=1.01)
ax.legend(loc="lower right")
plt.subplots_adjust(hspace=0.10, wspace=0.10)
return fig
def _evaluate_series_at_times(series, times, key, default):
segments = segmentlist([segment(s.start, s.end) for s in series])
evaluated = default * np.ones_like(times)
# make mask for times within segments only
mask = utils.times_in_segments(times, segments)
# only evaluate times that are covered by the series
# use nearest-neighbor interpolation.
# otherwise, interpolation b/w 1 and lowest rank is incorrect.
eval_func = scipy.interpolate.interp1d(
np.concatenate([s.times for s in series]),
np.concatenate([getattr(s, key).series for s in series]),
kind="nearest",
)
evaluated[mask] = eval_func(times[mask])
return evaluated
def _dataset2roc(
ax,
dataset,
color=None,
label=None,
linestyle="solid",
alpha=DEFAULT_ALPHA,
shade_alpha=DEFAULT_SHADE_ALPHA,
marker="o",
annotate_auc=False,
):
"""helper function for generating an ROC curve from a dataset"""
ranks = np.array(sorted(set(dataset.ranks)))
gch, cln = dataset.vectors2classes()
# actually plot data
# directly from ranks in evaluated datasets
eff = np.zeros_like(ranks, dtype=float)
fap = np.zeros_like(ranks, dtype=float)
for r in gch.ranks:
eff += r >= ranks
eff /= len(gch)
seff = (eff * (1 - eff) / len(gch)) ** 0.5
for r in cln.ranks:
fap += r >= ranks
fap /= len(cln)
sfap = (fap * (1 - fap) / len(cln)) ** 0.5
# plot uncertainty regions
if color is not None:
ax.plot(
fap,
eff,
drawstyle="steps",
color=color,
marker=marker,
label=label,
linestyle=linestyle,
alpha=alpha,
)
if annotate_auc:
_annotate_roc_auc(ax, eff, fap)
else:
color = ax.plot(
fap,
eff,
drawstyle="steps",
marker=marker,
label=label,
linestyle=linestyle,
alpha=alpha,
)[0].get_color()
if annotate_auc:
_annotate_roc_auc(ax, eff, fap)
for e, se, f, sf, r in zip(eff, seff, fap, sfap, ranks):
ax.fill_between(
[f - sf, f + sf], [e - se] * 2, [e + se] * 2, color=color, alpha=shade_alpha
)
return color
[docs]def calibration_distribs(
nicknames, calibmaps, legend=True, fig=None, pdf_ymin=1e-4, pdf_ymax=1e4
):
"""plot the distributions over observed ranks and the calibration KDEs"""
if fig is None:
fig = plt.figure()
ax_cum_gch = plt.subplot(2, 2, 1)
ax_cum_cln = plt.subplot(2, 2, 2)
ax_gch = plt.subplot(2, 2, 3)
ax_cln = plt.subplot(2, 2, 4)
get_color = color_generator()
# plot histograms of ranks within dataset
num = np.sum([len(calibmaps[nickname]) for nickname in nicknames])
alpha = max(0.1, 1 / num)
shade_alpha = max(0.05, alpha / 2.0)
for nickname in nicknames:
kwargs = dict(alpha=alpha, label=nickname, color=next(get_color))
fill_between_kwargs = dict(alpha=shade_alpha, color=kwargs["color"])
# plot distributions stored within calibration maps
for calibmap in calibmaps[nickname].values():
for ax_cum, ax, kde in [
(ax_cum_cln, ax_cln, calibmap._cln_kde),
(ax_cum_gch, ax_gch, calibmap._gch_kde),
]:
# control the number and placement of bins by hands
# so we don't end up with enormous things
ranks = kde.ranks
# extract data
# cumulative distributions
cdf = 1.0 - kde.cdf(ranks)
cdf_low = 1.0 - kde.cdf_quantile(ranks, 0.16)
cdf_high = 1.0 - kde.cdf_quantile(ranks, 0.84)
# differential distributions
pdf = kde.pdf(ranks)
pdf_low = kde.pdf_quantile(ranks, 0.16)
pdf_high = kde.pdf_quantile(ranks, 0.84)
# actually plot
if isinstance(kde, calibration.FixedBandwidth1DKDE):
# continuous case
ax_cum.plot(ranks, cdf, **kwargs)
# remove this so we do not repeatedly label
kwargs.pop("label", None)
ax_cum.hist(
kde.obs,
bins=ranks,
histtype="step",
density=True,
cumulative=-1,
linestyle="dashed",
**kwargs,
)
ax_cum.fill_between(ranks, cdf_low, cdf_high, **fill_between_kwargs)
ax.plot(ranks, pdf, **kwargs)
ax.fill_between(ranks, pdf_low, pdf_high, **fill_between_kwargs)
else:
# discrete case
ax_cum.step(ranks, cdf, where="post", marker=".", **kwargs)
kwargs.pop("label", None)
ax_cum.hist(
kde.obs,
bins=[0] + list(ranks) + [1],
histtype="step",
density=True,
cumulative=-1,
linestyle="dashed",
**kwargs,
)
for r, R, l, h in zip(
ranks[:-1], ranks[1:], cdf_low[:-1], cdf_high[:-1]
):
ax_cum.fill_between(
[r, R], [l] * 2, [h] * 2, **fill_between_kwargs
)
if len(ranks):
ax_cum.fill_between(
[ranks[-1], 1],
[cdf_low[-1]] * 2,
[cdf_high[-1]] * 2,
**fill_between_kwargs,
)
ax.plot(ranks, pdf, linestyle="none", marker=".", **kwargs)
for r, l, h in zip(ranks, pdf_low, pdf_high):
ax.plot([r] * 2, [l, h], **fill_between_kwargs)
# decorate
ax_cum_gch.set_ylabel(CUM_GLITCH_LIKE_LABEL)
ax_cum_cln.set_ylabel(CUM_CLEAN_LIKE_LABEL)
ax_gch.set_ylabel(GLITCH_LIKE_LABEL)
ax_cln.set_ylabel(CLEAN_LIKE_LABEL)
xlim = (0.0, 1.0)
ylim = (0.0, 1.0)
for ax in [ax_cum_gch, ax_cum_cln]:
plt.setp(ax.get_xticklabels(), visible=False)
ax.set_xlim(xlim)
ax.set_ylim(ylim)
for ax in [ax_gch, ax_cln]:
ax.set_xlabel(RANK_LABEL)
ax.set_xlim(xlim)
ax.set_yscale("log")
ymin, ymax = ax.get_ylim()
ymin = max(pdf_ymin, ymin) if pdf_ymin > 0 else ymin
ymax = min(pdf_ymax, ymax) if pdf_ymax > 0 else ymax
ax.set_ylim(ymin=ymin, ymax=ymax)
for ax in [ax_cum_cln, ax_cln]:
ax.yaxis.tick_right()
ax.yaxis.set_label_position("right")
if legend:
ax_cum_cln.legend(loc="best")
plt.subplots_adjust(hspace=0.05, wspace=0.05)
return fig
def _annotate_roc_auc(ax, eff, fap):
areaUC = np.trapz(eff, fap) * -1 # integrated on [1,0], so reverse
ax.fill_between(
fap,
eff,
facecolor="lightblue",
step="pre",
alpha=0.7,
label=f"AUC = {areaUC:1.4f}",
)
def _series2coverage(series, dataset, key):
num = len(dataset)
if num:
# there are samples to use
evaluated = _evaluate_series_at_times(series, dataset.times, key, default=1)
vals, counts = np.unique(evaluated, return_counts=True)
nom, frc = np.transpose(
np.array([(val, count) for val, count in zip(vals, counts)], dtype=float)
)
order = nom.argsort()
nom = nom[order]
frc = np.cumsum(frc[order]) / num
return nom, frc, (frc * (1 - frc) / num) ** 0.5
else:
return (
np.array([], dtype=float),
np.array([], dtype=float),
np.array([], dtype=float),
)
def _map2coverage(calibmaps, key):
data = []
for calibmap in calibmaps.values():
if key == "fap":
this_ranks = calibmap._cln_dataset.ranks
this_data = calibmap.fap(this_ranks)
elif key == "eff":
this_ranks = calibmap._gch_dataset.ranks
this_data = calibmap.eff(this_ranks)
data.append(this_data)
data = np.concatenate(data)
num = len(data)
if num:
vals, counts = np.unique(data, return_counts=True)
nom, frc = np.transpose(
np.array([(val, count) for val, count in zip(vals, counts)], dtype=float)
)
order = nom.argsort()
nom = nom[order]
frc = np.cumsum(frc[order]) / num
return nom, frc, (frc * (1 - frc) / num) ** 0.5
else:
return (
np.array([], dtype=float),
np.array([], dtype=float),
np.array([], dtype=float),
)
[docs]def calibration_coverage(
nicknames,
series,
datasets,
calibmaps,
legend=True,
fig=None,
plot_map_coverage=False,
):
"""plot the calibration coverage in a standard way"""
if fig is None:
fig = plt.figure()
ax_eff = plt.subplot(1, 2, 1)
ax_fap = plt.subplot(1, 2, 2)
get_color = color_generator()
get_color_light = light_color_generator()
# prepare data
min_fap = 1e-3
for nickname in nicknames:
gch, cln = datasets[nickname].vectors2classes()
color = next(get_color)
# plot efficiency
if plot_map_coverage:
light_color = next(get_color_light)
obs_eff_map, frac_eff_map, s_eff_map = _map2coverage(
calibmaps[nickname], "eff"
)
ax_eff.plot(
obs_eff_map,
frac_eff_map,
linestyle="None",
marker="+",
color=light_color,
alpha=DEFAULT_ALPHA,
label=nickname,
)[0].get_color()
obs_eff, frac_eff, s_eff = _series2coverage(series[nickname], gch, "eff")
ax_eff.plot(
obs_eff,
frac_eff,
linestyle="None",
marker=".",
color=color,
alpha=DEFAULT_ALPHA,
label=nickname,
)[0].get_color()
for x, y, dy in zip(obs_eff, frac_eff, s_eff):
ax_eff.plot(
[x] * 2, [y - dy, y + dy], color=color, alpha=DEFAULT_SHADE_ALPHA
)
# plot fap
if plot_map_coverage:
obs_fap_map, frac_fap_map, s_fap_map = _map2coverage(
calibmaps[nickname], "fap"
)
ax_fap.plot(
obs_fap_map,
frac_fap_map,
linestyle="None",
marker="+",
color=light_color,
alpha=DEFAULT_ALPHA,
label=nickname,
)
obs_fap, frac_fap, s_fap = _series2coverage(series[nickname], cln, "fap")
ax_fap.plot(
obs_fap,
frac_fap,
linestyle="None",
marker=".",
color=color,
alpha=DEFAULT_ALPHA,
label=nickname,
)
for x, y, dy in zip(obs_fap, frac_fap, s_fap):
ax_fap.plot(
[x] * 2, [y - dy, y + dy], color=color, alpha=DEFAULT_SHADE_ALPHA
)
if np.any(obs_fap > 0):
min_fap = min(np.min(obs_fap[obs_fap > 0]), min_fap)
# plot reference lines
x = np.logspace(np.log10(min_fap), 0, 1001)
ax_eff.plot(x, x, color="k", alpha=DEFAULT_SHADE_ALPHA, linestyle="dashed")
ax_fap.plot(x, x, color="k", alpha=DEFAULT_SHADE_ALPHA, linestyle="dashed")
# decorate
if legend:
ax_fap.legend(loc="best")
ax_eff.set_xlabel("Nominal Efficiency")
ax_eff.set_ylabel("Observed Fraction")
ax_fap.set_xlabel("Nominal FAP")
ax_fap.set_ylabel("Observed Fraction")
ax_fap.yaxis.tick_right()
ax_fap.yaxis.set_label_position("right")
lim = (0.0, 1.0)
ax_eff.set_xlim(lim)
ax_eff.set_ylim(lim)
lim = (min_fap, 1)
ax_fap.set_xlim(lim)
ax_fap.set_ylim(lim)
ax_fap.set_xscale("log")
ax_fap.set_yscale("log")
plt.subplots_adjust(hspace=0.05, wspace=0.05)
return fig
def _fap_error_bounds(obs_fap, frac_fap, frac_fap_err):
obs_fap_err = (obs_fap * (1 - obs_fap) / len(obs_fap)) ** 0.5
dfap = (obs_fap - frac_fap) / frac_fap
# error propagation for subtraction in dfap
# leave out square root b/c squared in dfap_err
dfap_err_sub = np.power(obs_fap_err, 2) + np.power(frac_fap_err, 2)
# total error from subtraction and division
with warnings.catch_warnings():
warnings.filterwarnings("ignore", category=RuntimeWarning)
dfap_err_base = (dfap_err_sub / np.power(obs_fap - frac_fap, 2)) + np.power(
frac_fap_err / frac_fap, 2
)
dfap_err = np.power(dfap_err_base, 0.5) * dfap
# calculate lower/upper bounds
dfap_plus_error = (dfap + dfap_err) * 100
dfap_minus_error = (dfap - dfap_err) * 100
dfap_upper = np.maximum(dfap_plus_error, dfap_minus_error)
dfap_lower = np.minimum(dfap_plus_error, dfap_minus_error)
return dfap, dfap_upper, dfap_lower
[docs]def calibration_accuracy(
nicknames,
calibmaps,
series,
datasets,
legend=True,
fig=None,
plot_map_coverage=False,
):
"""plot the calibration accuracy"""
if fig is None:
fig = plt.figure()
get_color = color_generator()
get_color_light = light_color_generator()
min_fap = 1e-3
for nickname in nicknames:
color = next(get_color)
gch, cln = datasets[nickname].vectors2classes()
# calculate expected/observed FAP
obs_fap, frac_fap, frac_fap_err = _series2coverage(series[nickname], cln, "fap")
dfap, dfap_upper, dfap_lower = _fap_error_bounds(
obs_fap, frac_fap, frac_fap_err
)
min_fap = min(np.min(obs_fap[obs_fap > 0]), min_fap)
obs_fap_health = obs_fap
# same but for the calibration map
if plot_map_coverage:
obs_fap_map, frac_fap_map, frac_fap_err_map = _map2coverage(
calibmaps[nickname], "fap"
)
dfap_map, dfap_upper_map, dfap_lower_map = _fap_error_bounds(
obs_fap_map, frac_fap_map, frac_fap_err_map
)
obs_fap_health = obs_fap_map
min_fap = min(np.min(obs_fap_map[obs_fap_map > 0]), min_fap)
# plot health bands
size = obs_fap_health.size
health_colors = ["#cc3300", "#ffcc00", "#339900"]
thresholds = [200, 100, 50]
for health_color, threshold in zip(health_colors, thresholds):
plt.fill_between(
obs_fap_health,
np.full(size, threshold),
np.full(size, -threshold),
hatch="XX",
facecolor="none",
edgecolor=health_color,
)
# plot FAP + error bounds
plt.plot(
obs_fap,
dfap * 100,
color=color,
alpha=DEFAULT_ALPHA,
label=f"{nickname}-series",
)
plt.fill_between(
obs_fap,
dfap_upper,
dfap_lower,
color=color,
alpha=DEFAULT_SHADE_ALPHA,
)
if plot_map_coverage:
light_color = next(get_color_light)
plt.plot(
obs_fap_map,
dfap_map * 100,
color=light_color,
alpha=DEFAULT_ALPHA,
label=f"{nickname}-map",
)
plt.fill_between(
obs_fap_map,
dfap_upper_map,
dfap_lower_map,
color=light_color,
alpha=DEFAULT_SHADE_ALPHA,
)
plt.gca().invert_xaxis()
plt.xlim(1, min_fap)
plt.ylim(-200, 200)
plt.xscale("log")
plt.ylabel("dFAP/FAP (% error)")
plt.xlabel("Nominal FAP")
plt.legend()
return fig
[docs]def featureimportance(
nicknames,
models,
classifiers,
start,
end,
datasets=None,
t0=None,
segdict=None,
skip_plot=False,
**kwargs,
):
"""delegate to models heavily here"""
seg = segmentlist([segment(start, end)])
# limit segs to cover only series if available
if segdict is not None:
all_segs = segmentlist(
[
segs
for nickname in nicknames
for model in models[nickname].values()
for segs in segdict[nickname][model.model_id]
]
)
all_segs.coalesce()
all_segs &= seg
if t0 is None:
t0 = start
figs = dict()
tabs = dict()
for nickname in nicknames:
if datasets:
working_dataset = datasets[nickname].copy()
if segdict is not None:
working_dataset.filter(all_segs)
else:
working_dataset = None
f = dict()
t = dict()
classifier = classifiers[nickname]
for key, model in models[nickname].items():
classifier.model = model
# only show feature importances over time in which model was evaluated for
if segdict is not None:
intersection = seg & segdict[nickname][model.model_id]
if utils.livetime(intersection) == 0:
raise ValueError(
"no overlap between target segment and segdict for "
+ model.model_id
)
start, end = intersection.extent()
# make a figure
if not skip_plot:
try:
fig = classifier.feature_importance_figure(
working_dataset, start, end, t0, **kwargs
)
fig.suptitle(
"%s Feature Importance within [%.3f, %.3f)"
% (nickname, start, end)
)
f[key] = fig
except NotImplementedError:
f[key] = None
else:
f[key] = None
# make a table
try:
dataset = working_dataset if not skip_plot else None
t[key] = classifier.feature_importance_table(dataset, **kwargs)
except NotImplementedError:
t[key] = None
figs[nickname] = f
tabs[nickname] = t
return figs, tabs
[docs]def save(path, fig):
"""
save figure to disk and close
"""
fig.savefig(path)
plt.close(fig)