Skip to content

mux

BlockMuxStream

BlockMuxStream(reader, start=None, timeout=0)

Bases: Generic[T]

A time-aware, gap-handling multiplexer for SeriesBlock streams.

Given SeriesBlocks from multiple named streams with monotonically increasing integer timestamps, this data structure can be used to pull out sets of synchronized blocks, all with the same timestamps. If data on the streams is not available before timeouts are reached, gap blocks will be returned.

The oldest items will be held until either all named streams are available or until the timeout has been reached. If a start time has been set, any items with an older timestamp will be rejected.

Parameters:

Name Type Description Default
reader StreamReader

StreamReader object producing multiple streams to multiplex.

required
start int

GPS start time of stream, in nanoseconds.

None
timeout int = 0

Overall timeout for the muxer, in nanoseconds. Overrides individual queue timeouts. If not specified the mux timeout will be the max of the individual queue timeouts.

0
Source code in arrakis/mux.py
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
def __init__(
    self,
    reader: StreamReader,
    start: int | None = None,
    timeout: int = 0,
):
    """Create one TimedQueue per stream advertised by ``reader``.

    Parameters
    ----------
    reader : StreamReader
        Object whose ``streams`` mapping associates each stream name
        with the channels it carries.
    start : int, optional
        GPS start time, in nanoseconds, handed to every queue.
    timeout : int, optional
        Mux-wide timeout, in nanoseconds.  A non-zero value replaces
        the per-stream timeout derived from channel latencies.

    """
    self.reader = reader
    self.start = start
    self._queues: dict[str, TimedQueue] = {}
    self._stream_channels: dict[str, list[Channel]] = {}
    self.channels: list[Channel] = []
    # flips to False as soon as one stream has no channel that
    # declares a max_latency (e.g. historical data)
    every_stream_has_latency = True
    for name, stream_channels in self.reader.streams.items():
        # every channel must declare a stride
        for chan in stream_channels:
            assert chan.stride
        latencies = [
            chan.max_latency
            for chan in stream_channels
            if chan.max_latency is not None
        ]
        if not latencies:
            every_stream_has_latency = False
        # per-stream timeout: the largest expected latency, else 0
        stream_timeout = max(latencies, default=0)
        self._queues[name] = TimedQueue(
            # per-stream stride: least common multiple of the
            # channel strides
            stride=math.lcm(*(chan.stride for chan in stream_channels)),
            # an explicit mux timeout wins over the per-stream value
            timeout=timeout or stream_timeout,
            start=start,
        )
        self._stream_channels[name] = list(stream_channels)
        self.channels.extend(stream_channels)
    queues = self._queues.values()
    # mux stride: least common multiple of the per-stream strides
    self.stride = math.lcm(*[queue.stride for queue in queues])
    self.timeout = max([queue.timeout for queue in queues])
    # Timeout gap-filling only exists to keep several queues in step.
    # With a single queue it merely races the incoming data and
    # causes spurious drops, and it is pointless when a stream has
    # no latency constraint, so it is switched off in both cases.
    self._use_timeouts = every_stream_has_latency and len(self._queues) > 1

__getitem__

__getitem__(key)

Access an individual queue.

Source code in arrakis/mux.py
356
357
358
def __getitem__(self, key: str) -> TimedQueue:
    """Return the queue registered under stream name ``key``.

    Raises
    ------
    KeyError
        If no queue is registered for ``key``.

    """
    queue = self._queues[key]
    return queue

pull

pull()

Pull synchronized, concatenated, combined blocks from all streams covering the overall specified stride.

Returns:

Type Description
SeriesBlock
Source code in arrakis/mux.py
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
def pull(self) -> SeriesBlock:
    """Pull synchronized, concatenated, combined blocks from all
    streams covering the overall specified stride.

    Each queue is asked for one mux stride worth of elements without
    re-triggering its timeouts.  Missing slots (``None`` elements) are
    replaced by gap blocks, the blocks of each stream are concatenated
    in time, and the per-stream results are combined into one block.

    Returns
    -------
    SeriesBlock
        The result of combining the per-stream concatenated blocks.

    """
    blocks = []
    for stream_name, queue in self._queues.items():
        q_blocks = []
        for time_ns, block in queue.pull(self.stride, update_timeout=False):
            if block is None:
                # A queue slot spans the *queue* stride, which can be
                # shorter than the mux stride (the lcm over all
                # queues), so the gap must be sized per queue.
                # NOTE(review): assumes the second argument of
                # SeriesBlock.full_gap is the gap duration in ns.
                block = SeriesBlock.full_gap(
                    time_ns,
                    queue.stride,
                    self._stream_channels[stream_name],
                )
            q_blocks.append(block)
        blocks.append(concatenate_blocks(*q_blocks))
    return combine_blocks(*blocks)

push

push(stream_name, block)

Push an element for time into a particular queue.

Source code in arrakis/mux.py
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
def push(self, stream_name: str, block: SeriesBlock):
    """Hand ``block`` to the queue of stream ``stream_name``.

    A block whose duration equals the queue stride is always
    forwarded.  A shorter block is also forwarded when the queue has
    a start time (partial-stride edge block of a non-aligned
    request).  Any other block is logged and dropped.

    """
    queue = self._queues[stream_name]
    # Drop the block when its duration is wrong and it cannot be a
    # partial edge block (no start time, or longer than the stride).
    if block.duration_ns != queue.stride and (
        queue.start is None or block.duration_ns >= queue.stride
    ):
        logger.warning(
            "dropping block with mismatched stride "
            "at %d ns for stream %s: got %d ns, "
            "expected %d ns",
            block.time_ns,
            stream_name,
            block.duration_ns,
            queue.stride,
        )
        return
    queue.push(block.time_ns, block)

ready

ready()

True if all queues have the expected number of elements

covering the overall muxer stride.

Source code in arrakis/mux.py
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
def ready(self) -> bool:
    """Report whether every queue can supply one full mux stride.

    Returns
    -------
    bool
        True when all queues hold enough elements to cover the
        overall muxer stride, False otherwise.

    """
    if self._use_timeouts:
        # Gap-fill every queue before aligning any of them so that
        # all queues are in a consistent state.  Single-queue muxes
        # skip this: there is nothing to synchronize, and timeouts
        # would only race the incoming data.
        for queue in self._queues.values():
            queue.update_timeout()
        # Bring all queues to the same front timestamp.
        self._align_queues()
    # Readiness is checked without touching timeouts again so the
    # alignment established above is not disturbed.
    for queue in self._queues.values():
        if not queue.ready(self.stride, update_timeout=False):
            return False
    return True

TimedQueue

TimedQueue(stride, timeout, start=None)

Bases: Generic[T]

A sequential, time-stamped queue handling gaps and timeouts

Parameters:

Name Type Description Default
start int

GPS start time of queue, in nanoseconds.

None
stride int

Time step for elements in the queue, in nanoseconds.

required
timeout int

Timeout for elements in the queue, in nanoseconds.

required
Source code in arrakis/mux.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def __init__(self, stride: int, timeout: int, start: int | None = None):
    """Set up an empty queue.

    Parameters
    ----------
    stride : int
        Time step for elements in the queue, in nanoseconds.
    timeout : int
        Timeout for elements in the queue, in nanoseconds.
    start : int, optional
        GPS start time of the queue, in nanoseconds.

    """
    self.stride = stride
    self.timeout = timeout
    self.start = start
    # Without an explicit start the queue counts as uninitialized
    # until it has received data.
    self._initialized = start is not None
    # Reference time used to seed last_time: the requested start,
    # or the value of time_as_ns(gpsnow()) when none was given.
    reference = time_as_ns(gpsnow()) if start is None else start
    # last_time sits just before the reference so that the first
    # push at start produces no spurious gap slots:
    #   aligned reference     -> one full stride earlier
    #   non-aligned reference -> the stride boundary just below it
    self.last_time = (
        reference - self.stride
        if reference % self.stride == 0
        else (reference // self.stride) * self.stride
    )
    self._queue: deque = deque()
    # FIXME: what's the right value here?
    # self._max_queue = int(2 * self.timeout / self.stride)
    self._max_queue = 1_000_000
    self._lock = RLock()

drain_until

drain_until(target_time)

Remove elements until the front element is at or after target_time.

Source code in arrakis/mux.py
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
def drain_until(self, target_time: int) -> None:
    """Discard queued elements stamped earlier than ``target_time``.

    Afterwards ``last_time`` is moved so that later gap-filling does
    not produce slots before ``target_time``.

    """
    with self._lock:
        pending = self._queue
        while pending and pending[0][0] < target_time:
            pending.popleft()
        slot_before_target = target_time - self.stride
        if pending:
            # Elements at or past the target remain: only make sure
            # last_time does not lag behind the drain target.
            self.last_time = max(self.last_time, slot_before_target)
        else:
            # Nothing left: force last_time to the slot before the
            # target, even if it was already ahead, so that refilling
            # resumes exactly at target_time and the queues stay
            # aligned with each other.
            self.last_time = slot_before_target

front_time

front_time()

Return the timestamp of the front element, or None if empty.

Source code in arrakis/mux.py
251
252
253
254
255
256
def front_time(self) -> int | None:
    """Return the timestamp of the front element, or None if empty."""
    with self._lock:
        if len(self._queue) > 0:
            return self._queue[0][0]
        return None

pull

pull(duration=None, *, update_timeout=True)

Drain the queue.

Gaps are represented by None elements.

Parameters:

Name Type Description Default
duration int

Duration to extract, in nanoseconds. If the specified duration is not available, no elements will be returned. If not specified, the queue will be drained.

None
update_timeout bool

Whether to trigger timeout gap-filling before pulling. Default is True. Set to False when the caller has already updated timeouts (e.g. BlockMuxStream.pull()).

True

Yields:

Type Description
tuple[time, element]

The element from the queue and its associated timestamp.

Source code in arrakis/mux.py
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def pull(
    self, duration: int | None = None, *, update_timeout: bool = True
) -> Iterator[tuple[int, Any]]:
    """Drain the queue.

    Gaps are represented by None elements.

    Parameters
    ----------
    duration : int
        Duration to extract, in nanoseconds.  If the specified
        duration is not available, no elements will be returned.
        If not specified, the queue will be drained.
    update_timeout : bool, optional
        Whether to trigger timeout gap-filling before pulling.
        Default is True.  Set to False when the caller has already
        updated timeouts (e.g. BlockMuxStream.pull()).

    Yields
    ------
    tuple[time, element]
        The element from the queue and it's associated timestamp.

    """
    if update_timeout:
        self.update_timeout()

    with self._lock:
        # if duration specified, pull the requested number of elements
        if duration:
            if n_elements := self.ready(duration, update_timeout=False):
                for _ in range(n_elements):
                    yield self._queue.popleft()

        # else drain the queue
        else:
            yield from self._queue
            self._queue.clear()

push

push(time, element, on_drop=ONDROP_DEFAULT)

Push an element into the queue.

The time being pushed into the queue must be a multiple of the time stride specified at initialization of the queue.

If the time associated with the pushed element is less than what has already been processed by the queue, the batch will be dropped and this operation will be a no-op.

Parameters:

Name Type Description Default
time int

GPS time associated with the element, in nanoseconds

required
element Any

element being pushed into the queue.

required
on_drop str

Specifies behavior when the item would be dropped from the muxer, in the case that it was not provided to the muxer before the specified timeout. Options are 'ignore', 'raise', or 'warn'. Default is 'warn'.

ONDROP_DEFAULT
Source code in arrakis/mux.py
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def push(self, time: int, element: Any, on_drop: str = ONDROP_DEFAULT) -> None:
    """Push an element into the queue.

    The time being pushed into the queue must be a multiple of the
    time stride specified at initialization of the queue.

    If the time associated with the pushed element is less than
    what has already been processed by the queue, the batch will
    be dropped and this operation will be a no-op.

    Parameters
    ----------
    time : int
        GPS time associated with the element, in nanoseconds
    element : Any
        element being pushed into the queue.
    on_drop : str, optional
        Specifies behavior when the item would be dropped from the muxer,
        in the case that it was not provided to the muxer before the
        specified timeout. Options are 'ignore', 'raise', or 'warn'.
        Default is 'warn'.

    """
    assert time % self.stride == 0 or time == self.start, (
        f"time {time} is not a multiple of queue stride {self.stride:_}"
        f" (and does not match start {self.start})"
    )
    # If this is the first real block (not a gap), reset last_time to align
    # with it if start time was not set initially. This mitigates the
    # amount of data to be dropped while still allowing gaps to be returned
    # in a timely manner if no data has ever been received.
    # Clear the queue so that stale timeout gaps (pushed before real
    # data arrived) don't sit at the front and get yielded as gap
    # blocks at timestamps that real data will later occupy.
    if not self._initialized and element is not None:
        self._queue.clear()
        self.last_time = ((time // self.stride) - 1) * self.stride
        self._initialized = True

    # if time is older that what's already in the queue, drop it
    if time <= self.last_time:
        msg = f"item's timestamp is too old: ({time:_} <= {self.last_time:_})"
        match OnDrop[on_drop.upper()]:
            case OnDrop.IGNORE:
                return
            case OnDrop.RAISE:
                raise ValueError(msg)
            case OnDrop.WARN:
                logger.warning(msg)
                warnings.warn(msg, stacklevel=2)
                return
    # use a lock so that the last_time attribute is always
    # synchronous with element append
    with self._lock:
        # backfill any skipped times with None elements (gaps)
        # FIXME: could be deque.extend?
        for t in range(self.last_time, time, self.stride)[1:]:
            self._append(t, None)
        # insert the new element
        self._append(time, element)
        # after a non-aligned block (partial edge block), snap
        # last_time to the stride boundary at or before the block
        # so subsequent gap backfills remain stride-aligned
        if time % self.stride != 0:
            self.last_time = (time // self.stride) * self.stride
        else:
            self.last_time = time

ready

ready(duration, *, update_timeout=True)

Check if queue holds duration worth of elements

Parameters:

Name Type Description Default
duration int

Duration to check for, in nanoseconds.

required
update_timeout bool

Whether to trigger timeout gap-filling before checking. Default is True. Set to False when the caller has already updated timeouts (e.g. BlockMuxStream.ready()).

True

Returns:

Type Description
int or None

Returns either the number of elements that span duration, or None if the queue does not have duration worth of elements.

Source code in arrakis/mux.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
def ready(self, duration: int, *, update_timeout: bool = True) -> int | None:
    """Check if queue holds duration worth of elements

    Parameters
    ----------
    duration : int
        Duration to check for, in nanoseconds.
    update_timeout : bool, optional
        Whether to trigger timeout gap-filling before checking.
        Default is True.  Set to False when the caller has already
        updated timeouts (e.g. BlockMuxStream.ready()).

    Returns
    -------
    int or None
        Returns either the number of elements that span duration,
        or None if the queue does not have duration worth of
        elements,

    """
    if update_timeout:
        self.update_timeout()
    assert duration % self.stride == 0, (
        f"duration {duration:_} is not a multiple of queue stride {self.stride:_}"
    )
    n_elements = int(duration / self.stride)
    if len(self) >= n_elements:
        return n_elements
    return None