InspInjMsgFind

InspInjMsgFind(tag, kafka_server, input_topic, preferred_param='ifar', timeout=600.0, verbose=False)

Bases: object

Job service for ingesting event and injection messages from Kafka. Incoming messages are paired by event time and then bundled into an output message sent to Kafka.

Parameters:
  • tag

    unique identifier to be used in the Kafka broker name and output topic names.

  • kafka_server

    server url that Kafka is hosted on.

  • input_topic

    Kafka topics to subscribe to.

  • preferred_param

    ifar (default) or snr. Parameter used to choose one event when multiple are received for a given injection.

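  • timeout

    age in seconds (current time minus message time) after which stored events and injections are considered stale and processed, default 600.

  • verbose

    be verbose.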

Source code in gw/lts/inspinjmsg_find.py
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
def __init__(self, tag, kafka_server, input_topic,
             preferred_param="ifar", timeout=600., verbose=False):
    """
    Set up the Kafka producer and the cronut job service.

    Parameters
    ----------
    tag (str)
        unique identifier used in the Kafka broker name and
        output topic names.
    kafka_server (str)
        server url that Kafka is hosted on.
    input_topic
        Kafka topic(s) to subscribe to.
    preferred_param (str)
        "ifar" (default) or "snr". Parameter used to choose one
        event when multiple are received for a given injection.
    timeout (float)
        age (current time minus message time) at which stored
        events and injections are treated as stale, default 600.
    verbose (bool)
        be verbose.
    """
    self.tag = tag
    self.kafka_server = kafka_server
    self.topics = input_topic
    self.preferred_param = preferred_param
    self.timeout = timeout
    self.verbose = verbose

    # initialize data deques, keyed by data source; maxlen bounds how
    # many messages are kept (the oldest are dropped when full)
    self.maxlen = 1000
    self.event_msgs = defaultdict(lambda: deque(maxlen=self.maxlen))
    self.inj_msgs = defaultdict(lambda: deque(maxlen=self.maxlen))

    # set up producer
    self.client = kafka.Client(f"kafka://{self.tag}@{self.kafka_server}")

    # create a job service using cronut
    self.app = App(
        "inspinjmsg_find",
        broker=f"kafka://{self.tag}_inspinjmsg_find@{self.kafka_server}",
    )

    @self.app.process(self.topics)
    def process(message):
        """
        Process incoming messages.

        Messages from the "inj_events" topic are stored as candidate
        events (only the preferred event is kept when a previous one
        is found nearby); messages from the "inj_stream" topic are
        stored as injections. Both trigger pairing via process_events.

        Parameters
        ----------
        message
            Kafka message whose value() is a JSON payload.

        Raises
        ------
        ValueError
            if the message comes from an unexpected topic.
        """
        mdatasource, _, mtopic = utils.parse_msg_topic(message)

        # unpack data from the message
        if mtopic == "inj_events":
            # parse event info
            event = json.loads(message.value())

            # load the coinc table and
            # get event coalescence time
            coinc_file = utils.load_xml(event["coinc"])
            coinctable = lsctables.CoincInspiralTable.get_table(coinc_file)
            coincrow = coinctable[0]
            coinctime = (coincrow.end_time +
                         coincrow.end_time_ns * 10.0**-9.0)

            # keep track of the preferred parameter
            # for this event
            val = self.get_preferred_param(coinctable)

            event_info = {
                "time": coinctime,
                "coinc": coinc_file,
                "msg_time": int(GPSTimeNow()),
                "preferred_param": val,
            }

            # uid and snr_optimized are optional keys (see below), so
            # use .get() to avoid a KeyError just for logging
            snr_optimized = event.get("snr_optimized")
            far = coincrow.combined_far
            snr = event["snr"]
            uid = event.get("uid")
            logging.info(
                f"received {mdatasource} event with coalescence time: " +
                f"{coinctime} and {self.preferred_param} = {val}"
            )

            logging.debug(
                f"ID: {uid} Tag: {snr_optimized} " +
                f"with FAR: {far} and SNR: {snr}"
            )

            # if there is already an event at the same time
            # check if this one is preferred, and only keep
            # the best event in the deque to process
            nearest_event = utils.find_nearest_msg(
                self.event_msgs[mdatasource], coinctime
            )
            if nearest_event:
                logging.info("Found previous event within " +
                             "1 sec of this event.")
                if val > nearest_event["preferred_param"]:
                    logging.info(
                        "New event is preferred, removing previous."
                    )
                    self.event_msgs[mdatasource].remove(nearest_event)
                else:
                    logging.info(
                        "Previous event is preferred, skipping."
                    )
                    return
            else:
                logging.info(
                    "No previous event within 1 sec of this event.")

            # add optional keys - these may or may not
            # already be present depending on the data
            # source configuration; missing keys become None
            for key in ("latency", "p_astro", "uid",
                        "pipeline", "snr_optimized"):
                event_info[key] = event.get(key)
            logging.debug(f"combined far: {coincrow.combined_far}")

            # store event data in the deque
            self.event_msgs[mdatasource].append(event_info)

            # process the events in the deque
            self.process_events(mdatasource)

        elif mtopic == "inj_stream":
            # parse inj info
            injection = json.loads(message.value())
            ifos = injection["onIFOs"]

            # load the sim table
            simfile = utils.load_xml(injection["sim"])
            simrow = lsctables.SimInspiralTable.get_table(simfile)[0]

            # get injection coalescence time
            simtime = (simrow.geocent_end_time +
                       simrow.geocent_end_time_ns * 10.0**-9)
            logging.info(
                f"received {mdatasource} injection " +
                f"with coalescence time: {simtime}"
            )

            # store inj data
            self.inj_msgs[mdatasource].append(
                {
                    "time": simtime,
                    "sim": simfile,
                    "ifos": ifos,
                    "preferred_event": None,
                }
            )

            # process the events in the deque and then
            # check for stale msgs
            self.process_events(mdatasource)
            self.process_stale_msgs(mdatasource)

        else:
            raise ValueError(
                f"Found unexpected message from topic {mtopic}."
            )

append_sim_table

append_sim_table(coinc_file, sim_file)

Append injection SimInspiral Table to the event coinc file object.

Source code in gw/lts/inspinjmsg_find.py
230
231
232
233
234
235
236
237
238
239
def append_sim_table(self, coinc_file, sim_file):
    """
    Attach the SimInspiral table of ``sim_file`` to ``coinc_file``.

    The table object is looked up in ``sim_file`` (not copied) and
    appended as a child of the last child element of ``coinc_file``.
    The coinc document is modified in place and also returned.
    """
    # look up the existing SimInspiral table in the injection document
    sim_table = lsctables.SimInspiralTable.get_table(sim_file)

    # hang it off the last top-level element of the coinc document
    target_element = coinc_file.childNodes[-1]
    target_element.appendChild(sim_table)
    return coinc_file

construct_event_ouput

construct_event_ouput(xmldoc, event, injection=None, key=None)

Construct output message payload to be sent to Kafka.

Parameters:
  • xmldoc

    event coinc file object

  • event

    event json packet

  • injection

    injection json packet

  • key

    optional tag used in writing files to disk, default is None.

Source code in gw/lts/inspinjmsg_find.py
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
def construct_event_ouput(self, xmldoc, event, injection=None, key=None):
    """
    Construct output message payload to be sent
    to Kafka.

    The document is also written to disk as
    ``coincs/[<key>-]coinc-<int(event["time"])>.xml``; the ``coincs``
    directory is created if it does not exist.

    Parameters
    ----------
    xmldoc (ligolw document)
        event coinc file object to serialize.
    event (dict)
        event json packet; must contain "time", "coinc", "p_astro",
        "latency", "uid", "pipeline" and "snr_optimized".
    injection (dict)
        injection json packet. When given, the output time is read
        from the SimInspiral row found in event["coinc"] and "onIFOs"
        from injection["ifos"]. Default is None.
    key (str)
        optional tag used in writing files
        to disk, default is None.

    Returns
    -------
    dict
        output message payload.
    """
    event_time = int(event["time"])
    filename = (
        f"{key}-coinc-{event_time}.xml" if key
        else f"coinc-{event_time}.xml"
    )

    coinc = event["coinc"]
    coincrow = lsctables.CoincInspiralTable.get_table(coinc)[0]

    if injection:
        # NOTE: assumes a SimInspiral table has already been added to
        # the event's coinc document when an injection is passed
        simrow = lsctables.SimInspiralTable.get_table(coinc)[0]
        onifos = injection["ifos"]
        time = simrow.geocent_end_time
        time_ns = simrow.geocent_end_time_ns
    else:
        time = event["time"]
        time_ns = 0
        onifos = ""

    # write a copy to disk; make sure the output directory exists so a
    # fresh working directory does not crash the job
    outdir = "coincs"
    os.makedirs(outdir, exist_ok=True)
    ligolw_utils.write_filename(
        xmldoc, os.path.join(outdir, filename), verbose=self.verbose
    )

    # serialize the document for the Kafka payload
    with io.BytesIO() as coinc_msg:
        ligolw_utils.write_fileobj(xmldoc, coinc_msg)
        coinc_payload = coinc_msg.getvalue().decode()

    output = {
        "time": time,
        "time_ns": time_ns,
        "snr": coincrow.snr,
        "far": coincrow.combined_far,
        "p_astro": event["p_astro"],
        "coinc": coinc_payload,
        "latency": event["latency"],
        "uid": event["uid"],
        "onIFOs": onifos,
        "pipeline": event["pipeline"],
        "snr_optimized": event["snr_optimized"],
    }

    return output

get_preferred_param

get_preferred_param(coinc)

Parse coinc file object for the preferred parameter (either ifar or snr).

Source code in gw/lts/inspinjmsg_find.py
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
def get_preferred_param(self, coinc):
    """
    Parse coinc file object for the preferred
    parameter (either ifar or snr).

    Parameters
    ----------
    coinc (CoincInspiralTable)
        table whose first "combined_far" and "snr" column values
        are used.

    Returns
    -------
    tuple
        (ifar, snr) when self.preferred_param is "ifar", or
        (snr, ifar) when it is "snr". Callers compare these tuples,
        so the second value acts as a tie-breaker. A combined_far of
        zero yields ifar = inf.

    Raises
    ------
    NotImplementedError
        for any other value of self.preferred_param.
    """
    # IFAR; a FAR of exactly zero would otherwise raise
    # ZeroDivisionError, so treat it as infinitely significant
    far = coinc.getColumnByName("combined_far")[0]
    ifar = float("inf") if far == 0 else 1.0 / far
    # SNR
    snr = coinc.getColumnByName("snr")[0]
    # get preferred param value for this event
    if self.preferred_param == "ifar":
        return (ifar, snr)
    if self.preferred_param == "snr":
        return (snr, ifar)
    raise NotImplementedError(
        f"unsupported preferred_param: {self.preferred_param!r}; "
        "expected 'ifar' or 'snr'"
    )

process_events

process_events(datasource)

For each event in the event_msgs deque, find the nearest injection in inj_msgs within +/- delta_t (1 second) of the event coalescence time. When an association is made, check whether the event is preferred over any previous event found for that injection. If so, add the sim inspiral table from the injection to the event's coinc xml and send a message to the testsuite.events topic. Processed events remain in the deque until they pass the timeout and are removed by process_stale_msgs.

Source code in gw/lts/inspinjmsg_find.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
def process_events(self, datasource):
    """
    Pair stored events with injections and send preferred events.

    For each event in the event_msgs deque, find the nearest injection
    in inj_msgs within +/- delta_t (1 second) of the event coalescence
    time, using utils.find_nearest_msg. When an association is made and
    the event's preferred parameter is better than that of any event
    previously sent for the injection, add the sim inspiral table from
    the injection to the event's coinc xml and send a message to the
    "<datasource>.<tag>.testsuite.events" topic.

    Events are NOT removed from the deque here; they remain until
    process_stale_msgs removes them after the timeout.
    """
    # iterate a snapshot of the events for this data source
    events_copy = copy.copy(self.event_msgs[datasource])
    injections = self.inj_msgs[datasource]

    for event in events_copy:
        event_time = event["time"]
        nearest_inj = utils.find_nearest_msg(injections, event_time)

        # if no associated injection was found, continue
        if not nearest_inj:
            logging.info(f"No injection found for event at {event_time}")
            continue

        inj_idx = injections.index(nearest_inj)
        inj_time = nearest_inj["time"]
        sim_file = nearest_inj["sim"]
        prev_preferred_event = nearest_inj["preferred_event"]
        coinc_file = event["coinc"]
        this_coinc = lsctables.CoincInspiralTable.get_table(coinc_file)
        val = self.get_preferred_param(this_coinc)

        # only send if this is the first event found for the injection
        # or it is better than the previous one.
        # Note: this requires that aggregate by
        # "latest" works the way we would hope
        if prev_preferred_event is not None and \
                not val > prev_preferred_event:
            logging.debug(
                f"This event {val} not preferred over previous: " +
                f"{prev_preferred_event}, skipping."
            )
            continue

        # update preferred event for this injection
        injections[inj_idx]["preferred_event"] = val

        # proceed with sending event
        # add sim table to coinc file and write to disk
        logging.info(
            f"Sending event with {self.preferred_param} = {val} " +
            f"for injection at time {inj_time}"
        )
        newxmldoc = self.append_sim_table(coinc_file, sim_file)
        output = self.construct_event_ouput(newxmldoc, event, nearest_inj)

        topic = f"{datasource}.{self.tag}.testsuite.events"
        self.client.write(topic, output)
        logging.info(f'Sent msg to: {topic}')

process_stale_msgs

process_stale_msgs(datasource)

Process old events or injections that have passed the timeout and send a message with the necessary info. This is necessary in the case that:

  • we receive an event from the search which is not associated with an injection, i.e. a glitch or a real GW candidate.
  • there is an injection for which we never receive an associated event from the search, i.e. the injection was not recovered even at the GDB FAR threshold.

Source code in gw/lts/inspinjmsg_find.py
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
def process_stale_msgs(self, datasource):
    """
    Process old events or injections that have passed the timeout
    and send a message with the necessary info.

    This is necessary in the case that:
        * we receive an event from the search which is not
          associated with an injection, i.e. a glitch or real GW
          candidate. It is sent to the
          "<datasource>.<tag>.testsuite.terr_events" topic.
        * there is an injection for which we never receive
          an associated event from the search, i.e. the injection
          was not recovered at even the GDB FAR threshold. It is
          sent to the "<datasource>.<tag>.testsuite.missed_inj"
          topic.

    Stale events and injections that were paired with each other are
    simply removed from their deques.
    """
    events = self.event_msgs[datasource]
    injections = self.inj_msgs[datasource]

    stale_events = self.stale_msgs(events)
    for stale_event in stale_events:
        stale_time = stale_event["time"]
        nearest_inj = utils.find_nearest_msg(injections, stale_time)

        if not nearest_inj:
            logging.info(
                f'Found stale event from time {stale_time} with no ' +
                'nearby injection. Counting as terrestrial and ' +
                'removing from the deque.'
            )

            output = self.construct_event_ouput(
                stale_event["coinc"], stale_event
            )

            topic = f"{datasource}.{self.tag}.testsuite.terr_events"
            self.client.write(topic, output)

            logging.info(f'Sent msg to: {topic}')

            # finally remove event from the deque
            events.remove(stale_event)

        else:
            # this is not a terrestrial event because it has
            # a nearby injection. But since it is past the time
            # out, we can remove it and its associated injection
            # at the same time.
            logging.debug(
                f'Removing event and injection from {stale_time} ' +
                'from the deques after reaching timeout.')
            events.remove(stale_event)
            injections.remove(nearest_inj)

    stale_injections = self.stale_msgs(injections)
    for stale_inj in stale_injections:
        stale_time = stale_inj["time"]
        if not stale_inj["preferred_event"]:
            sim_inspiral = stale_inj["sim"]
            logging.info(
                f'Sending {datasource} missed injection msg ' +
                f'for injection {stale_time}'
            )
            simrow = lsctables.SimInspiralTable.get_table(sim_inspiral)[0]
            newxmldoc = ligolw.Document()
            try:
                sim_msg = self.write_sim_file(sim_inspiral, newxmldoc)

                output = {
                    "time": simrow.geocent_end_time,
                    "time_ns": simrow.geocent_end_time_ns,
                    "sim": sim_msg.getvalue().decode(),
                    "onIFOs": stale_inj["ifos"],
                }

                topic = f"{datasource}.{self.tag}.testsuite.missed_inj"
                self.client.write(topic, output)
                logging.info(f"Sent msg to: {topic}")
            finally:
                # release the temporary document even if sending fails
                newxmldoc.unlink()

            # finally remove this injection from the deque
            injections.remove(stale_inj)

        else:
            # this is not a missed injection because a previous
            # event has been found associated with it. But since
            # it is past the time out, we can remove it and its
            # associated event at the same time.
            nearest_event = utils.find_nearest_msg(events, stale_time)
            logging.debug(
                f'Removing event and injection from {stale_time} ' +
                'from the deques after reaching timeout.')
            # the associated event may no longer be in the deque (e.g.
            # dropped because of the deque's maxlen); deque.remove(None)
            # would raise ValueError and abort processing
            if nearest_event:
                events.remove(nearest_event)
            injections.remove(stale_inj)

stale_msgs

stale_msgs(deque)

Return the messages in the queue whose time is at least timeout seconds older than the current GPS time; these are stale and should be removed.

Source code in gw/lts/inspinjmsg_find.py
470
471
472
473
474
475
476
477
478
479
480
def stale_msgs(self, deque):
    """
    Collect the messages in ``deque`` that have timed out.

    A message is stale when the current time from GPSTimeNow() minus
    its "time" entry is at least ``self.timeout``.

    Returns
    -------
    list
        the stale messages, in deque order (the deque is not modified).
    """
    current_gps = float(GPSTimeNow())
    return [
        msg for msg in deque
        if current_gps - msg["time"] >= self.timeout
    ]

start

start()

Start job service.

Source code in gw/lts/inspinjmsg_find.py
222
223
224
225
226
227
228
def start(self):
    """
    Start the job service.

    Logs a start-up message and then hands control to the cronut
    application via ``self.app.start()``.
    """
    logging.info("Starting up...")
    return self.app.start() and None

write_sim_file

write_sim_file(sim, xmldoc)

Write a ligolw file object including the injection SimInspiral Table

Source code in gw/lts/inspinjmsg_find.py
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
def write_sim_file(self, sim, xmldoc):
    """
    Serialize the SimInspiral rows of ``sim`` via ``xmldoc``.

    A new LIGO_LW element holding a fresh SimInspiral table is appended
    to ``xmldoc``, the rows of the SimInspiral table found in ``sim``
    are appended to it, and the whole document is written into an
    in-memory buffer.

    Returns
    -------
    io.BytesIO
        buffer holding the serialized document.
    """
    buffer = io.BytesIO()
    root_element = xmldoc.appendChild(ligolw.LIGO_LW())

    fresh_table = lsctables.New(lsctables.SimInspiralTable)
    fresh_table = root_element.appendChild(fresh_table)
    fresh_table.extend(lsctables.SimInspiralTable.get_table(sim))

    ligolw_utils.write_fileobj(xmldoc, buffer)
    return buffer