Consumer

Overview

The Consumer API provides utilities for querying and retrieving data from various data backends. It offers full integration with InfluxDB as a timeseries database backend, as well as a mock database for testing purposes.

Basic Usage

Instantiate an InfluxDB Consumer

from ligo.scald.io import influx

# instantiate the consumer
consumer = influx.Consumer(hostname='influx.hostname', port=8086, db='your_database')

Mock Database Consumer

For testing purposes, you can use the mock database consumer:

# Start the mock database server
scald mock

The mock database generates fake data based on HTTP requests and can be used for testing dashboard functionality without requiring a real InfluxDB instance.

Data Retrieval Methods

The Consumer API provides several methods for retrieving different types of data:

Timeseries Data

Retrieve time-ordered data for specific measurements:

# Retrieve timeseries data
time, data = consumer.retrieve_timeseries(
    measurement='sensor_data',
    start=1234567890,
    end=1234567950,
    columns=['amplitude', 'frequency'],
    tags=[('detector', 'H1')],
    aggregate='mean',
    dt=1
)

Latest Values

Get the most recent data points organized by tags:

# Retrieve latest values by tag
time, tag_ids, data = consumer.retrieve_latest_by_tag(
    measurement='status',
    column='state',
    tag_key='detector',
    aggregate='last'
)

Binned Timeseries

Retrieve data binned by both time and tag values:

# Retrieve binned timeseries organized by tag
times, tags, data = consumer.retrieve_binnedtimeseries_by_tag(
    measurement='frequency_spectrum',
    start=1234567890,
    end=1234567950,
    column='power',
    tag_key='frequency_bin',
    aggregate='mean',
    dt=10
)

Snapshot Data

Retrieve structured data for a single timestamp:

# Retrieve snapshot data
time, snapshot, dims = consumer.retrieve_snapshot(measurement='detector_state')

Trigger Data

Query trigger/event data with false alarm rate filtering:

# Retrieve trigger data
triggers = consumer.retrieve_triggers(
    measurement='gravitational_waves',
    start=1234567890,
    end=1234567950,
    columns=['snr', 'chirp_mass'],
    far=1e-6
)

Query Parameters

Common parameters used across retrieval methods:

  • measurement: Name of the measurement/table to query

  • start/end: GPS time range for the query

  • columns: List of data columns to retrieve

  • tags: Tag filters for data selection

  • aggregate: Aggregation method (mean, max, min, last, etc.)

  • dt: Time binning interval in seconds

  • datetime: Return timestamps as datetime objects instead of GPS time

Configuration

Consumers can be configured with various backend options:

# InfluxDB with authentication
consumer = influx.Consumer(
    hostname='secure.influx.host',
    port=8086,
    db='production_data',
    auth=True,
    https=True
)

For HTTPS connections and authentication, ensure proper environment variables are set (see the Monitoring section for details on SSL configuration).

Integration with Dashboard

The Consumer API is automatically used by the dashboard when serving data through the HTTP API endpoints:

  • /api/timeseries/ uses retrieve_timeseries()

  • /api/latest/ uses retrieve_latest_by_tag()

  • /api/heatmap/ uses retrieve_binnedtimeseries_by_tag()

  • /api/snapshot/ uses retrieve_snapshot()

  • /api/table/ uses retrieve_triggers()