Streaming Data
Use arrakis.api.stream to receive timeseries data as a sequence of arrakis.block.SeriesBlock objects. This is useful for processing data incrementally or working with live data.
Live Streaming
Omit start and end times to stream live data starting from the current time:
```python
import arrakis

channels = [
    "H1:CAL-DELTAL_EXTERNAL_DQ",
    "H1:LSC-POP_A_LF_OUT_DQ",
]

for block in arrakis.stream(channels):
    print(f"t={block.time:.1f} duration={block.duration}s")
    for channel, series in block.items():
        print(f" {channel}: {len(series)} samples")
```
Live streaming runs indefinitely. To stop it, break out of the loop or press Ctrl+C.
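Because a live stream never ends on its own, one way to stop after a fixed number of blocks is `itertools.islice`, which stops pulling from an iterator once it has produced `n` items. The sketch below uses a hypothetical `fake_live_stream()` generator as a stand-in for `arrakis.stream(channels)` so that it runs without a server; with a real stream the same pattern would read `for block in islice(arrakis.stream(channels), 10): ...`.

```python
from itertools import islice


def take_blocks(stream, n):
    """Consume at most n items from a (possibly endless) block iterator."""
    return list(islice(stream, n))


# Hypothetical stand-in for arrakis.stream(channels): yields block start
# times forever, one per 1 s stride, like a live stream that never ends.
def fake_live_stream(t0=1187000000.0, stride=1.0):
    t = t0
    while True:
        yield t
        t += stride


first_three = take_blocks(fake_live_stream(), 3)
print(first_three)  # [1187000000.0, 1187000001.0, 1187000002.0]
```

`islice` stops requesting items once it has produced `n`, so the endless generator is simply never advanced further.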
Historical Streaming
Provide start and end GPS times to stream historical data:
```python
for block in arrakis.stream(channels, start=1187000000, end=1187001000):
    print(f"t={block.time:.1f} duration={block.duration}s")
```
The stream yields blocks sequentially, each covering a fixed time stride. The stride is determined by the channel configuration on the server. Historical streams terminate automatically when the end time is reached.
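Since every block spans one fixed stride, the block start times for a historical request follow directly from `start`, `end`, and the stride. The sketch below assumes a hypothetical 0.5 s stride (the real value comes from the server's channel configuration) and assumes each block covers `[t, t + stride)`; how a final partial block is handled when the range is not a whole number of strides is not specified here.

```python
import math


def block_start_times(start, end, stride):
    """Start time of each fixed-stride block covering [start, end)."""
    n_blocks = math.ceil((end - start) / stride)
    return [start + i * stride for i in range(n_blocks)]


# Hypothetical 0.5 s stride; the real stride is set per channel by the server.
times = block_start_times(1187000000, 1187001000, stride=0.5)
print(len(times))  # 2000
print(times[0], times[-1])  # 1187000000.0 1187000999.5
```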
How Streaming Works
When you call stream(), arrakis:
- Queries the server for channel metadata (stride, latency, partitioning).
- Opens one or more data streams via Arrow Flight or Kafka.
- Feeds incoming blocks through a multiplexer (arrakis.mux.BlockMuxStream) that synchronizes data across streams and handles gaps.
- Yields complete, time-aligned `SeriesBlock` objects.
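The synchronization step can be illustrated with a toy model. The sketch below is not `arrakis.mux.BlockMuxStream`. It assumes each input stream yields `(time, channel, data)` tuples already sorted by time, merges them with `heapq.merge`, and groups items that share a timestamp into one aligned record. It has no timeout or gap logic, which is the part the real multiplexer adds.

```python
import heapq
from itertools import groupby


def align_streams(*streams):
    """Merge time-sorted (time, channel, data) streams.

    Yields (time, {channel: data}) with one record per timestamp.
    Works lazily, so it also accepts endless iterators.
    """
    merged = heapq.merge(*streams, key=lambda item: item[0])
    for t, group in groupby(merged, key=lambda item: item[0]):
        yield t, {channel: data for _, channel, data in group}


# Hypothetical per-stream data: two streams, each carrying one channel.
stream_a = [(0, "A", [1, 2]), (1, "A", [3, 4])]
stream_b = [(0, "B", [5]), (1, "B", [6])]
print(list(align_streams(stream_a, stream_b)))
# [(0, {'A': [1, 2], 'B': [5]}), (1, {'A': [3, 4], 'B': [6]})]
```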
Gap Handling
If data for a channel is missing or arrives late, the multiplexer inserts gap blocks: blocks whose data arrays are NumPy masked arrays with every value masked. You can detect gaps on a per-series basis:
```python
for block in arrakis.stream(channels):
    for channel, series in block.items():
        if series.has_nulls:
            print(f" {channel}: gap detected")
        else:
            print(f" {channel}: {len(series)} samples")
```
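At the NumPy level, a gap block's array is a masked array whose mask is entirely `True`. The sketch below builds one with `np.ma.masked_all` and classifies arrays by their mask. The `classify` helper and its `"partial"` label are hypothetical additions for illustration; the text above only describes fully masked gaps. Filling with `NaN` via `.filled(np.nan)` is one common way to hand gap data to code that does not understand masks.

```python
import numpy as np


def classify(data):
    """Label an array as 'gap', 'partial', or 'ok' based on its mask."""
    mask = np.ma.getmaskarray(data)  # full boolean mask, even for plain ndarrays
    if mask.all():
        return "gap"
    if mask.any():
        return "partial"
    return "ok"


gap = np.ma.masked_all(4, dtype=float)  # all values masked, like a gap block
good = np.ma.masked_array(np.arange(4.0))  # no values masked

print(classify(gap), classify(good))  # gap ok
print(gap.filled(np.nan))  # [nan nan nan nan]
```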
The timeout before a gap is inserted is determined by the channel's
max_latency attribute. For live streams, the multiplexer continuously checks
for timeouts; for historical streams, gaps indicate that the data was not
available on the server.
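A simplified model of the timeout rule: a block is emitted once every channel has arrived, or once its end time plus `max_latency` has passed, with any still-missing channel replaced by a fully masked array. The helper `assemble_block` and its parameters are hypothetical, and the current time is passed in explicitly so the logic can be tested. This sketches the idea only and is not the library's implementation.

```python
import numpy as np


def assemble_block(received, expected, n_samples, now, block_end, max_latency):
    """Return {channel: masked array} for one block, or None to keep waiting.

    Hypothetical rule: wait until every expected channel has arrived or
    until block_end + max_latency has passed, then fill any channel that
    is still missing with a fully masked (gap) array.
    """
    missing = [c for c in expected if c not in received]
    if missing and now < block_end + max_latency:
        return None  # still within the latency budget
    block = {}
    for c in expected:
        if c in received:
            block[c] = np.ma.asarray(received[c])
        else:
            block[c] = np.ma.masked_all(n_samples, dtype=float)  # gap
    return block


expected = ["H1:A", "H1:B"]  # hypothetical channel names
received = {"H1:A": np.ones(4)}  # H1:B has not arrived yet
print(assemble_block(received, expected, 4, now=10.5, block_end=10.0, max_latency=1.0))  # None
late = assemble_block(received, expected, 4, now=11.5, block_end=10.0, max_latency=1.0)
print(np.ma.getmaskarray(late["H1:B"]).all())  # True
```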
Stream vs Fetch
| | `fetch()` | `stream()` |
|---|---|---|
| Returns | Single `SeriesBlock` | Generator of `SeriesBlock` |
| Memory | Loads full range | One block at a time |
| Live data | No | Yes (omit start/end) |
| Use case | Analysis of bounded intervals | Continuous processing, large ranges |
See Fetching Data for the batch alternative.
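The memory difference in the table matters for reductions over long ranges: with `stream()`, a statistic can be accumulated block by block instead of over one large array. The sketch below computes a mean over unmasked samples while holding only one block at a time. The `blocks` generator is a hypothetical stand-in for per-channel arrays taken from the streaming loop, and skipping masked (gap) samples is a design choice of this sketch, not library behavior.

```python
import numpy as np


def streaming_mean(blocks):
    """Mean of all unmasked samples, holding only one block in memory."""
    total = 0.0
    count = 0
    for data in blocks:
        valid = np.ma.asarray(data).compressed()  # drop masked (gap) samples
        total += float(valid.sum())
        count += valid.size
    return total / count if count else float("nan")


# Hypothetical per-block arrays for one channel; the middle block is a gap.
blocks = (b for b in [np.full(4, 1.0), np.ma.masked_all(4), np.full(4, 3.0)])
print(streaming_mean(blocks))  # 2.0
```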