Skip to content

validate

find_matching_events

find_matching_events(target, reference, coincidence_window)

Find events which are time coincident.

Parameters:

Name Type Description Default
target EventTable

The target events to validate against.

required
reference EventTable

The reference events for validation.

required
coincidence_window float

The maximum time spacing to consider an event to be in coincidence. This function defines no default for it; the validate_* callers default to 0.25 seconds.

required

Returns:

Type Description
EventTable, EventTable

The found and missed events, respectively.

Source code in gvt/validate.py
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
def find_matching_events(
    target: EventTable, reference: EventTable, coincidence_window: float
) -> tuple[EventTable, EventTable]:
    """Split reference events into those matched by a target event and the rest.

    Reference and target rows are joined on their ``time`` columns using
    ``join_distance(coincidence_window)``; reference rows that do not
    appear in the clustered join result are reported as missed.

    Parameters
    ----------
    target : EventTable
        The target events to validate against.
    reference : EventTable
        The reference events for validation. Its ``type`` metadata must
        be the gspy or injections event type.
    coincidence_window : float
        The maximum time spacing to consider an event to be in
        coincidence. This function defines no default for it.

    Returns
    -------
    (EventTable, EventTable)
        The found and missed events, respectively. Missed events are a
        copy of the reference rows with ``time`` renamed to ``time_ref``.

    Raises
    ------
    ValueError
        If the reference is neither a gspy nor an injections table.

    """
    allowed_reference_types = {EventType.GSPY.value, EventType.INJECTIONS.value}
    reference_type = get_metadata(reference, "type")
    if reference_type not in allowed_reference_types:
        raise ValueError("only gspy events or injections are allowed as a reference")

    # pair reference rows with target rows on "time"; overlapping column
    # names are disambiguated with the "ref" / "target" table names
    join_options = dict(
        keys="time",
        table_names=["ref", "target"],
        join_type="inner",
        join_funcs={"time": join_distance(coincidence_window)},
        metadata_conflicts="silent",
    )
    found = join(reference, target, **join_options)

    # keep the most significant (by target SNR) event in each match
    found = found.cluster("time_id", "snr_target", 0.1)

    # replace metadata with join-specific metadata
    sources = {"target": target, "reference": reference}
    for field in ("type", "significance"):
        copy_metadata(found, sources, field)

    # anything in the reference without a counterpart in `found` was missed;
    # the rename lines the key up with the joined table's "time_ref" column
    candidates = reference.copy()
    candidates.rename_column("time", "time_ref")
    missed = setdiff(candidates, found, keys="time_ref")

    return found, missed

summarize_found_by_category

summarize_found_by_category(found, reference)

Summarize the number of events found by label.

Parameters:

Name Type Description Default
found EventTable

The events that were found.

required
reference EventTable

The reference events for validation.

required

Returns:

Type Description
EventTable

A summary of missed/found events by label.

Source code in gvt/validate.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
def summarize_found_by_category(found: EventTable, reference: EventTable) -> EventTable:
    """Count, per ``ml_label``, how many reference events were found.

    Parameters
    ----------
    found : EventTable
        The events that were found. Must carry an ``ml_label`` column.
    reference : EventTable
        The reference events for validation. Must be a gspy table with
        an ``ml_label`` column.

    Returns
    -------
    EventTable
        One row per label present in the reference, with columns
        ``label``, ``found`` (rows of ``found`` with that label) and
        ``total`` (rows of ``reference`` with that label). The ``type``
        metadata is copied over from the reference.

    Raises
    ------
    ValueError
        If the reference is not a gspy table.

    """
    if get_metadata(reference, "type") != EventType.GSPY.value:
        raise ValueError("only gspy events are allowed as a reference")

    # bucket both tables by label
    reference_groups = reference.group_by("ml_label")
    found_groups = found.group_by("ml_label")

    # empty summary table, tagged with the reference's event type
    summary = EventTable(names=("label", "found", "total"), dtype=("S20", "i4", "i4"))
    copy_metadata(summary, reference, "type")

    for key_row, members in zip(reference_groups.groups.keys, reference_groups.groups):
        label = key_row["ml_label"]
        # select the found-group (if any) carrying this label; a label with
        # no found events selects nothing and therefore counts as zero
        has_label = found_groups.groups.keys["ml_label"] == label
        n_found = len(found_groups.groups[has_label])
        n_total = len(members)
        summary.add_row((label, n_found, n_total))

    return summary

validate_gspy_events

validate_gspy_events(target, reference, path, coincidence_window=constants.COINCIDENCE_WINDOW, duration=constants.EVENT_DURATION, limit=10, filters=None)

Validate a set of events against a reference.

Parameters:

Name Type Description Default
target EventTable

The target events to validate against.

required
reference EventTable

The reference events for validation.

required
path Path

The directory to write this plot to.

required
coincidence_window float

The maximum time spacing to consider an event to be in coincidence. Default is 0.25 seconds.

constants.COINCIDENCE_WINDOW
duration float

The duration in seconds to plot around events. Default is 8 seconds.

constants.EVENT_DURATION
limit int

The maximum number of missed events to plot for each category. Default is 10.

10
filters list[str]

The list of column filters to apply to target events prior to matching, e.g. "snr >= 8".

None
Source code in gvt/validate.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def validate_gspy_events(
    target: EventTable,
    reference: EventTable,
    path: Path,
    coincidence_window: float = constants.COINCIDENCE_WINDOW,
    duration: float = constants.EVENT_DURATION,
    limit: int = 10,
    filters: Optional[list[str]] = None,
) -> None:
    """Validate a set of events against a reference.

    Parameters
    ----------
    target : EventTable
        The target events to validate against.
    reference : EventTable
        The reference events for validation.
    path : Path
        The directory to write this plot to.
    coincidence_window : float
        The maximum time spacing to consider an
        event to be in coindicence. Default is 0.25 seconds.
    duration : float
        The duration in seconds to plot around events.
        Default is 8 seconds.
    limit : int
        The maximum number of missed events to plot for
        each category. Default is 10.
    filters : list[str]
        The list of column filters to apply to target events
        prior to matching, e.g. "snr >= 8".

    """
    if get_metadata(reference, "type") != EventType.GSPY.value:
        raise ValueError("only gspy events are allowed as a reference")

    if filters:
        events = target.filter(filters)
    else:
        events = target

    # match and summarize
    found, missed = find_matching_events(events, reference, coincidence_window)
    summary = summarize_found_by_category(found, reference)

    # generate per-label and performance plots
    plots.missed_found_by_label(path, summary)
    plots.roc_curve(path, found, summary)

    # follow up on missed events
    if not missed:
        return

    channel = get_metadata(reference, "channel")
    missed_labels = missed.group_by("ml_label")
    for key, events_by_label in zip(missed_labels.groups.keys, missed_labels.groups):
        label = key["ml_label"]
        if label == "No_Glitch":
            continue
        for event in events_by_label[:limit]:
            t0 = event["time_ref"]
            dt = duration / 2
            series = TimeSeries.get(channel, t0 - dt, t0 + dt)
            match EventType[get_metadata(target, "type").upper()]:
                case EventType.SNAX:
                    plots.qscan_timeseries(path, t0, duration, series, target)
                case EventType.OMICRON:
                    pass
                case _:
                    pass

validate_injections

validate_injections(target, reference, path, coincidence_window=constants.COINCIDENCE_WINDOW, snr_threshold=constants.SNR_THRESHOLD, duration=constants.EVENT_DURATION, limit=100, filters=None)

Validate a set of events against a reference.

Parameters:

Name Type Description Default
target EventTable

The target events to validate against.

required
reference EventTable

The reference events for validation.

required
path Path

The directory to write this plot to.

required
coincidence_window float

The maximum time spacing to consider an event to be in coincidence. Default is 0.25 seconds.

constants.COINCIDENCE_WINDOW
snr_threshold float

The SNR threshold. Default is 5.5.

constants.SNR_THRESHOLD
duration float

The duration in seconds to plot around events. Default is 8 seconds.

constants.EVENT_DURATION
limit int

The maximum number of missed events to follow up. Default is 100.

100
filters list[str]

The list of column filters to apply to target events prior to matching, e.g. "snr >= 8".

None
Source code in gvt/validate.py
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
def validate_injections(
    target: EventTable,
    reference: EventTable,
    path: Path,
    coincidence_window: float = constants.COINCIDENCE_WINDOW,
    snr_threshold: float = constants.SNR_THRESHOLD,
    duration: float = constants.EVENT_DURATION,
    limit: int = 100,
    filters: Optional[list[str]] = None,
) -> None:
    """Validate a set of events against a reference.

    Parameters
    ----------
    target : EventTable
        The target events to validate against.
    reference : EventTable
        The reference events for validation.
    path : Path
        The directory to write this plot to.
    coincidence_window : float
        The maximum time spacing to consider an
        event to be in coindicence. Default is 0.25 seconds.
    snr_threshold : float
        The SNR threshold. Default is 5.5.
    duration : float
        The duration in seconds to plot around events.
        Default is 8 seconds.
    limit : int
        The maximum number of missed events to follow up.
        Default is 100.
    filters : list[str]
        The list of column filters to apply to target events
        prior to matching, e.g. "snr >= 8".

    """
    if get_metadata(reference, "type") != EventType.INJECTIONS.value:
        raise ValueError("only injection events are allowed as a reference")

    if filters:
        events = target.filter(filters)
    else:
        events = target

    # find coincidences
    found, missed = find_matching_events(events, reference, coincidence_window)

    # generate summary plots
    plots.injected_vs_detected_snr(path, found, snr_threshold)
    plots.injected_vs_detected_frequency(path, found)
    plots.injection_snr_mismatch(path, found, snr_threshold)
    plots.injection_frequency_mismatch(path, found)
    plots.time_difference_histogram(path, found, coincidence_window)
    plots.missed_found_injections(path, found, missed)

    # follow up on missed events
    if not missed:
        return

    channel = get_metadata(reference, "channel")
    for event in missed[:limit]:
        t0 = event["time_ref"]
        dt = duration / 2
        series = TimeSeries.get(channel, t0 - dt, t0 + dt)
        match EventType[get_metadata(target, "type").upper()]:
            case EventType.SNAX:
                plots.qscan_timeseries(path, t0, duration, series, target)
            case EventType.OMICRON:
                pass
            case _:
                pass