Skip to content

channel

Channel information.

Channel dataclass

Channel(name, data_type, sample_rate, time=None, publisher=None, partition_id=None, partition_index=None, expected_latency=None)

Metadata associated with a channel.

Channels have the form {domain}:*.

Parameters:

Name Type Description Default
name str

The name associated with this channel.

required
data_type dtype

The data type associated with this channel.

required
sample_rate float

The sampling rate associated with this channel.

required
time int

The timestamp when this metadata became active.

None
publisher str

The publisher associated with this channel.

None
partition_id str

The Kafka partition ID associated with this channel.

None
partition_index int | None

Partition index for the channel. It is unique within the partition and allows the use of an integer value to identify the channel instead of a string.

None
expected_latency int | None

Expected publication latency for this channel's data, in seconds.

None

as_dict

as_dict()

Return metadata as "serialized" dict

The data_type attribute is converted to its string representation.

Source code in arrakis/channel.py
151
152
153
154
155
156
157
158
159
160
def as_dict(self):
    """Return metadata as "serialized" dict

    The data_type attribute is converted to its string
    representation.

    """
    data = asdict(self)
    data["data_type"] = self.data_type.name
    return data

fields staticmethod

fields()

Channel field names

Source code in arrakis/channel.py
 98
 99
100
101
@staticmethod
def fields():
    """Channel field names"""
    return tuple(field.name for field in fields(Channel))

from_field classmethod

from_field(field)

Create a Channel from Arrow Flight field metadata.

Parameters:

Name Type Description Default
field field

The channel field containing relevant metadata.

required

Returns:

Type Description
Channel

The newly created channel.

Source code in arrakis/channel.py
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
@classmethod
def from_field(cls, field: pyarrow.field) -> Channel:
    """Create a Channel from Arrow Flight field metadata.

    Parameters
    ----------
    field : pyarrow.field
        The channel field containing relevant metadata.

    Returns
    -------
    Channel
        The newly created channel.

    """
    data_type = numpy.dtype(_list_dtype_to_str(field.type))
    sample_rate = float(field.metadata[b"rate"].decode())
    return cls(field.name, data_type, sample_rate)

from_json classmethod

from_json(payload)

Create a Channel from its JSON representation.

Parameters:

Name Type Description Default
payload str

The JSON-serialized channel.

required

Returns:

Type Description
Channel

The newly created channel.

Source code in arrakis/channel.py
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
@classmethod
def from_json(cls, payload: str) -> Channel:
    """Create a Channel from its JSON representation.

    Parameters
    ----------
    payload : str
        The JSON-serialized channel.

    Returns
    -------
    Channel
        The newly created channel.

    """
    obj = json.loads(payload)
    obj["data_type"] = numpy.dtype(obj["data_type"])
    return cls(**obj)

to_json

to_json(time=None)

Serialize channel metadata to JSON.

Parameters:

Name Type Description Default
time int

If specified, the timestamp when this metadata became active.

None
Source code in arrakis/channel.py
162
163
164
165
166
167
168
169
170
171
172
173
174
175
def to_json(self, time: int | None = None) -> str:
    """Serialize channel metadata to JSON.

    Parameters
    ----------
    time : int, optional
        If specified, the timestamp when this metadata became active.

    """
    obj = self.as_dict()
    if time is not None:
        obj["time"] = time
    obj = {k: v for k, v in obj.items() if v is not None}
    return json.dumps(obj)

ParsedChannelName dataclass

ParsedChannelName(domain, subsystem, subsystem_delimiter, rest)

A parsed channel name

Channel names have the following structure:

<domain>:<subsystem>[-_]<rest>

parse classmethod

parse(name)

Parse a channel name into it's constituent parts

Source code in arrakis/channel.py
44
45
46
47
48
49
50
51
52
53
54
55
@classmethod
def parse(cls, name: str) -> ParsedChannelName:
    """Parse a channel name into it's constituent parts"""
    if rem := CHANNEL_NAME_RE.match(name):
        return cls(
            rem["domain"],
            rem["subsystem"],
            rem["delimiter"],
            rem["rest"],
        )
    msg = f"Invalid channel name format: '{name}'"
    raise ValueError(msg)

PartitionInfo dataclass

PartitionInfo(name, id, index)

Partition metadata associated with a channel.

Parameters:

Name Type Description Default
name str

The name associated with this channel.

required
id str

The partition ID associated with this channel.

required
index int

Partition index for the channel. It is unique within the partition and allows the use of an integer value to identify the channel instead of a string.

required

from_channel classmethod

from_channel(channel)

Extract partition information from a Channel.

Parameters:

Name Type Description Default
channel Channel

The channel containing relevant metadata.

required

Returns:

Type Description
PartitionInfo

The newly created partition information.

Source code in arrakis/channel.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
@classmethod
def from_channel(cls, channel: Channel) -> PartitionInfo:
    """Extract partition information from a Channel.

    Parameters
    ----------
    channel : Channel
        The channel containing relevant metadata.

    Returns
    -------
    PartitionInfo
        The newly created partition information.

    """
    assert channel.partition_id is not None
    assert channel.partition_index is not None
    return cls(
        name=channel.name,
        id=channel.partition_id,
        index=channel.partition_index,
    )