Skip to content

flight

Arrow Flight utilities.

MultiFlightReader

MultiFlightReader(endpoints, initial_url, metadata=None)

Bases: StreamReader

Multi-threaded Arrow Flight endpoint stream iterator context manager

Given a list of endpoints, connect to all of them in parallel and stream their data interleaved.

Source code in arrakis/flight.py
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
def __init__(
    self,
    endpoints: list[flight.FlightEndpoint],
    initial_url: str | flight.FlightClient,
    metadata: dict[str, Channel] | None = None,
):
    """Initialize with a list of endpoints and a reusable Flight client.

    Parameters
    ----------
    endpoints : list[flight.FlightEndpoint]
        Endpoints to stream from. One done-event and one worker slot
        is allocated per endpoint.
    initial_url : str | flight.FlightClient
        Stored unchanged on the instance (a URL or an existing client,
        per the annotation).
    metadata : dict[str, Channel], optional
        Channel metadata, stored unchanged on the instance.

    """
    self.endpoints = endpoints
    self.initial_url = initial_url
    self.metadata = metadata

    self.queue: queue.SimpleQueue = queue.SimpleQueue()
    # FIXME: this quit event could be replaced with the
    # Queue.shutdown method after python 3.13
    self.quit_event: threading.Event = threading.Event()
    self.done_events = [threading.Event() for _ in self.endpoints]
    # ThreadPoolExecutor rejects max_workers=0 with a ValueError, so
    # always keep at least one slot; this lets a reader be built from an
    # empty endpoint list instead of failing with an unrelated error.
    self.executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=max(1, len(self.endpoints)),
    )
    self.futures: list[concurrent.futures.Future] = []

close

close()

close all streams

Source code in arrakis/flight.py
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
def close(self):
    """Close all streams.

    Sets the quit event, then waits on every submitted future so that
    exceptions raised in the workers surface to the caller. The executor
    is shut down and the list of futures is reset whether or not one of
    the futures raised.

    Raises
    ------
    flight.FlightError
        Re-raised as the same error type, with the message truncated at
        the first " Detail:" marker and the original ``extra_info``.

    """
    self.quit_event.set()
    try:
        if self.futures is not None:
            for f in self.futures:
                # re-raise exceptions to the client, returning
                # user-friendly Flight-specific errors when relevant
                try:
                    f.result()
                except flight.FlightError as e:
                    # NOTE: this strips the original message of everything
                    # besides the original error message raised by the server
                    msg = e.args[0].partition(" Detail:")[0]
                    raise type(e)(msg, e.extra_info) from None
    finally:
        # always release the pool and drop the futures, even when a worker
        # error is propagating; otherwise the threads are never shut down
        # and a second close() would re-raise the same error
        self.executor.shutdown(cancel_futures=True)
        self.futures = []

done

done()

returns True when all threads are done and the queue is empty

Source code in arrakis/flight.py
332
333
334
def done(self) -> bool:
    """Report whether streaming has fully finished.

    Returns
    -------
    bool
        True only when every done-event is set and the queue holds no
        more items.

    """
    for event in self.done_events:
        if not event.is_set():
            # at least one stream has not signalled completion yet
            return False
    return self.queue.empty()

read

read(*, convert_blocks=False)

read all available data from all streams

Yields:

Type Description
tuple of (stream_id, data)
Source code in arrakis/flight.py
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
def read(
    self,
    *,
    convert_blocks: bool = False,
) -> Generator[
    tuple[str, Any],
    None,
    None,
]:
    """Drain whatever is currently queued across all streams.

    Items are taken from the queue without blocking; iteration ends as
    soon as the queue reports that it is empty.

    Parameters
    ----------
    convert_blocks : bool, optional
        When True, each batch is passed through
        ``SeriesBlock.from_column_batch`` together with the channel
        metadata before being yielded. Requires metadata to be set.

    Yields
    ------
    tuple of (stream_id, data)

    """
    if convert_blocks:
        assert self.metadata, (
            "Must be initialized with channel metadata to convert to blocks."
        )
    try:
        while True:
            stream_id, batch = self.queue.get(block=False)
            if not convert_blocks:
                payload = batch
            else:
                assert self.metadata  # needed for type checking consistency
                payload = SeriesBlock.from_column_batch(
                    batch,
                    self.metadata,
                )
            yield stream_id, payload
    except queue.Empty:
        # nothing left to hand out right now
        return

RequestValidator

RequestValidator()

A validator for JSON-encoded requests.

Source code in arrakis/flight.py
63
64
65
66
67
68
69
70
def __init__(self) -> None:
    """Create a validator with the generic descriptor schema loaded.

    The per-request validator cache starts out empty; only the generic
    ``descriptor.json`` schema is read here.
    """
    # cache of request-specific validators, keyed by request type
    self._validators: dict[RequestType, jsonschema.Draft7Validator] = {}

    # load generic descriptor schema
    resource = resources.files(arrakis_schema).joinpath("descriptor.json")
    with resources.as_file(resource) as path:
        # JSON text is UTF-8; decode explicitly rather than relying on
        # the platform's locale encoding
        schema = json.loads(path.read_text(encoding="utf-8"))
    self._generic_validator = jsonschema.Draft7Validator(schema)

validate

validate(payload)

Validate a JSON-encoded request.

Parameters:

Name Type Description Default
payload Request

A dictionary with a 'request' and an 'args' key encoding the given Flight request.

required

Raises:

Type Description
ValidationError

If the request does not match the expected schema.

Source code in arrakis/flight.py
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
def validate(self, payload: Request) -> None:
    """Validate a JSON-encoded request.

    The payload is first checked against the generic descriptor schema,
    then against the schema specific to its request type. Request-specific
    schemas are read from ``<request name, lower-cased>.json`` the first
    time they are needed and cached afterwards.

    Parameters
    ----------
    payload : Request
        A dictionary with a 'request' and an 'args' key encoding
        the given Flight request.

    Raises
    ------
    ValidationError
        If the request does not match the expected schema.

    """
    self._generic_validator.validate(payload)
    request = RequestType[payload["request"]]

    # load schema on demand
    if request not in self._validators:
        resource = resources.files(arrakis_schema).joinpath(
            f"{request.name.lower()}.json"
        )
        with resources.as_file(resource) as path:
            # JSON text is UTF-8; decode explicitly rather than relying
            # on the platform's locale encoding
            schema = json.loads(path.read_text(encoding="utf-8"))
        self._validators[request] = jsonschema.Draft7Validator(schema)

    self._validators[request].validate(payload)

create_command

create_command(request_type, *, validator, **kwargs)

Create a Flight command containing a JSON-encoded request.

Parameters:

Name Type Description Default
request_type RequestType

The type of request.

required
validator RequestValidator

A validator to validate that the command matches the expected schema.

required
**kwargs dict

Extra arguments corresponding to the specific request.

{}

Returns:

Type Description
bytes

The JSON-encoded request.

Raises:

Type Description
ValidationError

If the request does not match the expected schema.

Source code in arrakis/flight.py
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
def create_command(
    request_type: RequestType, *, validator: RequestValidator, **kwargs
) -> bytes:
    """Build the JSON-encoded body of a Flight command.

    Parameters
    ----------
    request_type : RequestType
        The type of request; its ``name`` is stored under "request".
    validator : RequestValidator
        Used to check the assembled request before it is serialized.
    **kwargs : dict, optional
        Request-specific arguments, stored under "args".

    Returns
    -------
    bytes
        The request serialized as UTF-8 encoded JSON.

    Raises
    ------
    ValidationError
        If the request does not match the expected schema.

    """
    request: Request = {"request": request_type.name, "args": kwargs}
    # validate before serializing so an invalid request is never encoded
    validator.validate(request)
    serialized = json.dumps(request)
    return bytes(serialized, "utf-8")

create_descriptor

create_descriptor(request_type, *, validator, **kwargs)

Create a Flight descriptor given a request.

Parameters:

Name Type Description Default
request_type RequestType

The type of request.

required
validator RequestValidator

A validator to validate that the command matches the expected schema.

required
**kwargs dict

Extra arguments corresponding to the specific request.

{}

Returns:

Type Description
FlightDescriptor

A Flight Descriptor containing the request.

Raises:

Type Description
ValidationError

If the request does not match the expected schema.

Source code in arrakis/flight.py
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
def create_descriptor(
    request_type: RequestType, *, validator: RequestValidator, **kwargs
) -> flight.FlightDescriptor:
    """Wrap a validated, JSON-encoded request in a Flight descriptor.

    Parameters
    ----------
    request_type : RequestType
        The type of request.
    validator : RequestValidator
        A validator to validate that the command matches the expected schema.
    **kwargs : dict, optional
        Extra arguments corresponding to the specific request.

    Returns
    -------
    flight.FlightDescriptor
        A command descriptor whose payload is the encoded request.

    Raises
    ------
    ValidationError
        If the request does not match the expected schema.

    """
    # encoding and validation are delegated to create_command
    return flight.FlightDescriptor.for_command(
        create_command(request_type, validator=validator, **kwargs)
    )

parse_command

parse_command(cmd, *, validator)

Parse a Flight command into a request.

Parameters:

Name Type Description Default
cmd bytes

The JSON-encoded request.

required
validator RequestValidator

A validator to validate that the command matches the expected schema.

required

Returns:

Name Type Description
request_type RequestType

The type of request.

kwargs dict

Arguments corresponding to the specific request.

Raises:

Type Description
JSONDecodeError

If the command does not decode to valid JSON.

ValidationError

If the request does not match the expected schema.

Source code in arrakis/flight.py
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
def parse_command(
    cmd: bytes, *, validator: RequestValidator
) -> tuple[RequestType, dict]:
    """Parse a Flight command into a request.

    Parameters
    ----------
    cmd : bytes
        The JSON-encoded request.
    validator : RequestValidator
        A validator to validate that the command matches the expected schema.

    Returns
    -------
    request_type : RequestType
        The type of request.
    kwargs : dict
        Arguments corresponding to the specific request.

    Raises
    ------
    JSONDecodeError
        If the command does not decode to valid JSON. This includes
        commands that are not valid UTF-8.
    ValidationError
        If the request does not match the expected schema.

    """
    msg = "Command does not decode to valid JSON"
    try:
        text = cmd.decode("utf-8")
    except UnicodeDecodeError as e:
        # Bytes that are not valid UTF-8 cannot be valid JSON either, so
        # report them through the documented JSONDecodeError instead of
        # leaking a UnicodeDecodeError. The position is converted from a
        # byte offset to a character offset in the lossy-decoded document.
        doc = cmd.decode("utf-8", errors="replace")
        pos = len(cmd[: e.start].decode("utf-8", errors="replace"))
        raise json.JSONDecodeError(msg, doc, pos) from e

    try:
        parsed = json.loads(text)
    except json.JSONDecodeError as e:
        raise json.JSONDecodeError(msg, e.doc, e.pos) from e

    validator.validate(parsed)
    return RequestType[parsed["request"]], parsed["args"]