mux

MuxedData dataclass

MuxedData(time, data)

Bases: Mapping, Generic[T]

Container that holds timestamped data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| time | int | The timestamp associated with this data, in nanoseconds. | required |
| data | dict[str, T] | The keyed data. | required |
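
To illustrate what "Bases: Mapping, Generic[T]" implies for callers, here is a minimal sketch of a dataclass that also behaves as a read-only mapping. The class name `TimestampedData` and the sample keys are hypothetical stand-ins; this is not the library's `MuxedData` implementation, only an assumption about how such a container can be built.

```python
from collections.abc import Iterator, Mapping
from dataclasses import dataclass
from typing import Generic, TypeVar

T = TypeVar("T")


@dataclass
class TimestampedData(Mapping, Generic[T]):
    """Hypothetical stand-in for MuxedData, not the library class."""

    time: int  # timestamp, in nanoseconds
    data: dict[str, T]  # items keyed by stream name

    # Mapping only requires these three methods; keys(), items(),
    # values(), get() and `in` are derived from them.
    def __getitem__(self, key: str) -> T:
        return self.data[key]

    def __iter__(self) -> Iterator[str]:
        return iter(self.data)

    def __len__(self) -> int:
        return len(self.data)


# Sample values are made up for illustration.
sample = TimestampedData(time=1_000_000_000, data={"H1": 1.5, "L1": 2.5})
for key, value in sample.items():
    print(sample.time, key, value)
```

With this shape, a consumer can index the container directly (`sample["H1"]`) while still reading the shared timestamp from `sample.time`.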

Muxer

Muxer(keys, start=None, timeout=DEFAULT_TIMEOUT)

Bases: Generic[T]

A data structure that multiplexes items from multiple named streams.

Given items from multiple named streams with monotonically increasing integer timestamps, this data structure can be used to pull out sets of synchronized items (items all with the same timestamp).

The oldest items will be held until either all named streams are available or until the timeout has been reached. If a start time has been set, any items with an older timestamp will be rejected.
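
The hold-until-complete-or-timeout behavior described above can be sketched with a small stand-in class. `ToyMuxer`, its `clock` parameter, and the staleness rule used here (time elapsed since a timestamp first arrived) are all assumptions made for illustration; the library's own staleness check is not shown on this page and may differ.

```python
import heapq
from collections import defaultdict
from collections.abc import Callable, Iterable, Iterator
from typing import Optional


class ToyMuxer:
    """Illustrative stand-in, not arrakis.mux.Muxer."""

    def __init__(
        self,
        keys: Iterable[str],
        timeout: Optional[float],
        clock: Callable[[], float],
    ) -> None:
        self._keys = set(keys)
        self._timeout = timeout  # seconds; None means wait indefinitely
        self._clock = clock  # injected so the example is deterministic
        self._items = defaultdict(dict)  # timestamp -> {key: item}
        self._arrived = {}  # timestamp -> clock reading at first arrival
        self._times = []  # min-heap of pending timestamps

    def push(self, time: int, key: str, item: object) -> None:
        if key not in self._keys:
            raise KeyError(key)
        if time not in self._items:
            heapq.heappush(self._times, time)
            self._arrived[time] = self._clock()
        # keep the first item seen for a given (time, key) pair
        self._items[time].setdefault(key, item)

    def pull(self) -> Iterator[tuple]:
        while self._times:
            time = self._times[0]  # oldest pending timestamp
            complete = self._items[time].keys() == self._keys
            stale = (
                self._timeout is not None
                and self._clock() - self._arrived[time] >= self._timeout
            )
            if not (complete or stale):
                break  # oldest entry is still waiting; hold everything
            heapq.heappop(self._times)
            del self._arrived[time]
            yield time, self._items.pop(time)


now = [0.0]  # fake clock, advanced by hand
mux = ToyMuxer(keys=["a", "b"], timeout=1.0, clock=lambda: now[0])
mux.push(10, "a", "a10")
mux.push(10, "b", "b10")
mux.push(20, "a", "a20")
first = list(mux.pull())  # timestamp 10 is complete, 20 is still held
now[0] = 2.0  # let the timeout elapse
second = list(mux.pull())  # timestamp 20 is released with only stream "a"
```

Injecting the clock is a design choice for the sketch only: it makes the timeout path reproducible without sleeping.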

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| keys | Iterable[str] | Identifiers for the named streams to expect when adding items. | required |
| start | int | The GPS time at which to start muxing items. If not set, items from any time are accepted. | None |
| timeout | timedelta or None | The maximum time to wait for messages from named streams before multiplexing, given as a timedelta. If None is specified, wait indefinitely. Default is 1 second. | DEFAULT_TIMEOUT |

Source code in arrakis/mux.py
def __init__(
    self,
    keys: Iterable[str],
    start: int | None = None,
    timeout: timedelta | None = DEFAULT_TIMEOUT,
) -> None:
    self._keys = set(keys)
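    # pending items, grouped by timestamp and then by stream key
    self._items: dict[int, dict[str, T]] = defaultdict(dict)
    # min-heap of timestamps that still have pending items
    self._times: list[int] = []
    self._start = start
    # oldest timestamp still accepted without a drop; seeded with start
    self._last_time = start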
    self._timeout = timeout

    # track when processing started to handle lookback properly
    self._processing_start_time = int(gpstime.gpsnow() * Time.SECONDS)

pull

pull()

Pull monotonically increasing synchronized items from the muxer.

Yields:

| Type | Description |
| --- | --- |
| MuxedData[T] | Synchronized items with a common timestamp, keyed by stream keys. |
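
One consequence of pulling in monotonically increasing order is that a later timestamp is not released while an earlier one is still waiting, even if the later one is already complete. The sketch below isolates that drain loop. The function `drain_ready` and its `is_ready` predicate are hypothetical names introduced here, standing in for the muxer's internal completeness and staleness checks.

```python
import heapq
from collections.abc import Callable, Iterator


def drain_ready(
    times: list,
    items: dict,
    is_ready: Callable[[int], bool],
) -> Iterator[tuple]:
    """Yield (time, items) in increasing time order while the oldest is ready."""
    # times is a min-heap, so times[0] is always the oldest pending timestamp
    while times and is_ready(times[0]):
        time = heapq.heappop(times)
        yield time, items.pop(time)


keys = {"a", "b"}
items = {
    10: {"a": 1, "b": 2},  # complete
    20: {"a": 3},          # still missing "b"
    30: {"a": 5, "b": 6},  # complete, but queued behind 20
}
times = []
for t in (30, 10, 20):  # arrival order does not matter for a heap
    heapq.heappush(times, t)

ready = list(drain_ready(times, items, lambda t: items[t].keys() == keys))
print(ready)          # only timestamp 10 is released
print(sorted(times))  # 20 and 30 remain pending
```

Because the loop only ever inspects the head of the heap, the output order matches timestamp order regardless of the order in which items were pushed.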

Source code in arrakis/mux.py
def pull(self) -> Iterator[MuxedData[T]]:
    """Pull monotonically increasing synchronized items from the muxer.

    Yields
    ------
    MuxedData[T]
        Synchronized items with a common timestamp, keyed by stream keys.

    """
    if not self._times:
        return

    # yield items in monotonically increasing order as long
    # as conditions are met
    time = self._times[0]
    while self._has_all_items(time) or self._are_items_stale(time):
        yield MuxedData(time, self._items.pop(time))
        self._last_time = heapq.heappop(self._times)
        if not self._times:
            break
        time = self._times[0]

push

push(time, key, item, on_drop='warn')

Push an item into the muxer.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| time | int | The timestamp associated with this item. | required |
| key | str | The stream key associated with this item. Must match a key provided at initialization. | required |
| item | T | The item to add. | required |
| on_drop | str | Specifies behavior when the item would be dropped from the muxer because it was not provided before the specified timeout. Options are 'ignore', 'raise', or 'warn'. Default is 'warn'. | 'warn' |

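
The three `on_drop` options can be sketched as a small dispatch on an enum. `DropPolicy` and `handle_late_item` are hypothetical names, and the name lookup via `upper()` is an assumption of this sketch rather than the library's code. The return value mirrors the source listing on this page, where the warn branch emits a warning and then continues to the insertion step.

```python
import warnings
from enum import Enum, auto


class DropPolicy(Enum):
    """Hypothetical mirror of the 'ignore' / 'raise' / 'warn' options."""

    IGNORE = auto()
    RAISE = auto()
    WARN = auto()


def handle_late_item(time: int, last_time: int, on_drop: str = "warn") -> bool:
    """Apply a drop policy to a late item.

    Returns True if the caller should still go on to store the item.
    """
    msg = f"item's timestamp is too old: ({time} < {last_time})"
    policy = DropPolicy[on_drop.upper()]  # unknown option names raise KeyError
    if policy is DropPolicy.IGNORE:
        return False  # silently skip the item
    if policy is DropPolicy.RAISE:
        raise ValueError(msg)
    # WARN: report the late item, then let the caller continue
    warnings.warn(msg, stacklevel=2)
    return True
```

A caller that cannot tolerate late data would pass `on_drop="raise"`, while a best-effort consumer might pass `on_drop="ignore"` to keep logs quiet.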
Source code in arrakis/mux.py
def push(self, time: int, key: str, item: T, on_drop: str = "warn") -> None:
    """Push an item into the muxer.

    Parameters
    ----------
    time : int
        The timestamp associated with this item.
    key : str
        The stream key associated with this item. Must match a key provided
        at initialization.
    item : T
        The item to add.
    on_drop : str, optional
        Specifies behavior when the item would be dropped from the muxer,
        in the case that it was not provided to the muxer before the
        specified timeout. Options are 'ignore', 'raise', or 'warn'.
        Default is 'warn'.

    """
    if key not in self._keys:
        msg = f"{key} doesn't match keys provided at initialization"
        raise KeyError(msg)

    # compare against None so that a valid timestamp of 0 is not treated as unset
    if self._last_time is None:
        self._last_time = time

    # skip over items that have already been pulled
    if time < self._last_time:
        if self._start is not None and time < self._start:
            return
        msg = f"item's timestamp is too old: ({time} < {self._last_time})"
        match OnDrop[on_drop.lower()]:
            case OnDrop.IGNORE:
                return
            case OnDrop.RAISE:
                raise ValueError(msg)
            case OnDrop.WARN:
                warnings.warn(msg, stacklevel=2)

    # add item
    if time in self._items:
        if key not in self._items[time]:
            self._items[time][key] = item
    else:
        heapq.heappush(self._times, time)
        self._items[time][key] = item