publish

Publisher API.

Publisher

Publisher(publisher_id, url=None)

Publish timeseries data to Arrakis.

Parameters:

Name Type Description Default
publisher_id str

Publisher ID string.

required
url str | None

Initial Flight URL to connect to. Will be automatically determined if not specified.

None
Source code in arrakis/publish.py
def __init__(self, publisher_id: str, url: str | None = None):
    """Initialize Publisher.

    Parameters
    ----------
    publisher_id : str
        Publisher ID string.
    url : str | None
        Initial Flight URL to connect to.  Will be automatically
        determined if not specified.

    """
    if not HAS_KAFKA:
        msg = (
            "Publishing requires confluent-kafka to be installed."
            "This is provided by the 'publish' extra or it can be "
            "installed manually through pip or conda."
        )
        raise ImportError(msg)

    self.publisher_id = publisher_id
    self.initial_url = parse_url(url)

    self.channels: dict[str, Channel] = {}
    self.stride: int | None = None

    self._producer: Producer
    self._validator = RequestValidator()

close

close()

Exit publication context manager.

Source code in arrakis/publish.py
def close(self) -> None:
    """Exit publication context manager."""
    logger.info("closing kafka producer...")
    with contextlib.suppress(Exception):
        self._producer.flush()

enter

enter()

Enter publication context manager.

Source code in arrakis/publish.py
def enter(self) -> None:
    """Enter publication context manager"""
    # get connection properties
    producer_info: dict[str, str] = {}
    descriptor = create_descriptor(
        RequestType.Publish,
        publisher_id=self.publisher_id,
        validator=self._validator,
    )
    with connect(self.initial_url) as client:
        flight_info = client.get_flight_info(descriptor)
        with MultiFlightReader(flight_info.endpoints, client) as stream:
            for data in stream.unpack():
                kv_pairs = data["properties"]
                producer_info.update(dict(kv_pairs))
    logger.info("producer info: %s", producer_info)

    # set up producer
    self._producer = Producer(
        {
            "message.max.bytes": 10_000_000,  # 10 MB
            "enable.idempotence": True,
            **producer_info,
        }
    )
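
A sketch of the explicit setup/teardown flow suggested by these methods; the ordering (register, enter, publish, close) and the publisher ID are assumptions, and register() is documented below:

pub = Publisher("my-publisher")   # hypothetical publisher ID
pub.register()   # look up this publisher's channels on the server
pub.enter()      # fetch Kafka connection properties and create the producer
try:
    ...          # call pub.publish(block) for each data block
finally:
    pub.close()  # flush the Kafka producer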

publish

publish(block, timeout=constants.DEFAULT_TIMEOUT)

Publish timeseries data.

Parameters:

Name Type Description Default
block SeriesBlock

A data block with all channels to publish.

required
timeout timedelta

The maximum time to wait to publish before timing out. Default is 2 seconds.

DEFAULT_TIMEOUT
Source code in arrakis/publish.py
def publish(
    self,
    block: SeriesBlock,
    timeout: timedelta = constants.DEFAULT_TIMEOUT,
) -> None:
    """Publish timeseries data

    Parameters
    ----------
    block : SeriesBlock
        A data block with all channels to publish.
    timeout : timedelta, optional
        The maximum time to wait to publish before timing out.
        Default is 2 seconds.

    """
    if not hasattr(self, "_producer") or not self._producer:
        msg = (
            "publication interface not initialized, "
            "please use context manager when publishing."
        )
        raise RuntimeError(msg)

    # check for attempt to publish invalid channels
    for name, channel in block.channels.items():
        if channel != self.channels[name]:
            msg = f"invalid channel for this publisher: {channel}"
            raise ValueError(msg)
    # FIXME: check for invalid block duration
    # FIXME: check for block time in the future
    # FIXME: warning for missing channels

    # publish data for each data type
    for partition_id, batch in block.to_row_batches(self.channels):
        topic = f"arrakis-{partition_id}"
        logger.debug("publishing to topic %s: %s", topic, batch)
        self._producer.produce(topic=topic, value=serialize_batch(batch))
        self._producer.flush()
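
A hedged publishing-loop sketch; generate_blocks() is a hypothetical generator yielding SeriesBlock instances (their construction depends on the arrakis block API and is not shown), and the 5-second timeout is an arbitrary example value:

from datetime import timedelta

for block in generate_blocks():  # hypothetical source of SeriesBlock objects
    # each block should contain only channels registered with this publisher
    pub.publish(block, timeout=timedelta(seconds=5))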

register

register(channels=None)

Register channels for publication.

For most publishers, channels are not specified when registering and this method will query the server for the allowable channels for this publisher and register them internally.

For publishers allowed to register their own channels, all channels they expect to publish should be provided as an argument; they will be registered with the server, which will in turn provide the necessary partition information for the channels to be published into Kafka.

Source code in arrakis/publish.py
def register(self, channels: Iterable[Channel] | None = None):
    """register channels for publication

    For most publishers, channels are not specified when
    registering and this method will query the server for the
    allowable channels for this publisher and register them
    internally.

    For publishers allowed to register their own channels, all
    channels they expect to publish should be provided as an
    argument; they will be registered with the server, which
    will in turn provide the necessary partition information for
    the channels to be published into Kafka.

    """

    if channels:
        msg = "Channel self-register is not currently supported."
        raise NotImplementedError(msg)

        for name, id_, index in self._partition_channels(channels):
            self.channels[name] = replace(
                self.channels[name],
                partition_id=id_,
                partition_index=index,
            )

        # check that all requested channels have been registered
        for channel in channels:
            if channel.name not in self.channels:
                msg = f"channel {channel.name} was not properly registered."
                raise ValueError(msg)

    # This find call should raise an error if the publisher_id
    # is not known to the server.
    for channel in Client(self.initial_url).find(publisher=self.publisher_id):
        if self.stride:
            assert channel.stride == self.stride, (
                "channels specify inconsistent stride"
            )
        else:
            self.stride = channel.stride
        self.channels[channel.name] = channel

    if not self.channels:
        # FIXME: more informative error message here
        msg = f"unknown publisher ID '{self.publisher_id}'."
        raise ValueError(msg)

    for channel in self.channels.values():
        assert channel.partition_id is not None, (
            f"Channel {channel} is missing partition_id."
        )
        assert channel.partition_index is not None, (
            f"Channel {channel} is missing partition_index."
        )

    return self
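
For the common case where no channels are passed, a sketch of what registration yields, again with a hypothetical publisher ID:

pub = Publisher("my-publisher")
pub.register()     # server-side lookup of this publisher's channels
print(pub.stride)  # common stride shared by all registered channels
for name, channel in pub.channels.items():
    # each registered channel now carries its partition assignment
    print(name, channel.partition_id, channel.partition_index)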

channel_to_dtype_name

channel_to_dtype_name(channel)

Given a channel, return the data type's name.

Source code in arrakis/publish.py
def channel_to_dtype_name(channel: Channel) -> str:
    """Given a channel, return the data type's name."""
    assert isinstance(channel.data_type, numpy.dtype)
    return channel.data_type.name
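
The returned string is simply numpy's canonical name for the channel's data_type; for reference:

import numpy

numpy.dtype(numpy.float32).name  # -> "float32"
numpy.dtype("int16").name        # -> "int16"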

serialize_batch

serialize_batch(batch)

Serialize a record batch to bytes.

Parameters:

Name Type Description Default
batch RecordBatch

The batch to serialize.

required

Returns:

Type Description
bytes

The serialized buffer.

Source code in arrakis/publish.py
def serialize_batch(batch: pyarrow.RecordBatch):
    """Serialize a record batch to bytes.

    Parameters
    ----------
    batch : pyarrow.RecordBatch
        The batch to serialize.

    Returns
    -------
    bytes
        The serialized buffer.

    """
    sink = pyarrow.BufferOutputStream()
    with pyarrow.ipc.new_stream(sink, batch.schema) as writer:
        writer.write_batch(batch)
    return sink.getvalue()
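
As a round-trip check (not part of the arrakis API), the serialized buffer can be read back with pyarrow's IPC stream reader; the example batch below is illustrative only:

import pyarrow

batch = pyarrow.RecordBatch.from_pydict(
    {"time": [0, 1, 2], "value": [0.1, 0.2, 0.3]}
)
buf = serialize_batch(batch)

reader = pyarrow.ipc.open_stream(buf)   # accepts the serialized buffer
restored = reader.read_all()            # pyarrow.Table with the original schema
assert restored.to_batches()[0].equals(batch)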