Skip to content

flight

Arrow Flight utilities.

MultiEndpointStream

MultiEndpointStream(endpoints, initial_client)

Bases: AbstractContextManager

Multi-threaded Arrow Flight endpoint stream iterator and context manager.

Given a list of endpoints, connect to all of them in parallel and stream their data, interleaved.

Source code in arrakis/flight.py
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
def __init__(
    self,
    endpoints: list[flight.FlightEndpoint],
    initial_client: flight.FlightClient,
):
    """Initialize with a list of endpoints and a reusable Flight client.

    Parameters
    ----------
    endpoints : list[flight.FlightEndpoint]
        Endpoints to stream from. One executor worker is allotted per
        endpoint (at least one worker is always created).
    initial_client : flight.FlightClient
        A reusable Flight client, stored as ``initial_client``.

    """
    self.endpoints = endpoints
    self.initial_client = initial_client
    # Queue drained by __iter__; items are ``(data, endpoint)`` pairs.
    self.q: queue.SimpleQueue = queue.SimpleQueue()
    # Set by __iter__ when a worker fails and by close() to signal shutdown.
    self.quit_event: threading.Event = threading.Event()
    # ThreadPoolExecutor raises ValueError for max_workers <= 0, so keep at
    # least one worker; with no endpoints there is simply nothing to submit.
    self.executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=max(1, len(self.endpoints)),
    )
    # Per-endpoint completion flags keyed by the serialized endpoint.
    self.threads_done = {endpoint.serialize(): False for endpoint in endpoints}
    # One future per endpoint once __iter__ has started; None otherwise.
    self.futures: list[concurrent.futures.Future] | None = None

__iter__

__iter__(timeout=constants.DEFAULT_QUEUE_TIMEOUT)

Execute the streams and yield the results

Yielded results are tuples of the data chunk and the endpoint it came from.

The timeout is expected to be a timedelta object.

Source code in arrakis/flight.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
def __iter__(
    self,
    timeout: timedelta = constants.DEFAULT_QUEUE_TIMEOUT,
) -> Generator[
    flight.FlightStreamReader
    | tuple[flight.FlightStreamReader, flight.FlightEndpoint],
    None,
    None,
]:
    """Execute the streams and yield the results.

    One ``_execute_endpoint`` task per endpoint is submitted to the
    executor. This generator then drains the shared queue until every
    endpoint has reported end-of-stream, or until all tasks have finished
    and nothing is left queued.

    Parameters
    ----------
    timeout : timedelta
        How long to block on the queue before re-checking task status.
        Expected to be a ``timedelta`` object.

    Yields
    ------
    tuple
        ``(data, endpoint)``: a data chunk and the endpoint it came from.

    """
    # Keep a local reference so a concurrent close() resetting
    # ``self.futures`` cannot break this loop.
    futures = [
        self.executor.submit(self._execute_endpoint, endpoint)
        for endpoint in self.endpoints
    ]
    self.futures = futures
    poll_seconds = timeout.total_seconds()

    while not all(self.threads_done.values()):
        try:
            data, endpoint = self.q.get(block=True, timeout=poll_seconds)
        except queue.Empty:
            # Once every task has finished and the queue is empty, nothing
            # more can arrive. This covers a task that ended without posting
            # its EOS marker; without this check the loop would poll forever.
            if all(f.done() for f in futures) and self.q.empty():
                break
        else:
            if data is EOS:
                self.threads_done[endpoint.serialize()] = True
            else:
                yield data, endpoint
        # If any task raised, ask the others to stop. The exception itself
        # surfaces later via ``Future.result()``.
        if any(
            f.done() and not f.cancelled() and f.exception() is not None
            for f in futures
        ):
            self.quit_event.set()

close

close()

Close all streams.

Source code in arrakis/flight.py
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def close(self):
    """Close all streams.

    Signals the workers to stop and waits on each submitted future,
    re-raising the first worker failure to the caller. The executor is
    always shut down and ``futures`` reset, even when an error is
    re-raised.

    Raises
    ------
    flight.FlightError
        A worker's Flight error, re-raised with its message trimmed to the
        text before ``" Detail:"``.
    Exception
        Any other exception raised by a worker propagates unchanged.

    """
    self.quit_event.set()
    try:
        for f in self.futures or ():
            # re-raise exceptions to the client, returning
            # user-friendly Flight-specific errors when relevant
            try:
                f.result()
            except flight.FlightError as e:
                # NOTE: this strips the original message of everything
                # besides the original error message raised by the server
                msg = e.args[0].partition(" Detail:")[0]
                raise type(e)(msg, e.extra_info) from None
    finally:
        # Release the worker threads on every path, so a worker error
        # cannot leak the pool or leave stale futures behind.
        self.executor.shutdown(cancel_futures=True)
        self.futures = None

unpack

unpack()

Unpack stream data into individual elements

Source code in arrakis/flight.py
211
212
213
214
def unpack(self):
    """Yield every individual element from the streamed data chunks.

    Each chunk produced by iterating the stream is flattened through
    ``chunk.data.to_pylist()``; the source endpoint is discarded.
    """
    for data_chunk, _endpoint in iter(self):
        rows = data_chunk.data.to_pylist()
        yield from rows

create_command

create_command(request_type, **kwargs)

Create a Flight command containing a JSON-encoded request.

Parameters:

Name Type Description Default
request_type RequestType

The type of request.

required
**kwargs dict

Extra arguments corresponding to the specific request.

{}

Returns:

Type Description
bytes

The JSON-encoded request.

Source code in arrakis/flight.py
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def create_command(request_type: RequestType, **kwargs) -> bytes:
    """Encode a request as the JSON payload of a Flight command.

    Parameters
    ----------
    request_type : RequestType
        The type of request; its ``name`` is stored under ``"request"``.
    **kwargs : dict, optional
        Extra arguments for the specific request, stored under ``"args"``.

    Returns
    -------
    bytes
        The UTF-8 encoded JSON document.

    """
    payload = dict(request=request_type.name, args=kwargs)
    encoded = json.dumps(payload)
    return encoded.encode("utf-8")

create_descriptor

create_descriptor(request_type, **kwargs)

Create a Flight descriptor given a request.

Parameters:

Name Type Description Default
request_type RequestType

The type of request.

required
**kwargs dict

Extra arguments corresponding to the specific request.

{}

Returns:

Type Description
FlightDescriptor

A Flight Descriptor containing the request.

Source code in arrakis/flight.py
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
def create_descriptor(request_type: RequestType, **kwargs) -> flight.FlightDescriptor:
    """Wrap a JSON-encoded request in a command-type Flight descriptor.

    Parameters
    ----------
    request_type : RequestType
        The type of request.
    **kwargs : dict, optional
        Extra arguments for the specific request, forwarded to
        ``create_command``.

    Returns
    -------
    flight.FlightDescriptor
        A descriptor built with ``FlightDescriptor.for_command``.

    """
    return flight.FlightDescriptor.for_command(
        create_command(request_type, **kwargs)
    )

parse_command

parse_command(cmd)

Parse a Flight command into a request.

Parameters:

Name Type Description Default
cmd bytes

The JSON-encoded request.

required

Returns:

Name Type Description
request_type RequestType

The type of request.

kwargs dict

Arguments corresponding to the specific request.

Source code in arrakis/flight.py
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
def parse_command(cmd: bytes) -> tuple[RequestType, dict]:
    """Decode a Flight command back into its request type and arguments.

    Parameters
    ----------
    cmd : bytes
        The UTF-8 JSON document holding ``"request"`` and ``"args"`` keys.

    Returns
    -------
    request_type : RequestType
        The member of ``RequestType`` named by ``"request"``.
    kwargs : dict
        The arguments stored under ``"args"``.

    """
    request = json.loads(cmd.decode("utf-8"))
    request_type = RequestType[request["request"]]
    return request_type, request["args"]