Publishing Data
The arrakis.publish.Publisher class publishes timeseries data into Arrakis via Kafka.
Publisher Setup
A publisher is identified by a server-assigned publisher ID. Create a
Publisher, register it to retrieve channel assignments, then enter the
publication context:
from arrakis import Publisher
publisher = Publisher("my_producer")
publisher.register()
Registration queries the server for the channels assigned to this publisher ID
and their Kafka partition information. If the publisher ID is unknown to the
server, a ValueError is raised.
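Because an unknown publisher ID surfaces as a ValueError, a caller may want to catch it at startup and stop with a readable message. The following is a minimal sketch, not part of Arrakis: `register_or_exit` is a hypothetical helper that accepts any object exposing a `register()` method, such as the Publisher above.

```python
import sys


def register_or_exit(publisher, publisher_id):
    """Register a publisher, exiting with a readable error if the ID is unknown.

    Hypothetical helper: `publisher` is any object whose register() method
    raises ValueError for an unknown publisher ID, as described above.
    """
    try:
        publisher.register()
    except ValueError as exc:
        # sys.exit() with a string prints it to stderr and exits with status 1
        sys.exit(f"publisher ID {publisher_id!r} was rejected: {exc}")
    return publisher
```

With the real class this would be called as `register_or_exit(Publisher("my_producer"), "my_producer")`, assuming the constructor shown above.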
After registration, inspect the assigned channels:
for name, channel in publisher.channels.items():
    print(f"{name}: {channel.sample_rate} Hz, stride={channel.stride} ns")
Publishing Blocks
Publishing uses a context manager that sets up the Kafka producer connection:
from arrakis import Channel, Publisher, SeriesBlock, Time
import numpy

publisher = Publisher("my_producer")
publisher.register()

metadata = {
    "H1:FKE-TEST_CHANNEL1": Channel(
        "H1:FKE-TEST_CHANNEL1",
        data_type=numpy.float64,
        sample_rate=4,
    ),
    "H1:FKE-TEST_CHANNEL2": Channel(
        "H1:FKE-TEST_CHANNEL2",
        data_type=numpy.int32,
        sample_rate=2,
    ),
}

with publisher:
    # one second of data: 4 samples at 4 Hz, 2 samples at 2 Hz
    series = {
        "H1:FKE-TEST_CHANNEL1": numpy.array(
            [0.1, 0.2, 0.3, 0.4], dtype=numpy.float64
        ),
        "H1:FKE-TEST_CHANNEL2": numpy.array(
            [1, 2], dtype=numpy.int32
        ),
    }
    block = SeriesBlock(
        1234567890 * Time.SECONDS,
        series,
        metadata,
    )
    publisher.publish(block)
Warning
Always use the context manager (with publisher:) or call publisher.enter()
before publishing. Calling publish() without initializing the Kafka
producer raises a RuntimeError.
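The block in the publishing example spans one second, so each channel contributes as many samples as its sample rate. For other block durations, the expected array length follows from the sample rate and the duration. A small sketch of that arithmetic in plain Python; `samples_per_block` is a hypothetical helper, not an Arrakis function:

```python
NS_PER_SECOND = 1_000_000_000


def samples_per_block(sample_rate_hz, duration_ns):
    """Return the number of samples one channel contributes to one block."""
    total = sample_rate_hz * duration_ns
    if total % NS_PER_SECOND:
        raise ValueError("block duration does not hold a whole number of samples")
    return total // NS_PER_SECOND


# one-second block, matching the publishing example
print(samples_per_block(4, NS_PER_SECOND))  # 4
print(samples_per_block(2, NS_PER_SECOND))  # 2
```

Checking array lengths this way before building a block can catch a mismatch between the data and the block duration early.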
Time and Frequency Units
The Time and Freq enums help with unit
conversions:
from arrakis import Time, Freq
# Time: convert to nanoseconds
timestamp_ns = 1234567890 * Time.SECONDS # GPS seconds -> nanoseconds
timestamp_ns = 1234567890 * Time.s # shorthand
# Freq: convert sample rate to stride in nanoseconds
stride_ns = 64 * Freq.Hz # 64 Hz -> stride for one sample
stride_ns = 1 * Freq.kHz # 1 kHz -> stride for one sample
Time members:
| Member | Value (ns) |
|---|---|
| SECONDS / s | 1,000,000,000 |
| MILLISECONDS / ms | 1,000,000 |
| MICROSECONDS / us | 1,000 |
| NANOSECONDS / ns | 1 |
Freq members:
| Member | Samples/s |
|---|---|
| Hz | 1 |
| kHz | 1,000 |
| MHz | 1,000,000 |
| GHz | 1,000,000,000 |
Multiplying a number by a Freq member converts it to a stride in nanoseconds:
64 * Freq.Hz gives the nanosecond period for a 64 Hz signal.
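For reference, the period of one sample follows directly from the sample rate: one second is 1,000,000,000 ns, so a 64 Hz signal has a period of 1,000,000,000 / 64 = 15,625,000 ns. The sketch below reproduces that arithmetic with plain integers rather than the Freq enum; `sample_period_ns` is a hypothetical name used only for illustration.

```python
NS_PER_SECOND = 1_000_000_000


def sample_period_ns(sample_rate_hz):
    """Return the spacing between consecutive samples in nanoseconds."""
    period, remainder = divmod(NS_PER_SECOND, sample_rate_hz)
    if remainder:
        # e.g. 3 Hz has no exact integer-nanosecond period
        raise ValueError(f"{sample_rate_hz} Hz has no integer-nanosecond period")
    return period


print(sample_period_ns(64))    # 15625000
print(sample_period_ns(1000))  # 1000000
```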
Validation
The publisher validates each block before sending:
- Channel metadata
  - The channel metadata in the block must exactly match what the server assigned during registration (name, data type, sample rate).
- Stride
  - The block duration must match the publisher's expected stride. Blocks with the wrong duration are rejected with a ValueError.
- Monotonic timestamps
  - Blocks must be published with strictly increasing timestamps. Publishing a block with a timestamp equal to or earlier than the previous block raises a ValueError.
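The stride and timestamp rules can be pictured as a pair of checks run on every block before it is sent. The following is an illustrative sketch only, not the Arrakis implementation: `BlockChecker` and its attributes are hypothetical, and times are plain integer nanoseconds.

```python
class BlockChecker:
    """Toy model of the stride and monotonic-timestamp checks."""

    def __init__(self, stride_ns):
        self.stride_ns = stride_ns  # expected duration of every block
        self.last_time_ns = None    # start time of the last accepted block

    def check(self, time_ns, duration_ns):
        """Raise ValueError if the block breaks either rule, else accept it."""
        if duration_ns != self.stride_ns:
            raise ValueError(
                f"block duration {duration_ns} ns != expected stride {self.stride_ns} ns"
            )
        if self.last_time_ns is not None and time_ns <= self.last_time_ns:
            raise ValueError(
                f"timestamp {time_ns} is not later than previous {self.last_time_ns}"
            )
        self.last_time_ns = time_ns


checker = BlockChecker(stride_ns=1_000_000_000)
checker.check(1_234_567_890_000_000_000, 1_000_000_000)  # accepted
checker.check(1_234_567_891_000_000_000, 1_000_000_000)  # accepted
try:
    # same timestamp as the previous block: rejected
    checker.check(1_234_567_891_000_000_000, 1_000_000_000)
except ValueError as exc:
    print(f"rejected: {exc}")
```

A rejected block leaves `last_time_ns` unchanged in this model, so the caller can retry with a corrected block.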