publish

Publisher API.

Publisher

Publisher(publisher_id, url=None)

Publisher to publish timeseries to Arrakis service.

Parameters:

Name          Type  Description                        Default
publisher_id  str   Publisher ID string.               required
url           str   Initial Flight URL to connect to.  None
Source code in arrakis/publish.py
def __init__(self, publisher_id: str, url: str | None = None):
    if not HAS_KAFKA:
        msg = (
            "Publishing requires confluent-kafka to be installed."
            "This is provided by the 'publish' extra or it can be "
            "installed manually through pip or conda."
        )
        raise ImportError(msg)

    self.publisher_id = publisher_id
    self.initial_url = parse_url(url)

    self.channels: dict[str, Channel] = {}

    self._producer: Producer
    self._partitions: dict[str, str]
    self._registered = False
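
A minimal construction sketch: the import path follows the source location arrakis/publish.py, and the publisher ID and URL shown are placeholders. Constructing a Publisher raises ImportError when confluent-kafka (provided by the 'publish' extra) is not installed.

from arrakis.publish import Publisher

# Placeholder publisher ID and Flight URL; a real deployment supplies its own endpoint.
publisher = Publisher("my-publisher", url="grpc://arrakis.example.org:31206")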

publish

publish(block, timeout=constants.DEFAULT_TIMEOUT)

Publish timeseries data

Parameters:

Name     Type         Description                                                          Default
block    SeriesBlock  A data block with all channels to publish.                           required
timeout  timedelta    The maximum time to wait to publish before timing out (2 seconds).   DEFAULT_TIMEOUT
Source code in arrakis/publish.py
def publish(
    self,
    block: SeriesBlock,
    timeout: timedelta = constants.DEFAULT_TIMEOUT,
) -> None:
    """Publish timeseries data

    Parameters
    ----------
    block : SeriesBlock
        A data block with all channels to publish.
    timeout : timedelta, optional
        The maximum time to wait to publish before timing out.
        Default is 2 seconds.

    """
    if not self._producer:
        msg = (
            "publication interface not initialized, "
            "please use context manager when publishing."
        )
        raise RuntimeError(msg)

    for name, channel in block.channels.items():
        if channel != self.channels[name]:
            msg = f"invalid channel for this publisher: {channel}"
            raise ValueError(msg)

    # FIXME: updating partitions should only be allowed for
    # special blessed publishers, that are currently not using
    # this interface, so we're disabling this functionality for
    # the time being, until we have a better way to manage it.
    #
    # # check for new metadata changes
    # changed = set(block.channels.values()) - set(self.channels.values())
    # # exchange to transfer metadata and get new/updated partition IDs
    # if changed:
    #     self._update_partitions(changed)

    # publish data for each data type, splitting into
    # sub-blocks based on a maximum channel count
    for partition_id, batch in block.to_row_batches(self._partitions):
        topic = f"arrakis-{partition_id}"
        logger.debug("publishing to topic %s: %s", topic, batch)
        self._producer.produce(topic=topic, value=serialize_batch(batch))
        self._producer.flush()
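
A usage sketch, assuming Publisher supports the context-manager protocol (as the RuntimeError above implies) and that every channel in the block is already known to this publisher; make_block is a hypothetical helper that builds a SeriesBlock for those channels.

from datetime import timedelta

from arrakis.publish import Publisher

with Publisher("my-publisher") as pub:
    # make_block is hypothetical: it returns a SeriesBlock covering the
    # channels this publisher was registered with.
    block = make_block()
    pub.publish(block, timeout=timedelta(seconds=2))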

channel_to_dtype_name

channel_to_dtype_name(channel)

Given a channel, return the data type's name.

Source code in arrakis/publish.py
def channel_to_dtype_name(channel: Channel) -> str:
    """Given a channel, return the data type's name."""
    assert channel.data_type is not None
    return channel.data_type.name
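
An illustrative call, assuming Channel is importable from the top-level arrakis package and that its constructor accepts the keyword arguments shown (both are assumptions):

import numpy

from arrakis import Channel
from arrakis.publish import channel_to_dtype_name

# Hypothetical channel definition carrying a numpy dtype as its data type.
channel = Channel("H1:TEST-CHANNEL", data_type=numpy.dtype("float64"), sample_rate=16384)
print(channel_to_dtype_name(channel))  # float64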

serialize_batch

serialize_batch(batch)

Serialize a record batch to bytes.

Parameters:

Name   Type         Description              Default
batch  RecordBatch  The batch to serialize.  required

Returns:

Type   Description
bytes  The serialized buffer.

Source code in arrakis/publish.py
def serialize_batch(batch: pyarrow.RecordBatch):
    """Serialize a record batch to bytes.

    Parameters
    ----------
    batch : pyarrow.RecordBatch
        The batch to serialize.

    Returns
    -------
    bytes
        The serialized buffer.

    """
    sink = pyarrow.BufferOutputStream()
    with pyarrow.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return sink.getvalue()
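
serialize_batch writes the batch using the Arrow IPC stream format, so the resulting buffer can be read back with pyarrow.ipc.open_stream. A small round-trip sketch with an illustrative batch:

import pyarrow

from arrakis.publish import serialize_batch

batch = pyarrow.RecordBatch.from_pydict({"time": [0, 1, 2], "value": [0.1, 0.2, 0.3]})
buf = serialize_batch(batch)

# Read the single batch back out of the IPC stream.
reader = pyarrow.ipc.open_stream(buf)
restored = reader.read_next_batch()
assert restored.equals(batch)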