client

Client-based API access.

Client

Client(url=None)

Client to fetch or publish timeseries.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| url | str | The URL to connect to. If the URL is not set, connect to a default server or one set by ARRAKIS_SERVER. | None |

Source code in arrakis/client.py
def __init__(self, url: str | None = None):
    self.initial_url = parse_url(url)
    logger.debug("initial url: %s", self.initial_url)
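
A minimal connection sketch, assuming Client is importable from the arrakis.client module shown above; the URL is a placeholder.

from arrakis.client import Client

# connect to a default server, or to one set by ARRAKIS_SERVER
client = Client()

# or pass an explicit URL (placeholder address; the scheme and port
# depend on your deployment)
client = Client(url="grpc://arrakis.example.org:31206")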

count

count(pattern=constants.DEFAULT_MATCH, data_type=None, min_rate=constants.MIN_SAMPLE_RATE, max_rate=constants.MAX_SAMPLE_RATE, publisher=None)

Count channels matching a set of conditions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pattern | str | Channel pattern to match channels with, using regular expressions. | DEFAULT_MATCH |
| data_type | numpy.dtype-like \| list[numpy.dtype-like] | If set, find all channels with these data types. | None |
| min_rate | int | The minimum sampling rate for channels. | MIN_SAMPLE_RATE |
| max_rate | int | The maximum sampling rate for channels. | MAX_SAMPLE_RATE |
| publisher | str \| list[str] | If set, find all channels associated with these publishers. | None |

Returns:

| Type | Description |
| --- | --- |
| int | The number of channels matching the query. |

Source code in arrakis/client.py
def count(
    self,
    pattern: str = constants.DEFAULT_MATCH,
    data_type: DataTypeLike | None = None,
    min_rate: int | None = constants.MIN_SAMPLE_RATE,
    max_rate: int | None = constants.MAX_SAMPLE_RATE,
    publisher: str | list[str] | None = None,
) -> int:
    """Count channels matching a set of conditions

    Parameters
    ----------
    pattern : str, optional
        Channel pattern to match channels with, using regular expressions.
    data_type : numpy.dtype-like | list[numpy.dtype-like], optional
        If set, find all channels with these data types.
    min_rate : int, optional
        The minimum sampling rate for channels.
    max_rate : int, optional
        The maximum sampling rate for channels.
    publisher : str | list[str], optional
        If set, find all channels associated with these publishers.

    Returns
    -------
    int
        The number of channels matching the query.

    """
    data_type = _parse_data_types(data_type)
    if min_rate is None:
        min_rate = constants.MIN_SAMPLE_RATE
    if max_rate is None:
        max_rate = constants.MAX_SAMPLE_RATE
    if publisher is None:
        publisher = []
    elif isinstance(publisher, str):
        publisher = [publisher]

    descriptor = create_descriptor(
        RequestType.Count,
        pattern=pattern,
        data_type=data_type,
        min_rate=min_rate,
        max_rate=max_rate,
        publisher=publisher,
    )
    count = 0
    with connect(self.initial_url) as client:
        flight_info = get_flight_info(client, descriptor)
        with MultiEndpointStream(flight_info.endpoints, client) as stream:
            for data in stream.unpack():
                count += data["count"]
    return count
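
A short usage sketch; the pattern, publisher, and rates below are hypothetical values chosen for illustration.

from arrakis.client import Client

client = Client()

# every argument has a default, so this counts all channels
total = client.count()

# narrow the count with a regex pattern, data type, rate range, and publisher
matching = client.count(
    pattern=r"H1:.*",      # hypothetical channel pattern
    data_type="float32",
    min_rate=64,
    max_rate=16384,
    publisher="H1",        # hypothetical publisher name
)
print(f"{matching} of {total} channels match")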

describe

describe(channels)

Get channel metadata for the requested channels.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| channels | list[str] | List of channels to request. | required |

Returns:

| Type | Description |
| --- | --- |
| dict[str, Channel] | Mapping of channel names to channel metadata. |

Source code in arrakis/client.py
def describe(self, channels: list[str]) -> dict[str, Channel]:
    """Get channel metadata for channels requested

    Parameters
    ----------
    channels : list[str]
        List of channels to request.

    Returns
    -------
    dict[str, Channel]
        Mapping of channel names to channel metadata.

    """
    descriptor = create_descriptor(RequestType.Describe, channels=channels)
    with connect(self.initial_url) as client:
        return {
            channel.name: channel
            for channel in self._stream_channel_metadata(client, descriptor)
        }
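
A hedged lookup sketch; the channel names are placeholders.

from arrakis.client import Client

client = Client()

metadata = client.describe(["H1:EXAMPLE-CHANNEL_A", "H1:EXAMPLE-CHANNEL_B"])
for name, channel in metadata.items():
    # each value is a Channel object holding that channel's metadata
    print(name, channel)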

fetch

fetch(channels, start, end)

Fetch timeseries data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| channels | list[str] | List of channels to request. | required |
| start | float | GPS start time, in seconds. | required |
| end | float | GPS end time, in seconds. | required |

Returns:

| Type | Description |
| --- | --- |
| SeriesBlock | Dictionary-like object containing all requested channel data. |

Source code in arrakis/client.py
def fetch(
    self,
    channels: list[str],
    start: float,
    end: float,
) -> SeriesBlock:
    """Fetch timeseries data

    Parameters
    ----------
    channels : list[str]
        List of channels to request.
    start : float
        GPS start time, in seconds.
    end : float
        GPS end time, in seconds.

    Returns
    -------
    SeriesBlock
        Dictionary-like object containing all requested channel data.

    """
    return concatenate_blocks(*self.stream(channels, start, end))
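
A usage sketch with placeholder channel names and GPS times. Per-channel indexing of the result is an assumption drawn from the "dictionary-like" description of SeriesBlock above.

from arrakis.client import Client

client = Client()

block = client.fetch(
    ["H1:EXAMPLE-CHANNEL_A"],  # placeholder channel name
    start=1187008882,          # placeholder GPS start time, in seconds
    end=1187008884,            # placeholder GPS end time, in seconds
)
# assumed access pattern: SeriesBlock is dictionary-like, so index by name
series = block["H1:EXAMPLE-CHANNEL_A"]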

find

find(pattern=constants.DEFAULT_MATCH, data_type=None, min_rate=constants.MIN_SAMPLE_RATE, max_rate=constants.MAX_SAMPLE_RATE, publisher=None)

Find channels matching a set of conditions.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pattern | str | Channel pattern to match channels with, using regular expressions. | DEFAULT_MATCH |
| data_type | numpy.dtype-like \| list[numpy.dtype-like] | If set, find all channels with these data types. | None |
| min_rate | int | Minimum sampling rate for channels. | MIN_SAMPLE_RATE |
| max_rate | int | Maximum sampling rate for channels. | MAX_SAMPLE_RATE |
| publisher | str \| list[str] | If set, find all channels associated with these publishers. | None |

Yields:

| Type | Description |
| --- | --- |
| Channel | Channel objects for all channels matching the query. |

Source code in arrakis/client.py
def find(
    self,
    pattern: str = constants.DEFAULT_MATCH,
    data_type: DataTypeLike | None = None,
    min_rate: int | None = constants.MIN_SAMPLE_RATE,
    max_rate: int | None = constants.MAX_SAMPLE_RATE,
    publisher: str | list[str] | None = None,
) -> Generator[Channel, None, None]:
    """Find channels matching a set of conditions

    Parameters
    ----------
    pattern : str, optional
        Channel pattern to match channels with, using regular expressions.
    data_type : numpy.dtype-like | list[numpy.dtype-like], optional
        If set, find all channels with these data types.
    min_rate : int, optional
        Minimum sampling rate for channels.
    max_rate : int, optional
        Maximum sampling rate for channels.
    publisher : str | list[str], optional
        If set, find all channels associated with these publishers.

    Yields
    ------
    Channel
        Channel objects for all channels matching the query.

    """
    data_type = _parse_data_types(data_type)
    if min_rate is None:
        min_rate = constants.MIN_SAMPLE_RATE
    if max_rate is None:
        max_rate = constants.MAX_SAMPLE_RATE
    if publisher is None:
        publisher = []
    elif isinstance(publisher, str):
        publisher = [publisher]

    descriptor = create_descriptor(
        RequestType.Find,
        pattern=pattern,
        data_type=data_type,
        min_rate=min_rate,
        max_rate=max_rate,
        publisher=publisher,
    )
    with connect(self.initial_url) as client:
        yield from self._stream_channel_metadata(client, descriptor)
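
Because find returns a generator, matches can be consumed lazily as the server streams them back; the pattern and rate below are hypothetical.

from arrakis.client import Client

client = Client()

# iterate over Channel objects as they arrive
for channel in client.find(pattern=r"H1:.*", min_rate=1024):
    print(channel)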

stream

stream(channels, start=None, end=None)

Stream live or offline timeseries data.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| channels | list[str] | List of channels to request. | required |
| start | float | GPS start time, in seconds. | None |
| end | float | GPS end time, in seconds. | None |

Yields:

| Type | Description |
| --- | --- |
| SeriesBlock | Dictionary-like object containing all requested channel data. |

If neither start nor end is set, a live stream begins from the current time.

Source code in arrakis/client.py
def stream(
    self,
    channels: list[str],
    start: float | None = None,
    end: float | None = None,
) -> Generator[SeriesBlock, None, None]:
    """Stream live or offline timeseries data

    Parameters
    ----------
    channels : list[str]
        List of channels to request.
    start : float, optional
        GPS start time, in seconds.
    end : float, optional
        GPS end time, in seconds.

    Yields
    ------
    SeriesBlock
        Dictionary-like object containing all requested channel data.

    If neither start nor end is set, a live stream begins from the
    current time.

    """
    start_ns = time_as_ns(start) if start is not None else None
    end_ns = time_as_ns(end) if end is not None else None
    metadata: dict[str, Channel] = {}
    schemas: dict[str, pyarrow.Schema] = {}

    with connect(self.initial_url) as client:
        descriptor = create_descriptor(
            RequestType.Stream, channels=channels, start=start_ns, end=end_ns
        )
        flight_info = get_flight_info(client, descriptor)
        # use the serialized endpoints as the mux keys
        keys = [e.serialize() for e in flight_info.endpoints]
        mux: Muxer = Muxer(keys=keys)
        with MultiEndpointStream(flight_info.endpoints, client) as stream:
            for chunk, endpoint in stream:
                time = chunk.data.column("time").to_numpy()[0]
                mux.push(time, endpoint.serialize(), chunk.data)
                # FIXME: how do we handle stream drop-outs that result
                # in timeouts in the muxer that result in null data in
                # the mux pull?
                for mux_data in mux.pull():
                    blocks = []
                    # update channel metadata if needed
                    for key, batch in mux_data.items():
                        if (
                            key not in schemas
                            or schemas[key].metadata != batch.schema.metadata
                        ):
                            channel_fields: list[pyarrow.Field] = list(
                                batch.schema
                            )[1:]
                            for field in channel_fields:
                                metadata[field.name] = Channel.from_field(field)
                            schemas[key] = batch.schema

                        blocks.append(
                            SeriesBlock.from_column_batch(batch, metadata)
                        )

                    # generate synchronized blocks
                    yield combine_blocks(*blocks)
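
Two hedged sketches of the streaming modes described above; the channel name and GPS times are placeholders.

from arrakis.client import Client

client = Client()

channels = ["H1:EXAMPLE-CHANNEL_A"]  # placeholder channel name

# live mode: with neither start nor end set, blocks arrive from now onward
for block in client.stream(channels):
    print(block)
    break  # stop after one block in this sketch

# offline mode: iterate over a fixed GPS interval until exhausted
for block in client.stream(channels, start=1187008882, end=1187008892):
    print(block)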