Skip to content

client

Client-based API access.

Client

Client(url=None)

Retrieve channel information and timeseries data from Arrakis.

Parameters:

Name Type Description Default
url str

The URL to connect to. If the URL is not set, connect to a default server or one set by ARRAKIS_SERVER.

None
Source code in arrakis/client.py
68
69
70
71
def __init__(self, url: str | None = None):
    """Create a client for retrieving channel info and timeseries data.

    Parameters
    ----------
    url : str, optional
        The URL to connect to.  If not given, connect to a default
        server or one set by ARRAKIS_SERVER.

    """
    resolved = parse_url(url)
    logger.debug("initial url: %s", resolved)
    self.initial_url = resolved
    self._validator = RequestValidator()

count

count(pattern=constants.DEFAULT_MATCH, data_type=None, min_rate=constants.MIN_SAMPLE_RATE, max_rate=constants.MAX_SAMPLE_RATE, publisher=None, time=None)

Count channels matching a set of conditions

Parameters:

Name Type Description Default
pattern str

Channel pattern to match channels with, using regular expressions.

DEFAULT_MATCH
data_type dtype-like | list[dtype-like]

If set, find all channels with these data types.

None
min_rate int

The minimum sampling rate for channels.

MIN_SAMPLE_RATE
max_rate int

The maximum sampling rate for channels.

MAX_SAMPLE_RATE
publisher str | list[str]

If set, find all channels associated with these publishers.

None
time float

GPS time in seconds indicating when the metadata query is valid. If None, routes to the live backend (current state).

None

Returns:

Type Description
int

The number of channels matching query.

Source code in arrakis/client.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
def count(
    self,
    pattern: str = constants.DEFAULT_MATCH,
    data_type: DataTypeLike | None = None,
    min_rate: int | None = constants.MIN_SAMPLE_RATE,
    max_rate: int | None = constants.MAX_SAMPLE_RATE,
    publisher: str | list[str] | None = None,
    time: float | None = None,
) -> int:
    """Count channels matching a set of conditions

    Parameters
    ----------
    pattern : str, optional
        Channel pattern to match channels with, using regular expressions.
    data_type : numpy.dtype-like | list[numpy.dtype-like], optional
        If set, find all channels with these data types.
    min_rate : int, optional
        The minimum sampling rate for channels.
    max_rate : int, optional
        The maximum sampling rate for channels.
    publisher : str | list[str], optional
        If set, find all channels associated with these publishers.
    time : float, optional
        GPS time in seconds indicating when the metadata query is valid.
        If None, routes to the live backend (current state).

    Returns
    -------
    int
        The number of channels matching query.

    """
    data_type = _parse_data_types(data_type)
    # normalize the optional filters into the concrete values the
    # request descriptor expects
    min_rate = constants.MIN_SAMPLE_RATE if min_rate is None else min_rate
    max_rate = constants.MAX_SAMPLE_RATE if max_rate is None else max_rate
    if publisher is None:
        publisher = []
    elif isinstance(publisher, str):
        publisher = [publisher]

    time_ns = None if time is None else time_as_ns(time)
    descriptor = create_descriptor(
        RequestType.Count,
        pattern=pattern,
        data_type=data_type,
        min_rate=min_rate,
        max_rate=max_rate,
        publisher=publisher,
        time=time_ns,
        validator=self._validator,
    )
    # each endpoint reports a partial count; total them up
    with connect(self.initial_url) as client:
        flight_info = get_flight_info(client, descriptor)
        with MultiFlightReader(flight_info.endpoints, client) as stream:
            return sum(item["count"] for item in stream.unpack())

describe

describe(channels, time=None)

Get channel metadata for channels requested

Parameters:

Name Type Description Default
channels list[str]

List of channels to request.

required
time float

GPS time in seconds indicating when the metadata query is valid. If None, routes to the live backend (current state).

None

Returns:

Type Description
dict[str, Channel]

Mapping of channel names to channel metadata.

Source code in arrakis/client.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
def describe(
    self, channels: list[str], time: float | None = None
) -> dict[str, Channel]:
    """Get channel metadata for channels requested

    Parameters
    ----------
    channels : list[str]
        List of channels to request.
    time : float, optional
        GPS time in seconds indicating when the metadata query is valid.
        If None, routes to the live backend (current state).

    Returns
    -------
    dict[str, Channel]
        Mapping of channel names to channel metadata.

    """
    query_time_ns = None if time is None else time_as_ns(time)
    with connect(self.initial_url) as client:
        return self._describe(client, channels, time_ns=query_time_ns)

domains

domains()

Get the list of domains available on the server.

Returns:

Type Description
list[str]

Sorted list of domain names.

Source code in arrakis/client.py
355
356
357
358
359
360
361
362
363
364
365
366
367
368
def domains(self) -> list[str]:
    """Get the list of domains available on the server.

    Returns
    -------
    list[str]
        Sorted list of domain names.

    """
    scope_map = self._do_action("scope-map")
    # collect the union of scope names advertised by every endpoint
    found = {
        domain
        for endpoint_info in scope_map["endpoints"].values()
        for domain in endpoint_info["scopes"]
    }
    return sorted(found)

fetch

fetch(channels, start, end)

Fetch timeseries data

Parameters:

Name Type Description Default
channels list[str]

List of channels to request.

required
start float

GPS start time, in seconds.

required
end float

GPS end time, in seconds.

required

Returns:

Type Description
SeriesBlock

Dictionary-like object containing all requested channel data.

Source code in arrakis/client.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
def fetch(
    self,
    channels: list[str],
    start: float,
    end: float,
) -> SeriesBlock:
    """Fetch timeseries data

    Parameters
    ----------
    channels : list[str]
        List of channels to request.
    start : float
        GPS start time, in seconds.
    end : float
        GPS end time, in seconds.

    Returns
    -------
    SeriesBlock
        Dictionary-like object containing all requested channel data.

    """
    # stream the requested span, then stitch the blocks together
    blocks = self.stream(channels, start, end)
    return concatenate_blocks(*blocks)

find

find(pattern=constants.DEFAULT_MATCH, data_type=None, min_rate=constants.MIN_SAMPLE_RATE, max_rate=constants.MAX_SAMPLE_RATE, publisher=None, time=None)

Find channels matching a set of conditions

Parameters:

Name Type Description Default
pattern str

Channel pattern to match channels with, using regular expressions.

DEFAULT_MATCH
data_type dtype-like | list[dtype-like]

If set, find all channels with these data types.

None
min_rate int

Minimum sampling rate for channels.

MIN_SAMPLE_RATE
max_rate int

Maximum sampling rate for channels.

MAX_SAMPLE_RATE
publisher str | list[str]

If set, find all channels associated with these publishers.

None
time float

GPS time in seconds indicating when the metadata query is valid. If None, routes to the live backend (current state).

None

Yields:

Type Description
Channel

Channel objects for all channels matching query.

Source code in arrakis/client.py
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def find(
    self,
    pattern: str = constants.DEFAULT_MATCH,
    data_type: DataTypeLike | None = None,
    min_rate: int | None = constants.MIN_SAMPLE_RATE,
    max_rate: int | None = constants.MAX_SAMPLE_RATE,
    publisher: str | list[str] | None = None,
    time: float | None = None,
) -> Generator[Channel, None, None]:
    """Find channels matching a set of conditions

    Parameters
    ----------
    pattern : str, optional
        Channel pattern to match channels with, using regular expressions.
    data_type : numpy.dtype-like | list[numpy.dtype-like], optional
        If set, find all channels with these data types.
    min_rate : int, optional
        Minimum sampling rate for channels.
    max_rate : int, optional
        Maximum sampling rate for channels.
    publisher : str | list[str], optional
        If set, find all channels associated with these publishers.
    time : float, optional
        GPS time in seconds indicating when the metadata query is valid.
        If None, routes to the live backend (current state).

    Yields
    ------
    Channel
        Channel objects for all channels matching query.

    """
    data_type = _parse_data_types(data_type)
    # normalize the optional filters into the concrete values the
    # request descriptor expects
    min_rate = constants.MIN_SAMPLE_RATE if min_rate is None else min_rate
    max_rate = constants.MAX_SAMPLE_RATE if max_rate is None else max_rate
    if publisher is None:
        publisher = []
    elif isinstance(publisher, str):
        publisher = [publisher]

    time_ns = None if time is None else time_as_ns(time)
    descriptor = create_descriptor(
        RequestType.Find,
        pattern=pattern,
        data_type=data_type,
        min_rate=min_rate,
        max_rate=max_rate,
        publisher=publisher,
        time=time_ns,
        validator=self._validator,
    )
    with connect(self.initial_url) as client:
        yield from self._stream_channel_metadata(client, descriptor)

server_info

server_info()

Get server version and capability metadata.

Returns:

Type Description
dict

Server metadata including version info, backend type, capabilities, and domains.

Source code in arrakis/client.py
343
344
345
346
347
348
349
350
351
352
353
def server_info(self) -> dict:
    """Get server version and capability metadata.

    Returns
    -------
    dict
        Server metadata including version info, backend type,
        capabilities, and domains.

    """
    info = self._do_action("server-info")
    return info

stream

stream(channels, start=None, end=None, kafka_url=None)

Stream live or offline timeseries data

Parameters:

Name Type Description Default
channels list[str]

List of channels to request.

required
start float

GPS start time, in seconds.

None
end float

GPS end time, in seconds.

None

Yields:

Type Description
SeriesBlock

Dictionary-like object containing all requested channel data.

Setting neither start nor end begins a live stream starting
from now.
Source code in arrakis/client.py
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
def stream(
    self,
    channels: list[str],
    start: float | None = None,
    end: float | None = None,
    kafka_url: str | None = None,
) -> Generator[SeriesBlock, None, None]:
    """Stream live or offline timeseries data

    Parameters
    ----------
    channels : list[str]
        List of channels to request.
    start : float, optional
        GPS start time, in seconds.
    end : float, optional
        GPS end time, in seconds.
    kafka_url : str, optional
        If set, read from this Kafka endpoint instead of the location
        advertised in the flight info.

    Yields
    ------
    SeriesBlock
        Dictionary-like object containing all requested channel data.

    Setting neither start nor end begins a live stream starting
    from now.

    """
    start_ns = time_as_ns(start) if start is not None else None
    end_ns = time_as_ns(end) if end is not None else None
    metadata: dict[str, Channel] = {}
    reader: StreamReader

    with connect(self.initial_url) as client:
        # initial describe request to get the full metadata needed
        # to construct the appropriate muxer — use start time so the
        # info server routes to the correct backend for the request
        metadata = self._describe(client, channels, time_ns=start_ns)
        for channel in metadata.values():
            assert channel.stride, "Channels do not include stride info."

        # get the flight info for streams
        descriptor = create_descriptor(
            RequestType.Stream,
            channels=channels,
            start=start_ns,
            end=end_ns,
            validator=self._validator,
        )
        flight_info = get_flight_info(client, descriptor)

    # confirm that we don't have kafka endpoints mixed with
    # other endpoints
    if len(flight_info.endpoints) > 1:
        for endpoint in flight_info.endpoints:
            assert not _endpoint_is_kafka(endpoint), (
                "Only a single kafka endpoint currently supported."
            )

    # construct reader
    if kafka_url is not None or _endpoint_is_kafka(flight_info.endpoints[0]):
        if kafka_url is None:
            # FIX: FlightInfo exposes `endpoints` (a list), not a
            # singular `endpoint` attribute (see the uses above), so
            # the previous `flight_info.endpoint.locations[0]` raised
            # AttributeError whenever a kafka endpoint was advertised
            # without an explicit kafka_url override.
            kafka_url = flight_info.endpoints[0].locations[0]
        reader = KafkaReader(
            kafka_url,
            metadata,
            start_ns,
        )
    else:
        reader = ChainedFlightReader(
            flight_info.endpoints,
            self.initial_url,
            metadata=metadata,
        )

    mux: BlockMuxStream = BlockMuxStream(reader, start=start_ns)

    for block in mux.stream(end_ns):
        yield block