Skip to content

channel

Channel information.

Channel dataclass

Channel(name, data_type, sample_rate, time=None, publisher=None, partition_id=None, partition_index=None, stride=None, max_latency=None)

Metadata associated with a channel.

Channels have the form {domain}:*.

Parameters:

Name Type Description Default
name str

The name associated with this channel.

required
data_type str

The data type associated with this channel.

required
sample_rate float

The sampling rate associated with this channel.

required
time int

The timestamp when this metadata became active.

None
publisher str

The publisher associated with this channel.

None
partition_id str

The Kafka partition ID associated with this channel.

None
partition_index int

Partition index for the channel. It is unique within the partition and allows the use of an integer value to identify the channel instead of a string.

None
stride int

Time duration in individual blocks of data for this channel, in nanoseconds.

None
max_latency int

Maximum expected publication latency for this channel's data, in seconds.

None

as_dict

as_dict()

Return metadata as "serialized" dict

Source code in arrakis/channel.py
170
171
172
def as_dict(self) -> dict[str, Any]:
    """Return metadata as "serialized" dict"""
    return asdict(self)

fields staticmethod

fields()

Channel field names

Source code in arrakis/channel.py
102
103
104
105
@staticmethod
def fields() -> tuple[str, ...]:
    """Channel field names"""
    return tuple(field.name for field in fields(Channel))

from_field classmethod

from_field(field)

Create a Channel from Arrow Flight field metadata.

Parameters:

Name Type Description Default
field field

The channel field containing relevant metadata.

required

Returns:

Type Description
Channel

The newly created channel.

Source code in arrakis/channel.py
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
@classmethod
def from_field(cls, field: pyarrow.field) -> Channel:
    """Create a Channel from Arrow Flight field metadata.

    Parameters
    ----------
    field : pyarrow.field
        The channel field containing relevant metadata.

    Returns
    -------
    Channel
        The newly created channel.

    """
    data_type = _list_dtype_to_str(field.type)
    sample_rate = float(field.metadata[b"rate"].decode())
    return cls(field.name, data_type, sample_rate)

from_json classmethod

from_json(payload)

Create a Channel from its JSON representation.

Parameters:

Name Type Description Default
payload str

The JSON-serialized channel.

required

Returns:

Type Description
Channel

The newly created channel.

Source code in arrakis/channel.py
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
@classmethod
def from_json(cls, payload: str) -> Channel:
    """Create a Channel from its JSON representation.

    Parameters
    ----------
    payload : str
        The JSON-serialized channel.

    Returns
    -------
    Channel
        The newly created channel.

    """
    obj = json.loads(payload)
    return cls(**obj)

ParsedChannelName dataclass

ParsedChannelName(domain, subsystem, subsystem_delimiter, rest)

A parsed channel name

Channel names have the following structure:

<domain>:<subsystem>[-_]<rest>

parse classmethod

parse(name)

Parse a channel name into it's constituent parts

Source code in arrakis/channel.py
44
45
46
47
48
49
50
51
52
53
54
55
@classmethod
def parse(cls, name: str) -> ParsedChannelName:
    """Parse a channel name into it's constituent parts"""
    if rem := CHANNEL_NAME_RE.match(name):
        return cls(
            rem["domain"],
            rem["subsystem"],
            rem["delimiter"],
            rem["rest"],
        )
    msg = f"Invalid channel name format: '{name}'"
    raise ValueError(msg)