Skip to content

mux

BlockMuxStream

BlockMuxStream(reader, start=None, timeout=0)

Bases: Generic[T]

A time-aware, gap-handling multiplexer for SeriesBlock streams.

Given SeriesBlocks from multiple named streams with monotonically increasing integer timestamps, this data structure can be used to pull out sets of synchronized blocks, all with the same timestamps. If data on the streams is not available before timeouts are reached, gap blocks will be returned.

The oldest items will be held until either all named streams are available or until the timeout has been reached. If a start time has been set, any items with an older timestamp will be rejected.

Parameters:

Name Type Description Default
reader StreamReader

StreamReader object producing multiple streams to multiplex.

required
start int | None

GPS start time of stream, in nanoseconds. If not set, no start time is passed to the queues.

None
timeout int = 0

Overall timeout for the muxer, in nanoseconds. Overrides individual queue timeouts. If not specified the mux timeout will be the max of the individual queue timeouts.

0
Source code in arrakis/mux.py
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
def __init__(
    self,
    reader: StreamReader,
    start: int | None = None,
    timeout: int = 0,
):
    """Create one TimedQueue per stream exposed by ``reader``.

    Parameters
    ----------
    reader : StreamReader
        Source of the streams; ``reader.streams`` is iterated as a
        mapping of stream name to that stream's channels.
    start : int, optional
        Start time handed unchanged to every queue.
    timeout : int, optional
        When non-zero, used as the timeout of every queue instead of
        the value derived from the channels' ``max_latency``.

    """
    self.reader = reader
    self.start = start
    self._queues: dict[str, TimedQueue] = {}
    self.channels: list[Channel] = []

    for name, stream_channels in self.reader.streams.items():
        # per-channel values are gathered with an explicit loop and
        # asserts to keep type checking consistent
        channel_strides = []
        channel_latencies = []
        for chan in stream_channels:
            assert chan.stride
            channel_strides.append(chan.stride)
            assert chan.max_latency
            channel_latencies.append(chan.max_latency)

        # stream stride: least common multiple of its channel strides
        stream_stride = math.lcm(*channel_strides)
        # stream timeout: the largest expected channel latency
        stream_timeout = max(channel_latencies)
        # a non-zero mux timeout overrides the individual timeouts
        effective_timeout = timeout if timeout else stream_timeout

        self._queues[name] = TimedQueue(
            stride=stream_stride,
            timeout=effective_timeout,
            start=start,
        )
        self.channels.extend(stream_channels)

    # the same rules apply across streams: least common multiple of the
    # queue strides and maximum of the queue timeouts
    queues = self._queues.values()
    self.stride = math.lcm(*(q.stride for q in queues))
    self.timeout = max(q.timeout for q in queues)

__getitem__

__getitem__(key)

Access an individual queue.

Source code in arrakis/mux.py
473
474
475
def __getitem__(self, key: str) -> TimedQueue:
    """Return the queue stored under the stream name ``key``."""
    queue = self._queues[key]
    return queue

pull

pull()

Pull synchronized, concatenated, combined blocks from all streams covering the overall specified stride.

Parameters:

Name Type Description Default
channels Iterable[Channel]

Channel list for creating gap blocks

required

Returns:

Type Description
SeriesBlock
Source code in arrakis/mux.py
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
def pull(self) -> SeriesBlock:
    """Pull one synchronized block spanning the overall mux stride.

    Each queue is asked for ``self.stride`` worth of elements. Elements
    that come back as None (gaps) are replaced by gap blocks, the
    elements of a queue are concatenated, and the per-queue results are
    combined into the returned block.

    Returns
    -------
    SeriesBlock

    """
    per_stream = []
    for stream_queue in self._queues.values():
        # NOTE(review): gap blocks are built with the mux-wide stride and
        # the full channel list for every queue element -- confirm this
        # is what SeriesBlock.full_gap expects when a queue's own stride
        # is shorter than the mux stride.
        pieces = [
            SeriesBlock.full_gap(time_ns, self.stride, self.channels)
            if element is None
            else element
            for time_ns, element in stream_queue.pull(self.stride)
        ]
        per_stream.append(concatenate_blocks(*pieces))
    return combine_blocks(*per_stream)

push

push(stream_name, block)

Push an element for time into a particular queue.

Source code in arrakis/mux.py
477
478
479
480
481
def push(self, stream_name: str, block: SeriesBlock):
    """Push ``block`` into the queue of ``stream_name``.

    The block is queued under its own ``time_ns`` timestamp. A
    ``stream_name`` without a queue raises KeyError.
    """
    timestamp = block.time_ns
    self._queues[stream_name].push(timestamp, block)

ready

ready()

True if all queues have the expected number of elements

covering the overall muxer stride.

Source code in arrakis/mux.py
483
484
485
486
487
488
489
490
491
def ready(self) -> bool:
    """Report whether every queue can cover the overall muxer stride.

    The queues are aligned first; the result is True only if each
    queue's ``ready(self.stride)`` is truthy.
    """
    self._align_queues()
    for queue in self._queues.values():
        # stop at the first queue that cannot cover the stride yet
        if not queue.ready(self.stride):
            return False
    return True

MuxedData dataclass

MuxedData(time, data)

Bases: Mapping, Generic[T]

Container that holds timestamped data.

Parameters:

Name Type Description Default
time int

The timestamp associated with this data, in nanoseconds.

required
data dict[str, T]

The keyed data.

required

Muxer

Muxer(keys, start=None, timeout=DEFAULT_TIMEOUT)

Bases: Generic[T]

A data structure that multiplexes items from multiple named streams.

Given items from multiple named streams with monotonically increasing integer timestamps, this data structure can be used to pull out sets of synchronized items (items all with the same timestamp).

The oldest items will be held until either all named streams are available or until the timeout has been reached. If a start time has been set, any items with an older timestamp will be rejected.

Parameters:

Name Type Description Default
keys Iterable[str]

Identifiers for the named streams to expect when adding items.

required
start int

The GPS time to start muxing items for. If not set, accept items from any time.

None
timeout timedelta or None

The maximum time to wait for messages from named streams, in seconds, before multiplexing. If None is specified, wait indefinitely. Default is 1 second.

DEFAULT_TIMEOUT
Source code in arrakis/mux.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
def __init__(
    self,
    keys: Iterable[str],
    start: int | None = None,
    timeout: timedelta | None = DEFAULT_TIMEOUT,
) -> None:
    """Set up an empty muxer for the given stream keys.

    Parameters
    ----------
    keys : Iterable[str]
        Stream identifiers; stored as a set.
    start : int, optional
        Start time. When given, the last-pulled time is seeded one
        below it; otherwise it is seeded with the smallest int64.
    timeout : timedelta or None, optional
        Stored as given.

    """
    self._keys = set(keys)
    self._start = start
    self._timeout = timeout

    # pending items, grouped first by timestamp and then by stream key
    self._items: dict[int, dict[str, T]] = defaultdict(lambda: defaultdict())
    # pending timestamps (push/pull maintain this list with heapq)
    self._times: list[int] = []

    if start is None:
        self._last_time = numpy.iinfo(numpy.int64).min
    else:
        self._last_time = start - 1

    # track when processing started to handle lookback properly
    self._processing_start_time = int(gpsnow() * Time.SECONDS)

pull

pull()

Pull monotonically increasing synchronized items from the muxer.

Yields:

Type Description
MuxedData[T]

Synchronized items with a common timestamp, keyed by stream keys.

Source code in arrakis/mux.py
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
def pull(self) -> Iterator[MuxedData[T]]:
    """Yield synchronized items in increasing timestamp order.

    The oldest pending timestamp is released while it has all items,
    its items are stale, or a newer timestamp is already complete.
    Iteration stops at the first timestamp for which none of these
    holds, or when nothing is pending.

    Yields
    ------
    MuxedData[T]
        Items sharing one timestamp, keyed by stream key.

    """
    while self._times:
        oldest = self._times[0]
        releasable = (
            self._has_all_items(oldest)
            or self._are_items_stale(oldest)
            or self._has_complete_newer_timestamp(oldest)
        )
        if not releasable:
            return
        yield MuxedData(oldest, self._items.pop(oldest))
        # the timestamp leaves the heap only after its items were handed out
        self._last_time = heapq.heappop(self._times)

push

push(time, key, item, on_drop=ONDROP_DEFAULT)

Push an item into the muxer.

Parameters:

Name Type Description Default
time int

The timestamp associated with this item.

required
key str

The key stream associated with this item. Must match a key provided at initialization.

required
item T

The item to add.

required
on_drop str

Specifies behavior when the item would be dropped from the muxer, in the case that it was not provided to the muxer before the specified timeout. Options are 'ignore', 'raise', or 'warn'. Default is 'warn'.

ONDROP_DEFAULT
Source code in arrakis/mux.py
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
def push(self, time: int, key: str, item: T, on_drop: str = ONDROP_DEFAULT) -> None:
    """Add an item for one stream key at a given timestamp.

    Parameters
    ----------
    time : int
        The timestamp associated with this item.
    key : str
        The stream key of this item. Must be one of the keys provided
        at initialization.
    item : T
        The item to add. If the key already holds an item at this
        timestamp, the existing item is kept.
    on_drop : str, optional
        What to do when the timestamp is not newer than the last pulled
        one: 'ignore', 'raise', or 'warn'.

    Raises
    ------
    KeyError
        If ``key`` was not provided at initialization.
    ValueError
        If the item is too old and ``on_drop`` is 'raise'.

    """
    if key not in self._keys:
        msg = f"{key} doesn't match keys provided at initialization"
        raise KeyError(msg)

    # items at or before the last pulled timestamp are not accepted
    if time <= self._last_time:
        # anything before the requested start is discarded silently
        if self._start is not None and time < self._start:
            return
        msg = f"item's timestamp is too old: ({time} <= {self._last_time})"
        policy = OnDrop[on_drop.upper()]
        if policy == OnDrop.IGNORE:
            return
        if policy == OnDrop.RAISE:
            raise ValueError(msg)
        if policy == OnDrop.WARN:
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
            return

    # a timestamp seen for the first time is registered on the heap
    if time not in self._items:
        heapq.heappush(self._times, time)
    # first item per (time, key) wins
    self._items[time].setdefault(key, item)

TimedQueue

TimedQueue(stride, timeout, start=None)

Bases: Generic[T]

A sequential, time-stamped queue handling gaps and timeouts

Parameters:

Name Type Description Default
start int

GPS start time of queue, in nanoseconds.

None
stride int

Time step for elements in the queue, in nanoseconds.

required
timeout int

Timeout for elements in the queue, in nanoseconds.

required
Source code in arrakis/mux.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
def __init__(self, stride: int, timeout: int, start: int | None = None):
    """Set up an empty queue.

    Parameters
    ----------
    stride : int
        Time step for elements in the queue.
    timeout : int
        Timeout for elements in the queue.
    start : int, optional
        Start time of the queue. When omitted, the current GPS time is
        used to seed ``last_time``.

    """
    self.stride = stride
    self.timeout = timeout
    self.start = start
    # with an explicit start the queue counts as initialized right away
    # (the flag only matters when start is None)
    self._initialized = start is not None
    reference = time_as_ns(gpsnow()) if start is None else start
    # last_time is seeded one stride below the reference time rounded
    # down to a multiple of stride
    self.last_time = (reference // self.stride - 1) * self.stride
    self._queue: deque = deque()
    # FIXME: what's the right value here?
    # self._max_queue = int(2 * self.timeout / self.stride)
    self._max_queue = 1_000_000
    self._lock = RLock()

drain_until

drain_until(target_time)

Remove elements until the front element is at or after target_time.

Source code in arrakis/mux.py
402
403
404
405
406
def drain_until(self, target_time: int) -> None:
    """Discard front elements whose timestamp is before ``target_time``.

    Stops at the first front element at or after ``target_time``, or
    when the queue is empty.
    """
    with self._lock:
        pending = self._queue
        while pending and pending[0][0] < target_time:
            pending.popleft()

front_time

front_time()

Return the timestamp of the front element, or None if empty.

Source code in arrakis/mux.py
395
396
397
398
399
400
def front_time(self) -> int | None:
    """Return the timestamp of the front element, or None if empty."""
    with self._lock:
        if len(self._queue) > 0:
            return self._queue[0][0]
        return None

pull

pull(duration=None)

Drain the queue.

Gaps are represented by None elements.

Parameters:

Name Type Description Default
duration int

Duration to extract, in nanoseconds. If the specified duration is not available, no elements will be returned. If not specified, the queue will be drained.

None

Yields:

Type Description
tuple[time, element]

The element from the queue and its associated timestamp.

Source code in arrakis/mux.py
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
def pull(self, duration: int | None = None) -> Iterator[tuple[int, Any]]:
    """Drain the queue.

    Gaps are represented by None elements.

    Parameters
    ----------
    duration : int
        Duration to extract, in nanoseconds.  If the specified
        duration is not available, no elements will be returned.
        If not specified, the queue will be drained.

    Yields
    ------
    tuple[time, element]
        The element from the queue and it's associated timestamp.

    """
    self._update_timeout()

    with self._lock:
        # if duration specified, pull the requested number of elements
        if duration:
            if n_elements := self.ready(duration):
                for _ in range(n_elements):
                    yield self._queue.popleft()

        # else drain the queue
        else:
            yield from self._queue
            self._queue.clear()

push

push(time, element, on_drop=ONDROP_DEFAULT)

Push an element into the queue.

The time being pushed into the queue must be a multiple of the time stride specified at initialization of the queue.

If the time associated with the pushed element is less than what has already been processed by the queue, the batch will be dropped and this operation will be a no-op.

Parameters:

Name Type Description Default
time int

GPS time associated with the element, in nanoseconds

required
element Any

element being pushed into the queue.

required
on_drop str

Specifies behavior when the item would be dropped from the muxer, in the case that it was not provided to the muxer before the specified timeout. Options are 'ignore', 'raise', or 'warn'. Default is 'warn'.

ONDROP_DEFAULT
Source code in arrakis/mux.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
def push(self, time: int, element: Any, on_drop: str = ONDROP_DEFAULT) -> None:
    """Append an element to the queue, filling skipped times with gaps.

    ``time`` must be a multiple of the queue stride. Every stride step
    between the last queued time and ``time`` is filled with a None
    element before the new element is appended.

    An element whose time is not newer than the last queued time is
    not added; ``on_drop`` selects what happens instead.

    Parameters
    ----------
    time : int
        GPS time associated with the element, in nanoseconds
    element : Any
        element being pushed into the queue.
    on_drop : str, optional
        Behavior for an element that is too old: 'ignore', 'raise', or
        'warn'.

    Raises
    ------
    ValueError
        If the element is too old and ``on_drop`` is 'raise'.

    """
    assert time % self.stride == 0, (
        f"time {time} is not a multiple of queue stride {self.stride:_}"
    )

    # A queue created without a start time re-anchors last_time just
    # below the first real (non-gap) element it receives. This limits
    # how much data gets dropped while still letting gaps be returned
    # promptly when no data has ever arrived.
    if not self._initialized and element is not None:
        self.last_time = ((time // self.stride) - 1) * self.stride
        self._initialized = True

    # if time is older than what's already in the queue, drop it
    if time <= self.last_time:
        msg = f"item's timestamp is too old: ({time:_} <= {self.last_time:_})"
        policy = OnDrop[on_drop.upper()]
        if policy == OnDrop.IGNORE:
            return
        if policy == OnDrop.RAISE:
            raise ValueError(msg)
        if policy == OnDrop.WARN:
            logger.warning(msg)
            warnings.warn(msg, stacklevel=2)
            return

    # hold the lock so last_time always stays in step with the appends
    with self._lock:
        # FIXME: could be deque.extend?
        # one None element per stride step strictly between the last
        # queued time and the new one
        first_gap = self.last_time + self.stride
        for gap_time in range(first_gap, time, self.stride):
            self._append(gap_time, None)
        self._append(time, element)
        self.last_time = time

ready

ready(duration)

Check if queue holds duration worth of elements

Returns:

Type Description
int or None

Returns either the number of elements that span duration, or None if the queue does not have duration worth of elements.

Source code in arrakis/mux.py
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
def ready(self, duration: int) -> int | None:
    """Check if queue holds duration worth of elements

    Returns
    -------
    int or None
        Returns either the number of elements that span duration,
        or None if the queue does not have duration worth of
        elements,

    """
    self._update_timeout()
    assert duration % self.stride == 0, (
        f"duration {duration:_} is not a multiple of queue stride {self.stride:_}"
    )
    n_elements = int(duration / self.stride)
    if len(self) >= n_elements:
        return n_elements
    return None