on_alert

on_alert(tag, kafka_server, gracedb_server, gracedb_search, gracedb_submitter, max_wait_time, injection_channel)

Bases: object

Listener to receive igwn-alerts and produce output messages to Kafka.

Parameters:
  • tag

    unique identifier to be used in the Kafka broker name and output topic names.

  • kafka_server

    server url that Kafka is hosted on.

  • gracedb_server

    name of the GraceDB client to receive igwn-alerts from. gracedb, gracedb-playground, or gracedb-test.

  • gracedb_search

    tag to identify which search activity to process events from. eg AllSky, EarlyWarning, etc.

  • gracedb_submitter

    GraceDb submitter to process events from.

  • max_wait_time

    maximum amount of time to keep events in memory before dropping them.

  • injection_channel

    Name of the strain channels from which to process injections.

Source code in gw/lts/igwn_alert_listener.py
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def __init__(self, tag, kafka_server,
             gracedb_server, gracedb_search, gracedb_submitter,
             max_wait_time, injection_channel):
    self.tag = tag
    self.kafka_server = kafka_server
    self.gracedb_client = GraceDbHelper(gracedb_server)
    self.search = gracedb_search
    self.submitter = gracedb_submitter
    self.max_wait_time = max_wait_time
    self.inj_channels = set(list(injection_channel))

    # set up producer
    self.client = kafka.Client(f"kafka://{self.tag}@{self.kafka_server}")

    self.events = OrderedDict()
    self.events_sent = deque(maxlen=300)

    logging.info("Initialized on_alert class.")

add_coinc

add_coinc(uid, output={})

Download coinc file object from GraceDB, parse the file for relevant info, and add data to the output dict.

Parameters:
  • uid

    event grace id to process

  • output

    event data to update. Optional, default is an empty dict.

Returns:
  • dict, output data to send to Kafka
Source code in gw/lts/igwn_alert_listener.py
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def add_coinc(self, uid, output={}):
    """
    Download coinc file object from GraceDB,
    parse the file for relevant info, and add
    data to the output dict.

    Parameters
    ----------
    uid (str)
        event grace id to process
    output (dict)
        event data to update. Optional,
        default is an empty dict.

    Returns
    ----------
        dict, output data to send to Kafka
    """
    coinc = self.get_filename(uid, "coinc.xml")
    if coinc:
        try:
            xmldoc = utils.load_xml(coinc)
            coinctable = lsctables.CoincInspiralTable.get_table(xmldoc)
        except ElementError as error:
            logging.warning(
                f"Failed to parse coinc file from {uid}. Error: {error}"
            )
        else:
            coinc_msg = io.BytesIO()
            ligolw_utils.write_fileobj(xmldoc, coinc_msg)

            output.update(
                {
                    "time": coinctable[0].end_time,
                    "time_ns": coinctable[0].end_time_ns,
                    "snr": coinctable[0].snr,
                    "far": coinctable[0].combined_far,
                    "coinc": coinc_msg.getvalue().decode(),
                }
            )
            logging.debug(f"Added coinc.xml to {uid}")
    return output

get_channels

get_channels(uid)

Download coinc file object from a gracedb event and parse it for the strain channel name(s).

Parameters:
  • uid

    event grace id to process

Returns:
  • channels(set)

    set of channel names, or None

Source code in gw/lts/igwn_alert_listener.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
def get_channels(self, uid):
    """
    Download coinc file object from a gracedb
    event and parse it for the strain channel
    name(s).

    Parameters
    ----------
    uid (str)
        event grace id to process

    Returns
    ----------
    channels (set)
        set of channel names, or None
    """
    channels = None
    coinc = self.get_filename(uid, "coinc.xml")
    if coinc:
        try:
            xmldoc = utils.load_xml(coinc)
            sngltable = lsctables.SnglInspiralTable.get_table(xmldoc)
            channels = set(list(sngltable.getColumnByName("channel")))
        except ElementError as error:
            logging.warning(
                f"Failed to parse coinc file from {uid}. Error: {error}"
            )
        return channels
    return None

get_filename

get_filename(uid, filename, retries=10)

Download file from Gracedb.

Parameters:
  • uid

    event grace id to process

  • filename

    name of file to download

  • retries

    number of times to attempt download, default is 10.

Returns:
  • file(Response)

    requests.models.Response object or None

Source code in gw/lts/igwn_alert_listener.py
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def get_filename(self, uid, filename, retries=10):
    """
    Download file from Gracedb.

    Parameters
    ----------
    uid (str)
        event grace id to process
    filename (str)
        name of file to download
    retries (int)
        number of times to attempt
        download, default is 10.

    Returns
    ----------
    file (requests.models.Response)
        requests.models.Response object or None
    """
    this_try = 0
    while this_try < retries:
        file = self.gracedb_client.query_file(uid, filename)
        if file:
            return file
        else:
            this_try += 1
    logging.debug(f"Failed to download {filename} from {uid}.")
    return None

process_alert

process_alert(topic=None, payload=None)

Processes alerts received from igwn-alert.

Parse the alert payload and process alerts associated with new or updated events only from the channel names, submitter, and search specified.

Parameters:
  • topic

    topic from which alert was received

  • payload

    alert payload

Source code in gw/lts/igwn_alert_listener.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
def process_alert(self, topic=None, payload=None):
    """
    Processes alerts received from igwn-alert.

    Parse the alert payload and process alerts
    associated with new or updated events only
    from the channel names, submitter, and search
    specified.

    Parameters
    ----------
    topic (str)
        topic from which alert was received
    payload (str)
        alert payload
    """
    # unpack alert payload
    payload = json.loads(payload)
    id = payload["uid"]
    alert_type = payload["alert_type"]
    data = payload["data"]

    # only need to process new or update type alerts
    if alert_type not in ("new", "update"):
        logging.info(f"Received {alert_type}, skipping")
        return

    # first get the event uid and event level data
    # we have to do this slightly differently for
    # superevent alerts vs event alerts
    if id.startswith("S"):
        uid = data["preferred_event"]
        event_data = data["preferred_event_data"]
        datasource = "superevents"

        logging.info(
            f"Received {alert_type} alert for {id} " +
            f"from {datasource}, preferred event: {uid}"
        )
    else:
        uid = id
        event_data = data
        datasource = data["pipeline"]

        logging.info(f"Received {alert_type} alert for {uid}" +
                     f" from {datasource}")

    # now that we have the uid for a specific event,
    # check the channels this event comes from. skip
    # if not from injection channels
    channels = self.get_channels(uid)
    if not channels or not channels.issubset(self.inj_channels):
        logging.debug(f"{uid} not from injection channels, skipping.")
        return

    # filter events by search
    search = event_data["search"]
    if not event_data["search"] == self.search:
        logging.info(f"Skipping {search} event...")

    # filter events by submitter if provided
    submitter = event_data["submitter"]
    if self.submitter and not submitter == self.submitter:
        logging.info(f"skipping event {uid} submitted by {submitter}")
        return

    # get event object, this has some info not
    # included in the alert payload
    event = self.gracedb_client.get_event(uid=uid)

    # construct event data to be sent in kafka messages
    if uid in self.events.keys():
        self.events[uid] = (self.process_event(
            uid, event, output=self.events[uid])
        )
    else:
        self.events[uid] = self.process_event(uid, event)

    # check if all elements present, then send msg
    # only send msg once per event
    topic = f"{datasource}.{self.tag}.testsuite.inj_events"
    for uid, data in self.events.items():

        if all(data.values()) and uid not in self.events_sent:
            logging.info(
                f'sending a message for {uid} (coa time: {data["time"]}).'
            )
            self.client.write(topic, data)
            self.events_sent.append(uid)

    # clean out old events that already had a msg sent
    time_now = float(GPSTimeNow())
    for key, value in list(self.events.items()):
        if time_now - value["time_added"] >= self.max_wait_time:
            logging.debug(f"Removing old event: {key}")
            self.events.pop(key)

process_event

process_event(uid, event, output={})
Parameters:
  • uid

    event grace id to process

  • event

    gracedb event object

  • output

    event data to update. Optional, default is an empty dict.

Returns:
  • dict, output data to send to Kafka

    output = { "time": coinc end time, "time_ns": coinc end time ns, "snr": network snr, "far": coinc combined far, "coinc": coinc file object, "latency": gracedb upload latency, "pipeline": event pipeline, "uid": event grace id, "time_added": time event processed, "snr_optimized": whether the event is SNR_OPTIMIZED, }

Source code in gw/lts/igwn_alert_listener.py
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
def process_event(self, uid, event, output={}):
    """
    Parameters
    ----------
    uid (str)
        event grace id to process
    event (json)
        gracedb event object
    output (dict)
        event data to update. Optional,
        default is an empty dict.

    Returns
    ----------
        dict, output data to send to Kafka
        output = {
            "time": coinc end time,
            "time_ns": coinc end time ns,
            "snr": network snr,
            "far": coinc combined far,
            "coinc": coinc file object,
            "latency": gracedb upload latency,
            "pipeline": event pipeline,
            "uid": event grace id,
            "time_added": time event processed,
            "snr_optimized": whether the event is SNR_OPTIMIZED,
        }
    """
    required_params = (
        "time", "time_ns", "snr", "far",
        "coinc", "latency", "pipeline")
    if not output:
        # initialize all the items we need in order to send a message
        for k in required_params:
            output.update({k: None})
        output.update({"time_added": float(GPSTimeNow())})

    output.update(
        {
            "uid": uid,
            "latency": event["reporting_latency"],
            "pipeline": event["pipeline"],
        }
    )

    if "SNR_OPTIMIZED" in event["labels"]:
        output.update({"snr_optimized": "True"})
    else:
        output.update({"snr_optimized": "False"})

    output.update(self.add_coinc(uid))

    return output