
block

Series block representation of timeseries data.

Series dataclass

Series(time_ns, data, channel)

Bases: Generic[ChannelLike]

Single-channel timeseries data for a given timestamp.

Parameters:

time_ns (int, required): The timestamp associated with this data, in nanoseconds.
data (ndarray, required): The timeseries data.
channel (Channel, required): Channel metadata associated with this timeseries.

data_type property

data_type

Data type of the data array's elements.

dt property

dt

The time separation in seconds between successive samples.

dtype property

dtype

Data type of the data array's elements.

duration cached property

duration

Series duration in seconds.

duration_ns cached property

duration_ns

Series duration in nanoseconds.

name property

name

Channel name.

sample_rate property

sample_rate

Data rate for this series in samples per second (Hz).

t0 property

t0

Timestamp associated with this data, in seconds.

time cached property

time

Timestamp associated with this data, in seconds.

times cached property

times

The array of times corresponding to all data points in the series.
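
The timing properties above are closely related. The following is a minimal sketch of those relationships in plain Python, assuming uniformly sampled data. The helper `timing_summary` and its arguments are hypothetical and not part of the library, which may compute these values differently.

```python
NS_PER_S = 1_000_000_000  # nanoseconds per second


def timing_summary(time_ns: int, num_samples: int, sample_rate: float) -> dict:
    """Derive timing quantities for a uniformly sampled series (illustrative)."""
    dt = 1.0 / sample_rate                # seconds between successive samples
    duration = num_samples / sample_rate  # total span covered, in seconds
    t0 = time_ns / NS_PER_S               # start timestamp, in seconds
    times = [t0 + i * dt for i in range(num_samples)]  # one time per sample
    return {
        "dt": dt,
        "duration": duration,
        "duration_ns": round(duration * NS_PER_S),
        "t0": t0,
        "times": times,
    }


summary = timing_summary(time_ns=2_000_000_000, num_samples=4, sample_rate=4.0)
print(summary["dt"], summary["duration"], summary["times"])
```

Under these assumptions, four samples at 4 Hz starting at 2 s span exactly one second.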

SeriesBlock dataclass

SeriesBlock(time_ns, data, channels=dict())

Bases: Generic[ChannelLike]

Series block containing timeseries for channels for a given timestamp.

Parameters:

time_ns (int, required): The timestamp associated with this data, in nanoseconds.
data (dict[str, ndarray], required): Mapping between channels and timeseries.
channels (dict[str, Channel], default dict()): Channel metadata associated with this data block.

duration cached property

duration

Duration of this block, in seconds.

duration_ns property

duration_ns

Duration of this block, in nanoseconds.

t0 property

t0

Timestamp associated with this block, in seconds.

time cached property

time

Timestamp associated with this block, in seconds.

filter

filter(channels=None)

Filter a block based on criteria.

Fixme

more info needed

Parameters:

channels (list[str], default None): If specified, keep only these channels.

Returns:

SeriesBlock: The filtered series.

Source code in arrakis/block.py
def filter(self, channels: list[str] | None = None) -> SeriesBlock:
    """Filter a block based on criteria.

    FIXME: more info needed

    Parameters
    ----------
    channels : list[str], optional
        If specified, keep only these channels.

    Returns
    -------
    SeriesBlock
        The filtered series.

    """
    if not channels:
        return self

    data = {channel: self.data[channel] for channel in channels}
    if self.channels:
        channel_dict = {channel: self.channels[channel] for channel in channels}
    else:
        channel_dict = self.channels

    return type(self)(self.time_ns, data, channel_dict)
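
Two behaviours of the listing above are easy to miss: a falsy `channels` argument (`None` or an empty list) returns the block unchanged, and a requested name that is absent fails with the `KeyError` of an ordinary dictionary lookup. A minimal sketch of the same selection logic on a plain dictionary follows; `filter_channels` and the channel names are hypothetical stand-ins, not library API.

```python
def filter_channels(data: dict, channels=None) -> dict:
    """Keep only the requested channels (illustrative stand-in for filter)."""
    if not channels:  # None and [] both mean "no filtering"
        return data
    # a name missing from `data` raises KeyError, as plain dict indexing does
    return {name: data[name] for name in channels}


block_data = {"H1:STRAIN": [1, 2], "L1:STRAIN": [3, 4], "V1:STRAIN": [5, 6]}
subset = filter_channels(block_data, ["H1:STRAIN", "V1:STRAIN"])
unfiltered = filter_channels(block_data, [])
print(sorted(subset))
```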

from_column_batch classmethod

from_column_batch(batch, channels)

Create a series block from a record batch.

Parameters:

batch (RecordBatch, required): A record batch, with a 'time' column with the timestamp and channel columns with all channels to publish.
channels (dict[str, Channel], required): Channel metadata. The metadata for the channels defined in the batch will be extracted from this dictionary, so this dictionary may include metadata for additional channels not included in the batch.

Returns:

SeriesBlock: The block representation of the record batch.

Source code in arrakis/block.py
@classmethod
def from_column_batch(
    cls,
    batch: pyarrow.RecordBatch,
    channels: dict[str, ChannelLike],
) -> SeriesBlock:
    """Create a series block from a record batch.

    Parameters
    ----------
    batch : pyarrow.RecordBatch
        A record batch, with a 'time' column with the timestamp
        and channel columns with all channels to publish.
    channels : dict[str, Channel]
        Channel metadata.  The metadata for the channels defined
        in the batch will be extracted from this dictionary, so
        this dictionary may include metadata for additional
        channels not included in the batch.

    Returns
    -------
    SeriesBlock
        The block representation of the record batch.

    """
    time = batch.column("time").to_numpy()[0]
    fields: list[pyarrow.Field] = list(batch.schema)
    channel_names = [field.name for field in fields[1:]]
    series_dict = {
        channel: pyarrow.compute.list_flatten(batch.column(channel)).to_numpy()
        for channel in channel_names
    }
    channel_dict = {channel: channels[channel] for channel in channel_names}
    return cls(time, series_dict, channel_dict)

from_row_batch classmethod

from_row_batch(batch, channels)

Create a series block from a record batch.

Parameters:

batch (RecordBatch, required): A record batch, with a 'time' column with the timestamp, a 'channel' column with the channel name, and a 'data' column containing the timeseries.
channels (dict[str, Channel], required): Channel metadata. The metadata for the channels defined in the batch will be extracted from this dictionary, so this dictionary may include metadata for additional channels not included in the batch.

Returns:

SeriesBlock: The block representation of the record batch.

Source code in arrakis/block.py
@classmethod
def from_row_batch(
    cls,
    batch: pyarrow.RecordBatch,
    channels: dict[str, ChannelLike],
) -> SeriesBlock:
    """Create a series block from a record batch.

    Parameters
    ----------
    batch : pyarrow.RecordBatch
        A record batch, with a 'time' column with the timestamp, a
        'channel' column with the channel name, and a 'data' column
        containing the timeseries.
    channels : dict[str, Channel]
        Channel metadata.  The metadata for the channels defined
        in the batch will be extracted from this dictionary, so
        this dictionary may include metadata for additional
        channels not included in the batch.

    Returns
    -------
    SeriesBlock
        The block representation of the record batch.

    """
    time = batch.column("time").to_numpy()[0]
    channel_names = batch.column("channel").to_pylist()
    data = batch.column("data")
    series_dict = {}
    channel_dict = {}
    for idx, channel in enumerate(channel_names):
        series_dict[channel] = pyarrow.array(data[idx]).to_numpy()
        channel_dict[channel] = channels[channel]
    return cls(time, series_dict, channel_dict)
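
The row layout consumed here holds one record per channel, each carrying the shared timestamp. A minimal sketch of the same loop using plain Python records instead of a pyarrow batch follows; `block_from_rows`, the channel names, and the metadata dictionaries are hypothetical and only illustrate the shape of the data.

```python
# Row layout: one record per channel, all sharing the block timestamp.
rows = [
    {"time": 1_000_000_000, "channel": "A", "data": [1.0, 2.0]},
    {"time": 1_000_000_000, "channel": "B", "data": [3.0, 4.0]},
]
# Metadata may cover more channels than the batch contains ("C" here).
metadata = {"A": {"rate": 2}, "B": {"rate": 2}, "C": {"rate": 2}}


def block_from_rows(rows, metadata):
    """Collect per-channel series and metadata from row-layout records."""
    time_ns = rows[0]["time"]  # the timestamp is taken from the first row
    series = {}
    channels = {}
    for row in rows:
        name = row["channel"]
        series[name] = row["data"]
        channels[name] = metadata[name]  # only channels present in the rows
    return time_ns, series, channels


time_ns, series, channels = block_from_rows(rows, metadata)
print(time_ns, sorted(series), sorted(channels))
```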

to_column_batch

to_column_batch()

Create a column-based record batch from a series block.

Returns:

RecordBatch: A record batch, with a 'time' column with the timestamp and channel columns with all channels to publish.

Source code in arrakis/block.py
def to_column_batch(self) -> pyarrow.RecordBatch:
    """Create a column-based record batch from a series block.

    Returns
    -------
    pyarrow.RecordBatch
        A record batch, with a 'time' column with the timestamp
        and channel columns with all channels to publish.

    """
    schema = self._generate_column_schema()
    return pyarrow.RecordBatch.from_arrays(
        [
            pyarrow.array([self.time_ns], type=schema.field("time").type),
            *[
                pyarrow.array([series], type=schema.field(channel).type)
                for channel, series in self.data.items()
            ],
        ],
        schema=schema,
    )

to_row_batches

to_row_batches(partitions)

Create row-based record batches from a series block.

Yields:

RecordBatch: Record batches, one per partition, each yielded together with its partition ID. All channels in a partition share a data type. The record batches have a 'time' column with the timestamp, a 'channel' column with the channel name, and a 'data' column containing the timeseries.

Source code in arrakis/block.py
def to_row_batches(self, partitions: dict) -> Iterator[pyarrow.RecordBatch]:
    """Create row-based record batches from a series block.

    Yields
    ------
    pyarrow.RecordBatch
        Record batches, one per partition, each yielded together with
        its partition ID. The record batches have a 'time' column with
        the timestamp, a 'channel' column with the channel name, and a
        'data' column containing the timeseries.

    """
    # group channels by partitions
    channels_by_part = defaultdict(list)
    for channel in self.keys():
        if channel in partitions:
            partition = partitions[channel]
            channels_by_part[partition].append(channel)

    # generate row-based record batches
    for partition_id, channels in channels_by_part.items():
        # all channels have the same data type
        dtype = self.channels[channels[0]].data_type
        schema = self._generate_row_schema(pyarrow.from_numpy_dtype(dtype))
        series: list[numpy.ndarray] = [self.data[channel] for channel in channels]
        yield (
            partition_id,
            pyarrow.RecordBatch.from_arrays(
                [
                    pyarrow.array(
                        numpy.full(len(channels), self.time_ns),
                        type=schema.field("time").type,
                    ),
                    pyarrow.array(channels, type=schema.field("channel").type),
                    pyarrow.array(series, type=schema.field("data").type),
                ],
                schema=schema,
            ),
        )
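
The grouping step at the top of the listing decides which channels end up in which batch. A minimal sketch of that step in isolation follows, using only the standard library; `group_by_partition`, the channel names, and the partition IDs are hypothetical. As in the listing, a channel with no entry in `partitions` is skipped rather than reported.

```python
from collections import defaultdict


def group_by_partition(channel_names, partitions):
    """Group channel names by their partition ID (illustrative)."""
    groups = defaultdict(list)
    for name in channel_names:
        if name in partitions:  # channels without a partition are skipped
            groups[partitions[name]].append(name)
    return dict(groups)


grouped = group_by_partition(
    ["A", "B", "C", "D"],
    {"A": "p0", "B": "p1", "C": "p0"},  # "D" has no partition assigned
)
print(grouped)
```

Each resulting group would then be turned into one row-based batch, with one row per channel in the group.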

combine_blocks

combine_blocks(*blocks)

Combine multiple SeriesBlocks from the same time into a single SeriesBlock.

Each block must contain a distinct set of channels, and the time properties of each block must agree; otherwise an AssertionError is raised.

Parameters:

*blocks (SeriesBlock, default ()): The blocks to combine.

Returns:

SeriesBlock: The combined block.

Source code in arrakis/block.py
def combine_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Combine multiple SeriesBlocks from the same time into a single SeriesBlock.

    Each block must contain a distinct set of channels, and the time
    properties of each block must agree; otherwise an AssertionError
    will be raised.

    Parameters
    ----------
    *blocks : SeriesBlock
        The blocks to combine.

    Returns
    -------
    SeriesBlock
        The combined block.

    """
    time_ns = blocks[0].time_ns
    duration_ns = blocks[0].duration_ns
    series_dict: dict[str, numpy.ndarray] = {}
    channel_dict: dict[str, Channel] = {}
    for block in blocks:
        assert block.time_ns == time_ns, "all block times must agree"
        assert block.duration_ns == duration_ns, "all block durations must agree"
        for channel, series in block.items():
            assert channel not in series_dict, (
                f"channel {channel} has already been included from another block"
            )
            series_dict[channel] = series.data
            channel_dict[channel] = series.channel
    return SeriesBlock(time_ns, series_dict, channel_dict)
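
The checks in this function can be illustrated without the library. The sketch below uses plain `(time_ns, duration_ns, data)` tuples as hypothetical stand-ins for SeriesBlock objects; `combine` is not library API and only mirrors the assertions shown above.

```python
def combine(*blocks):
    """Merge same-time blocks with disjoint channels (illustrative)."""
    time_ns, duration_ns, _ = blocks[0]
    merged = {}
    for block_time, block_duration, data in blocks:
        assert block_time == time_ns, "all block times must agree"
        assert block_duration == duration_ns, "all block durations must agree"
        for name, series in data.items():
            assert name not in merged, f"channel {name} appears in two blocks"
            merged[name] = series
    return time_ns, duration_ns, merged


first = (0, 1_000_000_000, {"A": [1, 2]})
second = (0, 1_000_000_000, {"B": [3, 4]})
combined = combine(first, second)
print(combined)
```

Because the checks are assertions, they are skipped when Python runs with the `-O` flag.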

concatenate_blocks

concatenate_blocks(*blocks)

Join a sequence of timeseries blocks into a single block.

All blocks must contain the same set of channels and be sequential in time, each starting where the previous one ends; otherwise an AssertionError is raised.

Parameters:

*blocks (SeriesBlock, default ()): The timeseries blocks to concatenate.

Returns:

SeriesBlock: The combined timeseries block.

Source code in arrakis/block.py
def concatenate_blocks(*blocks: SeriesBlock) -> SeriesBlock:
    """Join a sequence of timeseries blocks into a single block.

    If the SeriesBlock arguments are not sequential in time, an
    AssertionError will be raised.

    Parameters
    ----------
    *blocks : SeriesBlock
        The timeseries blocks to concatenate.

    Returns
    -------
    SeriesBlock
        The combined timeseries block.

    """
    channel_dict = blocks[0].channels
    channel_set = set(channel_dict)
    start_time_ns = end_time_ns = blocks[0].time_ns
    duration_ns = 0
    for block in blocks:
        assert set(block.data.keys()) == channel_set, (
            "all blocks must contain the same channel sets"
        )
        assert block.time_ns == end_time_ns, (
            f"block start time ({block.time_ns}) does not match "
            f"concatenated block end time ({end_time_ns})"
        )
        duration_ns += block.duration_ns
        end_time_ns += block.duration_ns
    series_dict: dict[str, numpy.ndarray] = {}
    for channel in channel_set:
        series_dict[str(channel)] = numpy.concatenate(
            [block[str(channel)].data for block in blocks]
        )
    return SeriesBlock(start_time_ns, series_dict, channel_dict)
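
The contiguity rule enforced here is that each block must start exactly where the previous one ended. A minimal sketch of that rule follows, again with plain `(time_ns, duration_ns, data)` tuples as hypothetical stand-ins for SeriesBlock objects and lists in place of arrays; `concatenate` is not library API.

```python
def concatenate(*blocks):
    """Join time-contiguous blocks channel by channel (illustrative)."""
    start_ns = end_ns = blocks[0][0]
    names = set(blocks[0][2])
    for time_ns, duration_ns, data in blocks:
        assert set(data) == names, "all blocks must contain the same channels"
        assert time_ns == end_ns, "block does not start where the last ended"
        end_ns += duration_ns  # the next block must start here
    joined = {}
    for name in sorted(names):
        samples = []
        for _, _, data in blocks:
            samples.extend(data[name])
        joined[name] = samples
    return start_ns, end_ns - start_ns, joined


b1 = (0, 2, {"A": [1, 2], "B": [5, 6]})
b2 = (2, 2, {"A": [3, 4], "B": [7, 8]})
result = concatenate(b1, b2)
print(result)
```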

time_as_ns

time_as_ns(time)

Convert a timestamp from seconds to nanoseconds.

Parameters:

time (float, required): The timestamp to convert, in seconds.

Returns:

int: The converted timestamp, in nanoseconds.

Source code in arrakis/block.py
def time_as_ns(time: float) -> int:
    """Convert a timestamp from seconds to nanoseconds.

    Parameters
    ----------
    time : float
        The timestamp to convert, in seconds.

    Returns
    -------
    int
        The converted timestamp, in nanoseconds.

    """
    seconds = int(time) * Time.s
    # scale the fractional part before truncating; int(time % 1) is always 0
    nanoseconds = int((time % 1) * Time.s)
    return seconds + nanoseconds
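
A worked example makes the conversion concrete. The sketch below assumes that `Time.s` equals the number of nanoseconds in one second (10**9), which is not shown on this page; `seconds_to_ns` is a hypothetical stand-alone version. The fractional part is scaled before it is truncated to an integer, since truncating first would discard it.

```python
NS_PER_S = 1_000_000_000  # assumed value of Time.s


def seconds_to_ns(time: float) -> int:
    """Convert a timestamp in seconds to integer nanoseconds (illustrative)."""
    whole = int(time) * NS_PER_S             # integer seconds, in ns
    fraction = int((time % 1) * NS_PER_S)    # sub-second remainder, in ns
    return whole + fraction


print(seconds_to_ns(1.5))   # 1500000000
print(seconds_to_ns(2.25))  # 2250000000
```

A float cannot resolve single nanoseconds at large timestamps (near 1e9 seconds the spacing between adjacent doubles is a few hundred nanoseconds), so integer nanoseconds are the safer representation once a value has been converted.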