Processes alerts received from igwn-alert. Parses the alert payload and handles only alerts
associated with new or updated events from the specified channel names, submitter, and search.
Parameters:

- topic (str) – topic from which the alert was received
- payload (str) – alert payload
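For orientation, the sketch below shows roughly how a method with this signature is registered as an igwn-alert callback. It is a minimal sketch assuming igwn_alert's client.listen() interface; the group and topic names are illustrative, and listener stands in for an instance of the listener class defined in gw/lts/igwn_alert_listener.py (construction details omitted).

from igwn_alert import client

# illustrative instance of the listener class documented on this page
listener = ...

# igwn-alert invokes the callback with the topic name and the raw JSON
# payload string, matching process_alert's (topic, payload) signature
alert_client = client(group="gracedb-playground")
alert_client.listen(listener.process_alert, "superevent")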
Source code in gw/lts/igwn_alert_listener.py
def process_alert(self, topic=None, payload=None):
    """
    Processes alerts received from igwn-alert.
    Parses the alert payload and handles only alerts
    associated with new or updated events from the
    specified channel names, submitter, and search.

    Parameters
    ----------
    topic (str)
        topic from which the alert was received
    payload (str)
        alert payload
    """
    # unpack alert payload
    payload = json.loads(payload)
    id = payload["uid"]
    alert_type = payload["alert_type"]
    data = payload["data"]

    # only need to process new or update type alerts
    if alert_type not in ("new", "update"):
        logging.info(f"Received {alert_type}, skipping")
        return

    # first get the event uid and event level data
    # we have to do this slightly differently for
    # superevent alerts vs event alerts
    if id.startswith("S"):
        uid = data["preferred_event"]
        event_data = data["preferred_event_data"]
        datasource = "superevents"
        logging.info(
            f"Received {alert_type} alert for {id} " +
            f"from {datasource}, preferred event: {uid}"
        )
    else:
        uid = id
        event_data = data
        datasource = data["pipeline"]
        logging.info(f"Received {alert_type} alert for {uid}" +
                     f" from {datasource}")

    # now that we have the uid for a specific event,
    # check the channels this event comes from. skip
    # if not from injection channels
    channels = self.get_channels(uid)
    if not channels or not channels.issubset(self.inj_channels):
        logging.debug(f"{uid} not from injection channels, skipping.")
        return

    # filter events by search
    search = event_data["search"]
    if not event_data["search"] == self.search:
        logging.info(f"Skipping {search} event...")
        return

    # filter events by submitter if provided
    submitter = event_data["submitter"]
    if self.submitter and not submitter == self.submitter:
        logging.info(f"skipping event {uid} submitted by {submitter}")
        return

    # get event object, this has some info not
    # included in the alert payload
    event = self.gracedb_client.get_event(uid=uid)

    # construct event data to be sent in kafka messages
    if uid in self.events.keys():
        self.events[uid] = (self.process_event(
            uid, event, output=self.events[uid])
        )
    else:
        self.events[uid] = self.process_event(uid, event)

    # check if all elements present, then send msg
    # only send msg once per event
    topic = f"{datasource}.{self.tag}.testsuite.inj_events"
    for uid, data in self.events.items():
        if all(data.values()) and uid not in self.events_sent:
            logging.info(
                f'sending a message for {uid} (coa time: {data["time"]}).'
            )
            self.client.write(topic, data)
            self.events_sent.append(uid)

    # clean out old events that already had a msg sent
    time_now = float(GPSTimeNow())
    for key, value in list(self.events.items()):
        if time_now - value["time_added"] >= self.max_wait_time:
            logging.debug(f"Removing old event: {key}")
            self.events.pop(key)
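As a usage illustration, the snippet below builds a minimal superevent payload containing only the fields that process_alert reads; the superevent id, event id, search, and submitter values are made up, and real igwn-alert payloads carry many more fields.

import json

payload = json.dumps({
    "uid": "S230518h",                  # superevent ids start with "S"
    "alert_type": "new",                # anything other than "new"/"update" is skipped
    "data": {
        "preferred_event": "G123456",
        "preferred_event_data": {
            "search": "AllSky",         # must match the configured search
            "submitter": "gstlal",      # checked only if a submitter filter is set
        },
    },
})

# listener is an instance of the class documented on this page
# listener.process_alert(topic="superevent", payload=payload)

An event-level (non-superevent) alert follows the same path, except that uid is the event id itself and datasource is taken from data["pipeline"].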