For each event in the event_msgs deque, find the nearest injection
in inj_msgs within +/- delta_t (1 second) of the event coalescence
time. When an association is made, check to see if its better than
any previous event found. If so, add the sim inspiral table from
injection to the event's coinc xml and send a message to the
testsuite.events topic and remove the processed event from the
deque.
Source code in gw/lts/inspinjmsg_find.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376 | def process_events(self, datasource):
"""
For each event in the event_msgs deque, find the nearest injection
in inj_msgs within +/- delta_t (1 second) of the event coalescence
time. When an association is made, check to see if its better than
any previous event found. If so, add the sim inspiral table from
injection to the event's coinc xml and send a message to the
testsuite.events topic and remove the processed event from the
deque.
"""
events_copy = copy.copy(self.event_msgs[datasource])
injections = self.inj_msgs[datasource]
for event in events_copy:
event_time = event["time"]
nearest_inj = utils.find_nearest_msg(injections, event_time)
# if no associated injection was found, continue
if not nearest_inj:
logging.info(f"No injection found for event at {event_time}")
continue
inj_idx = self.inj_msgs[datasource].index(nearest_inj)
inj_time = nearest_inj["time"]
sim_file = nearest_inj["sim"]
prev_preferred_event = nearest_inj["preferred_event"]
coinc_file = event["coinc"]
this_coinc = lsctables.CoincInspiralTable.get_table(coinc_file)
val = self.get_preferred_param(this_coinc)
# if this is the first event found or
# this event is better than the previous,
# send update event.
# Note: this requires that aggregate by
# "latest" works the way we would hope
if not prev_preferred_event or val > prev_preferred_event:
# update preferred event for this injection
injections[inj_idx].update({"preferred_event": val})
# proceed with sending event
# add sim table to coinc file and write to disk
logging.info(
f"Sending event with {self.preferred_param} = {val} " +
f"for injection at time {inj_time}"
)
newxmldoc = self.append_sim_table(coinc_file, sim_file)
output = self.construct_event_ouput(
newxmldoc,
event, nearest_inj
)
topic = f"{datasource}.{self.tag}.testsuite.events"
self.client.write(topic, output)
logging.info(f'Sent msg to: {topic}')
else:
logging.debug(
f"This event {val} not preferred over previous: " +
f"{prev_preferred_event}, skipping."
)
|