
stream

StreamReader

Bases: AbstractContextManager, Protocol

Non-blocking Arrow RecordBatch stream reader

May produce multiple independent streams. Streams should be buffered in a queue, and the read method should return all RecordBatches currently in the queue.
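The queue-and-drain behaviour described above can be sketched with the standard library alone. This is a minimal illustration, not the arrakis implementation: the class name `QueueBackedReader` and its `put` method are hypothetical, and plain lists stand in for Arrow RecordBatches.

```python
from __future__ import annotations

import queue
from typing import Any, Iterator


class QueueBackedReader:
    """Hypothetical reader that buffers (stream name, batch) pairs in a queue."""

    def __init__(self) -> None:
        # Holds (stream name, batch) tuples until the next read() call.
        self._queue = queue.SimpleQueue()

    def put(self, stream: str, batch: Any) -> None:
        # A producer (for example a background thread) would call this.
        self._queue.put((stream, batch))

    def read(self) -> Iterator[tuple[str, Any]]:
        # Yield whatever is buffered right now, then return without waiting.
        while True:
            try:
                yield self._queue.get_nowait()
            except queue.Empty:
                return


reader = QueueBackedReader()
reader.put("a", [1, 2])  # plain lists stand in for RecordBatches
reader.put("b", [3])
first = list(reader.read())   # everything buffered so far
second = list(reader.read())  # nothing new has arrived
```

Because `read` never waits for new data, a second call made before anything else arrives yields nothing.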

streams property

streams

stream channels dictionary

A StreamReader may produce multiple independent streams. This property has one key per stream name, whose value is the list of channel names in that stream.
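As an illustration of that shape, the mapping might look like the following. The stream and channel names here are made up for the example and are not taken from arrakis.

```python
# Hypothetical stream and channel names, for illustration only.
streams = {
    "stream-0": ["H1:EXAMPLE-CHANNEL_A", "H1:EXAMPLE-CHANNEL_B"],
    "stream-1": ["L1:EXAMPLE-CHANNEL_C"],
}

# Each key is a stream name; each value lists the channels in that stream.
for name, channels in streams.items():
    print(f"{name}: {len(channels)} channel(s)")
```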

close

close()

close the stream

Source code in arrakis/stream.py
def close(self):
    """close the stream"""
    pass

done

done()

return True if the collector is done

Source code in arrakis/stream.py
def done(self) -> bool:
    """return True if the collector is done"""
    return False

enter

enter()

open the streams

Source code in arrakis/stream.py
def enter(self):
    """open the streams"""
    pass

read

read(*, convert_blocks: Literal[False] = False) -> Generator[tuple[str, RecordBatch], None, None]
read(*, convert_blocks: Literal[True] = True) -> Generator[tuple[str, SeriesBlock], None, None]
read(*, convert_blocks=False)

Read all available elements.

Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| convert_blocks | bool | Convert RecordBatches to SeriesBlocks | False |

Yields:

tuple of (stream name, RecordBatch | SeriesBlock)

Source code in arrakis/stream.py
def read(
    self,
    *,
    convert_blocks: bool = False,
) -> Generator[tuple[str, RecordBatch | SeriesBlock], None, None]:
    """Read all available elements.

    Parameters
    ----------
    convert_blocks : bool
        Convert RecordBatches to SeriesBlocks

    Yields
    ------
    tuple of (stream name, RecordBatch | SeriesBlock)

    """
    ...
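
Since `read` only yields what is currently available, a caller typically polls it until `done()` returns True. The sketch below shows one way to do that inside a `with` block. `DemoReader` and `collect` are hypothetical names, strings stand in for RecordBatches, and it is an assumption (not confirmed by the source) that leaving the context calls `close()`.

```python
import time
from contextlib import AbstractContextManager


class DemoReader(AbstractContextManager):
    """Hypothetical stand-in that follows the method names documented above."""

    def __init__(self, pending):
        self._pending = list(pending)  # (stream name, batch) pairs not yet read
        self.closed = False

    def __exit__(self, exc_type, exc, tb):
        # Assumption: leaving the context closes the reader.
        self.close()
        return None

    def close(self):
        self.closed = True

    def done(self):
        return not self._pending

    def read(self, *, convert_blocks=False):
        # Hand out everything available now; never block.
        # convert_blocks is accepted for signature parity but unused here.
        pending, self._pending = self._pending, []
        yield from pending


def collect(reader, poll_interval=0.01):
    """Poll a non-blocking reader until it reports done, grouping by stream."""
    by_stream = {}
    with reader:
        while not reader.done():
            for name, batch in reader.read(convert_blocks=False):
                by_stream.setdefault(name, []).append(batch)
            time.sleep(poll_interval)  # avoid a busy loop between polls
    return by_stream


demo = DemoReader([("a", "batch-1"), ("b", "batch-2"), ("a", "batch-3")])
result = collect(demo)
```

Grouping by stream name keeps batches from independent streams apart, which matters when a reader produces more than one stream.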

unpack

unpack()

synchronously unpack all batch streams into individual elements

Source code in arrakis/stream.py
def unpack(self) -> Generator[Any, None, None]:
    """synchronously unpack all batch streams into individual elements"""
    while not self.done():
        for _, batch in self.read(convert_blocks=False):
            yield from batch.to_pylist()
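
To make the flattening step concrete, here is a small sketch of what `yield from batch.to_pylist()` produces for each batch. `FakeBatch` and `unpack_batches` are hypothetical stand-ins so the example runs without pyarrow. The real `RecordBatch.to_pylist()` likewise returns one dictionary per row.

```python
class FakeBatch:
    """Hypothetical stand-in for a RecordBatch: columns stored as lists."""

    def __init__(self, columns):
        self._columns = columns

    def to_pylist(self):
        # One dict per row, mirroring the row-wise shape of RecordBatch.to_pylist().
        names = list(self._columns)
        return [dict(zip(names, row)) for row in zip(*self._columns.values())]


def unpack_batches(named_batches):
    """Flatten (stream name, batch) pairs into individual rows."""
    for _, batch in named_batches:
        # The stream name is discarded, as in unpack() above.
        yield from batch.to_pylist()


batches = [
    ("a", FakeBatch({"time": [0, 1], "value": [1.5, 2.5]})),
    ("b", FakeBatch({"time": [0], "value": [9.0]})),
]
rows = list(unpack_batches(batches))
```

Note that the stream name is dropped during unpacking, so rows from different streams arrive in one sequence with no label saying which stream they came from.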