
Publishing Data

The arrakis.publish.Publisher class publishes timeseries data into Arrakis via Kafka.

Publisher Setup

A publisher is identified by a server-assigned publisher ID. Create a Publisher with that ID and register it to retrieve its channel assignments (publishing itself then happens inside the publication context, covered under Publishing Blocks):

from arrakis import Publisher

publisher = Publisher("my_producer")
publisher.register()

Registration queries the server for the channels assigned to this publisher ID and their Kafka partition information. If the publisher ID is unknown to the server, a ValueError is raised.

After registration, inspect the assigned channels:

for name, channel in publisher.channels.items():
    print(f"{name}: {channel.sample_rate} Hz, stride={channel.stride} ns")

Publishing Blocks

Publishing uses a context manager that sets up the Kafka producer connection:

from arrakis import Channel, Publisher, SeriesBlock, Time
import numpy

publisher = Publisher("my_producer")
publisher.register()

metadata = {
    "H1:FKE-TEST_CHANNEL1": Channel(
        "H1:FKE-TEST_CHANNEL1",
        data_type=numpy.float64,
        sample_rate=4,
    ),
    "H1:FKE-TEST_CHANNEL2": Channel(
        "H1:FKE-TEST_CHANNEL2",
        data_type=numpy.int32,
        sample_rate=2,
    ),
}

with publisher:
    # one second of data: 4 samples at 4 Hz, 2 samples at 2 Hz
    series = {
        "H1:FKE-TEST_CHANNEL1": numpy.array(
            [0.1, 0.2, 0.3, 0.4], dtype=numpy.float64
        ),
        "H1:FKE-TEST_CHANNEL2": numpy.array(
            [1, 2], dtype=numpy.int32
        ),
    }
    block = SeriesBlock(
        1234567890 * Time.SECONDS,
        series,
        metadata,
    )
    publisher.publish(block)

Warning

Always use the context manager (with publisher:) or call publisher.enter() before publishing. Calling publish() without initializing the Kafka producer raises a RuntimeError.
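Each block must span the publisher's expected duration and carry a timestamp later than the previous block's (see Validation). A long-running publisher therefore usually advances its start time by exactly one block duration per iteration. The sketch below shows only that bookkeeping in plain Python and does not call arrakis. The helpers block_schedule and samples_per_block are hypothetical, and the sketch assumes the one-second blocks used in the example above. In real code, each (timestamp, sizes) pair would become a SeriesBlock whose arrays have those lengths.

```python
from typing import Dict, Iterator, Tuple

NS_PER_SECOND = 1_000_000_000


def samples_per_block(sample_rate: int, duration_ns: int) -> int:
    """Hypothetical helper: samples one channel contributes to a block."""
    count, remainder = divmod(sample_rate * duration_ns, NS_PER_SECOND)
    if remainder:
        # e.g. 3 Hz cannot fill a 0.5 s block with a whole number of samples
        raise ValueError(
            f"{sample_rate} Hz does not fit a whole number of samples "
            f"into {duration_ns} ns"
        )
    return count


def block_schedule(
    start_ns: int, duration_ns: int, rates: Dict[str, int], n_blocks: int
) -> Iterator[Tuple[int, Dict[str, int]]]:
    """Hypothetical helper: yield (timestamp_ns, {channel: n_samples})."""
    sizes = {name: samples_per_block(rate, duration_ns) for name, rate in rates.items()}
    for i in range(n_blocks):
        # Each block starts exactly where the previous one ended, so
        # timestamps are strictly increasing and durations are constant.
        yield start_ns + i * duration_ns, dict(sizes)


rates = {"H1:FKE-TEST_CHANNEL1": 4, "H1:FKE-TEST_CHANNEL2": 2}
for timestamp_ns, sizes in block_schedule(1234567890 * NS_PER_SECOND, NS_PER_SECOND, rates, 3):
    print(timestamp_ns, sizes)
```

Raising on a fractional sample count, rather than rounding, is a choice made for this sketch. It surfaces a mismatched rate and duration before any data is built.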

Time and Frequency Units

The Time and Freq enums help with unit conversions:

from arrakis import Time, Freq

# Time: convert to nanoseconds
timestamp_ns = 1234567890 * Time.SECONDS    # GPS seconds -> nanoseconds
timestamp_ns = 1234567890 * Time.s           # shorthand

# Freq: convert a sample rate to a stride (sample period) in nanoseconds
stride_ns = 64 * Freq.Hz     # 64 Hz -> 15,625,000 ns between samples
stride_ns = 1 * Freq.kHz     # 1 kHz -> 1,000,000 ns between samples

Time members:

| Member | Value (ns) |
|---|---|
| SECONDS / s | 1,000,000,000 |
| MILLISECONDS / ms | 1,000,000 |
| MICROSECONDS / us | 1,000 |
| NANOSECONDS / ns | 1 |
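Every Time member is a nanosecond count, so conversions reduce to integer multiplication. The sketch below is a hypothetical stand-in that mirrors the table with Python's IntEnum. It is not arrakis's own Time class and relies only on the member values listed above.

```python
from enum import IntEnum


class Time(IntEnum):
    """Hypothetical stand-in mirroring the table above (values in ns)."""

    SECONDS = 1_000_000_000
    s = 1_000_000_000  # same value, so IntEnum makes this an alias of SECONDS
    MILLISECONDS = 1_000_000
    ms = 1_000_000
    MICROSECONDS = 1_000
    us = 1_000
    NANOSECONDS = 1
    ns = 1


# GPS seconds -> nanoseconds: int * IntEnum member yields a plain int
gps_start_ns = 1234567890 * Time.SECONDS

# Offset the start by 250 ms, expressed in the same nanosecond units
later_ns = gps_start_ns + 250 * Time.ms

# Convert back to (fractional) GPS seconds for display
print(later_ns / Time.s)  # 1234567890.25
```

Keeping every quantity as an integer nanosecond count avoids floating-point drift when timestamps are added together. Only the final display step produces a float.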

Freq members:

| Member | Samples per second |
|---|---|
| Hz | 1 |
| kHz | 1,000 |
| MHz | 1,000,000 |
| GHz | 1,000,000,000 |

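Multiplying a number by a Freq member converts that frequency into a stride in nanoseconds, that is, the time between consecutive samples. For example, 64 * Freq.Hz gives 1,000,000,000 / 64 = 15,625,000 ns, the sample period of a 64 Hz signal.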
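The conversion is one second (10^9 ns) divided by the number of samples per second. The sketch below spells out that arithmetic with a hypothetical helper, sample_period_ns, and does not reproduce how arrakis implements Freq internally. As an assumption of the sketch, it raises ValueError instead of truncating when the period is not a whole number of nanoseconds (for example, 3 Hz).

```python
NS_PER_SECOND = 1_000_000_000

# Samples per second for each unit, matching the Freq table above
UNITS = {"Hz": 1, "kHz": 1_000, "MHz": 1_000_000, "GHz": 1_000_000_000}


def sample_period_ns(rate: int, unit: str = "Hz") -> int:
    """Hypothetical helper: nanoseconds between consecutive samples."""
    samples_per_second = rate * UNITS[unit]
    if samples_per_second <= 0:
        raise ValueError("sample rate must be positive")
    if NS_PER_SECOND % samples_per_second:
        # The period would not be a whole number of nanoseconds
        raise ValueError(f"{rate} {unit} does not give a whole-ns period")
    return NS_PER_SECOND // samples_per_second


print(sample_period_ns(64))        # 15625000
print(sample_period_ns(1, "kHz"))  # 1000000
```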

Validation

The publisher validates each block before sending:

- Channel metadata: the channel metadata in the block must exactly match what the server assigned during registration (name, data type, sample rate).
- Stride: the block duration must match the publisher's expected stride. Blocks with the wrong duration are rejected with a ValueError.
- Monotonic timestamps: blocks must be published with strictly increasing timestamps. Publishing a block whose timestamp is equal to or earlier than that of the previous block raises a ValueError.
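These three checks can be pictured as a small function that runs before each send. The sketch below is hypothetical and is not the arrakis implementation. It represents channel metadata as plain (data type name, sample rate) tuples keyed by channel name. It also raises ValueError for a metadata mismatch, an exception type the text above does not specify.

```python
from typing import Dict, Optional, Tuple

# Hypothetical metadata representation: name -> (data type name, sample rate)
Metadata = Dict[str, Tuple[str, int]]


class BlockValidator:
    """Hypothetical sketch of the publisher's pre-send checks."""

    def __init__(self, assigned: Metadata, stride_ns: int) -> None:
        self.assigned = assigned  # channels assigned at registration
        self.stride_ns = stride_ns  # expected block duration in ns
        self.last_time_ns: Optional[int] = None

    def check(self, time_ns: int, duration_ns: int, metadata: Metadata) -> None:
        # 1. Channel metadata must match the registered assignment exactly
        if metadata != self.assigned:
            raise ValueError("block metadata does not match registered channels")
        # 2. Block duration must equal the expected stride
        if duration_ns != self.stride_ns:
            raise ValueError(f"block duration {duration_ns} ns != stride {self.stride_ns} ns")
        # 3. Timestamps must strictly increase from block to block
        if self.last_time_ns is not None and time_ns <= self.last_time_ns:
            raise ValueError(f"timestamp {time_ns} is not after previous {self.last_time_ns}")
        self.last_time_ns = time_ns


assigned = {"H1:FKE-TEST_CHANNEL1": ("float64", 4), "H1:FKE-TEST_CHANNEL2": ("int32", 2)}
v = BlockValidator(assigned, stride_ns=1_000_000_000)
v.check(1_234_567_890_000_000_000, 1_000_000_000, assigned)  # passes
try:
    # Same timestamp again: rejected by the monotonic-timestamp check
    v.check(1_234_567_890_000_000_000, 1_000_000_000, assigned)
except ValueError as err:
    print(err)
```

The timestamp is recorded only after all checks pass, so a rejected block does not advance the validator's notion of the previous block.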