GWpy Integration Tutorial¶
Overview¶
The sgnligo.gwpy module provides seamless integration between SGN streaming pipelines and GWpy, the standard gravitational wave data analysis library. This enables you to:
- Use GWpy's powerful analysis methods within streaming pipelines
- Convert data between SGN's
SeriesBufferand GWpy'sTimeSeries - Produce output ready for further GWpy-based analysis
This tutorial covers:
- Data conversion utilities
- Filtering with GWpyFilter
- Time-frequency analysis with GWpyQTransform and GWpySpectrogram
- Data sources: TimeSeriesSource
- Output collection with TimeSeriesSink
- Streaming plot generation with GWpyPlotSink
- Complete pipeline examples
PSD and Whitening
For power spectral density computation and whitening, use the native SGN-LIGO transforms in sgnligo.transforms and sgnligo.psd which provide optimized streaming implementations.
Installation Requirements¶
The GWpy integration requires GWpy to be installed:
pip install gwpy
All GWpy components are available under sgnligo.gwpy:
from sgnligo.gwpy.converters import (
seriesbuffer_to_timeseries,
timeseries_to_seriesbuffer,
tsframe_to_timeseries,
timeseries_to_tsframe,
)
from sgnligo.gwpy.transforms import (
GWpyFilter,
GWpyQTransform,
GWpySpectrogram,
)
from sgnligo.gwpy.sources import TimeSeriesSource
from sgnligo.gwpy.sinks import TimeSeriesSink, GWpyPlotSink
Data Conversion Utilities¶
The converters module provides bidirectional conversion between SGN data containers and GWpy objects.
SeriesBuffer to TimeSeries¶
Convert a single buffer to a GWpy TimeSeries:
import numpy as np
from sgnts.base import Offset, SeriesBuffer
from sgnligo.gwpy.converters import seriesbuffer_to_timeseries
# Create a buffer with sample data
data = np.random.randn(4096)
buf = SeriesBuffer(
offset=Offset.fromsec(1126259462), # GW150914 GPS time
sample_rate=4096,
data=data,
shape=data.shape,
)
# Convert to GWpy TimeSeries
ts = seriesbuffer_to_timeseries(buf, channel="H1:STRAIN", unit="strain")
print(f"TimeSeries t0: {ts.t0}")
print(f"TimeSeries duration: {ts.duration}")
print(f"TimeSeries sample rate: {ts.sample_rate}")
Output:
TimeSeries t0: 1126259462.0 s
TimeSeries duration: 1.0 s
TimeSeries sample rate: 4096.0 Hz
TimeSeries to SeriesBuffer¶
Convert a GWpy TimeSeries back to a SeriesBuffer:
import numpy as np
from gwpy.timeseries import TimeSeries
from sgnligo.gwpy.converters import timeseries_to_seriesbuffer
from sgnts.base import Offset
# Create a TimeSeries
ts = TimeSeries(
np.random.randn(4096),
t0=1126259462,
sample_rate=4096,
channel="H1:STRAIN",
)
# Convert to SeriesBuffer
buf = timeseries_to_seriesbuffer(ts)
print(f"Buffer offset (GPS seconds): {Offset.tosec(buf.offset)}")
print(f"Buffer sample rate: {buf.sample_rate}")
print(f"Buffer shape: {buf.shape}")
TSFrame Conversion¶
For frames containing multiple buffers:
import numpy as np
from sgnts.base import TSFrame, SeriesBuffer, Offset
from sgnligo.gwpy.converters import tsframe_to_timeseries, timeseries_to_tsframe
# Create a TSFrame with multiple buffers
buf1 = SeriesBuffer(
offset=Offset.fromsec(1126259462),
sample_rate=4096,
data=np.random.randn(4096),
shape=(4096,),
)
buf2 = SeriesBuffer(
offset=Offset.fromsec(1126259463),
sample_rate=4096,
data=np.random.randn(4096),
shape=(4096,),
)
frame = TSFrame(buffers=[buf1, buf2])
# Convert TSFrame to single concatenated TimeSeries
ts = tsframe_to_timeseries(frame, channel="H1:PROCESSED")
print(f"Concatenated TimeSeries duration: {ts.duration}")
# Convert TimeSeries to TSFrame (creates single-buffer frame)
new_frame = timeseries_to_tsframe(ts)
print(f"New frame has {len(new_frame.buffers)} buffer(s)")
Gap Handling¶
Gap buffers (missing data) are converted to NaN values, preserving timing information:
import numpy as np
from sgnts.base import SeriesBuffer, Offset
from sgnligo.gwpy.converters import seriesbuffer_to_timeseries
# Gap buffer (data=None)
gap_buf = SeriesBuffer(
offset=Offset.fromsec(1126259462),
sample_rate=4096,
data=None,
shape=(4096,),
)
ts = seriesbuffer_to_timeseries(gap_buf)
print(f"All NaN: {np.all(np.isnan(ts.value))}") # True
GWpyFilter: Streaming Filtering¶
Apply bandpass, lowpass, highpass, or notch filters using GWpy's filtering methods.
Bandpass Filter¶
```python skip import matplotlib matplotlib.use("Agg") # Use non-interactive backend import matplotlib.pyplot as plt from sgn.apps import Pipeline from sgnligo.gwpy.transforms import GWpyFilter from sgnts.sources import FakeSeriesSource from sgnts.sinks import TSPlotSink
Create pipeline¶
pipeline = Pipeline()
Generate a 100 Hz sine wave¶
source = FakeSeriesSource( name="Source", source_pad_names=("signal",), signals={ "signal": {"signal_type": "sin", "fsin": 100, "rate": 4096}, }, end=4, # 4 seconds of data ) pipeline.insert(source)
Add bandpass filter (50-200 Hz) - passes the 100 Hz signal¶
bandpass = GWpyFilter( name="Bandpass", sink_pad_names=("in",), source_pad_names=("out",), filter_type="bandpass", low_freq=50, high_freq=200, ) pipeline.insert( bandpass, link_map={"Bandpass:snk:in": "Source:src:signal"} )
TSPlotSink with two pads: original and filtered¶
sink = TSPlotSink( name="Comparison", sink_pad_names=("original", "filtered"), ) pipeline.insert( sink, link_map={ "Comparison:snk:original": "Source:src:signal", "Comparison:snk:filtered": "Bandpass:src:out", } )
Run pipeline¶
pipeline.run()
Plot both signals overlaid (default) or as subplots¶
fig, ax = sink.plot(time_unit="s", layout="overlay") ax.set_title("Bandpass Filter: 100 Hz sine through 50-200 Hz filter") ax.legend() plt.savefig("bandpass_filter_example.png", dpi=150) plt.show()
Or use subplots layout for clearer comparison¶
fig, axes = sink.plot(time_unit="s", layout="subplots") plt.savefig("bandpass_filter_subplots.png", dpi=150) plt.show()
### Notch Filter (Remove Power Line)
```python
from sgnligo.gwpy.transforms import GWpyFilter
notch = GWpyFilter(
name="Notch60Hz",
sink_pad_names=("in",),
source_pad_names=("out",),
filter_type="notch",
notch_freq=60,
notch_q=30,
)
Highpass Filter¶
from sgnligo.gwpy.transforms import GWpyFilter
highpass = GWpyFilter(
name="Highpass",
sink_pad_names=("in",),
source_pad_names=("out",),
filter_type="highpass",
low_freq=20,
)
Lowpass Filter¶
from sgnligo.gwpy.transforms import GWpyFilter
lowpass = GWpyFilter(
name="Lowpass",
sink_pad_names=("in",),
source_pad_names=("out",),
filter_type="lowpass",
high_freq=500,
)
GWpyQTransform: Time-Frequency Q-Transform¶
The Q-transform provides a time-frequency representation with constant-Q frequency resolution, ideal for visualizing transients like gravitational wave signals.
Basic Q-Transform¶
from sgnligo.gwpy.transforms import GWpyQTransform
qtrans = GWpyQTransform(
name="QTransform",
sink_pad_names=("in",),
source_pad_names=("out",),
qrange=(4, 64), # Range of Q values
frange=(20, 500), # Frequency range in Hz
output_rate=64, # Output sample rate (power-of-2)
output_stride=1.0, # Output every 1 second
input_sample_rate=4096, # Expected input rate
)
Power-of-2 Output Rates
Output sample rates must be from Offset.ALLOWED_RATES: {2, 4, 8, 16, 32, 64, 128, 256, 512, 1024, 2048, 4096, 8192, 16384}. This ensures proper offset alignment in the streaming framework.
Q-Transform Parameters¶
| Parameter | Description | Default |
|---|---|---|
qrange |
(min, max) Q values | (4, 64) |
frange |
(min, max) frequency in Hz | (20, 1024) |
mismatch |
Maximum tile mismatch | 0.2 |
logf |
Logarithmic frequency spacing | True |
tres |
Time resolution (auto if None) | None |
fres |
Frequency resolution (auto if None) | None |
output_rate |
Output sample rate (power-of-2) | 64 |
output_stride |
Output duration per cycle (s) | 1.0 |
Q-Transform Output¶
The Q-transform produces 2D output (frequency × time):
# Output buffer shape: (n_frequencies, n_times)
# Metadata available:
# - metadata["qtransform"]: Full GWpy Spectrogram object
# - metadata["q_frequencies"]: Frequency array
# - metadata["q_times"]: Time array
# - metadata["q_qrange"]: Q range used
# - metadata["q_frange"]: Frequency range used
Complete Q-Transform Example¶
from sgn.apps import Pipeline
from sgnligo.gwpy.transforms import GWpyQTransform
from sgnligo.gwpy.sources import TimeSeriesSource
from sgnligo.gwpy.sinks import TimeSeriesSink
from gwpy.timeseries import TimeSeries
import numpy as np
# Create test signal with a chirp
sample_rate = 4096
duration = 4
t = np.arange(0, duration, 1/sample_rate)
# Chirp from 50 to 200 Hz
chirp = np.sin(2 * np.pi * (50 * t + 75 * t**2 / duration))
noise = 0.1 * np.random.randn(len(t))
signal = chirp + noise
ts = TimeSeries(signal, t0=1000000000, sample_rate=sample_rate, channel="CHIRP")
pipeline = Pipeline()
source = TimeSeriesSource(name="Source", timeseries=ts, buffer_duration=1.0)
pipeline.insert(source)
qtrans = GWpyQTransform(
name="QTrans",
sink_pad_names=("in",),
source_pad_names=("out",),
qrange=(4, 64),
frange=(20, 300),
output_rate=64,
output_stride=1.0,
input_sample_rate=sample_rate,
)
pipeline.insert(qtrans, link_map={"QTrans:snk:in": "Source:src:CHIRP"})
# For 2D data, use TimeSeriesSink or custom sink
sink = TimeSeriesSink(name="Sink", sink_pad_names=("in",), channel="QTRANS")
pipeline.insert(sink, link_map={"Sink:snk:in": "QTrans:src:out"})
pipeline.run()
print("Q-transform pipeline complete")
GWpySpectrogram: Time-Frequency Spectrogram¶
Compute standard FFT-based spectrograms for streaming data.
Basic Spectrogram¶
from sgnligo.gwpy.transforms import GWpySpectrogram
spec = GWpySpectrogram(
name="Spectrogram",
sink_pad_names=("in",),
source_pad_names=("out",),
spec_stride=1.0, # Time step between columns
fft_length=2.0, # FFT length in seconds
fft_overlap=1.0, # Overlap between FFTs
window="hann", # Window function
output_rate=64, # Output sample rate (power-of-2)
output_stride=1.0, # Output duration per cycle
input_sample_rate=4096,
)
Spectrogram Parameters¶
| Parameter | Description | Default |
|---|---|---|
spec_stride |
Time step between columns (s) | 1.0 |
fft_length |
FFT length in seconds | 2.0 |
fft_overlap |
Overlap between FFTs (s) | fft_length/2 |
window |
Window function | 'hann' |
nproc |
Parallel processes | 1 |
output_rate |
Output sample rate (power-of-2) | auto |
output_stride |
Output duration per cycle (s) | 1.0 |
Spectrogram Output¶
# Output buffer shape: (n_frequencies, n_times)
# Metadata available:
# - metadata["spectrogram"]: Full GWpy Spectrogram object
# - metadata["spec_frequencies"]: Frequency array
# - metadata["spec_times"]: Time array
# - metadata["spec_df"]: Frequency resolution
# - metadata["spec_dt"]: Time resolution
TimeSeriesSource: GWpy Data to Pipeline¶
Feed an existing GWpy TimeSeries or TimeSeriesDict into a streaming pipeline.
Single TimeSeries¶
import numpy as np
from gwpy.timeseries import TimeSeries
from sgnligo.gwpy.sources import TimeSeriesSource
# Load data with GWpy (example with local data)
ts = TimeSeries(
np.random.randn(4096 * 10),
t0=1126259462,
sample_rate=4096,
channel="H1:STRAIN",
)
# Create source for pipeline
source = TimeSeriesSource(
name="H1_Data",
timeseries=ts,
buffer_duration=1.0, # Output 1-second buffers
)
Multi-Channel TimeSeriesDict¶
import numpy as np
from gwpy.timeseries import TimeSeries, TimeSeriesDict
from sgnligo.gwpy.sources import TimeSeriesSource
# Create multi-channel data
tsd = TimeSeriesDict()
tsd["H1:STRAIN"] = TimeSeries(np.random.randn(4096*10), t0=1000000000, sample_rate=4096)
tsd["L1:STRAIN"] = TimeSeries(np.random.randn(4096*10), t0=1000000000, sample_rate=4096)
# Source automatically creates pads for each channel
source = TimeSeriesSource(
name="Multi_IFO",
timeseries=tsd,
buffer_duration=1.0,
)
# Source pad names: ("H1:STRAIN", "L1:STRAIN")
TimeSeriesSink: Collect Pipeline Output¶
Collect pipeline output into a GWpy TimeSeries for further analysis or plotting.
Basic Usage¶
import numpy as np
from gwpy.timeseries import TimeSeries
from sgn.apps import Pipeline
from sgnligo.gwpy.sources import TimeSeriesSource
from sgnligo.gwpy.sinks import TimeSeriesSink
# Create sample data
ts = TimeSeries(np.random.randn(4096), t0=1000000000, sample_rate=4096, channel="TEST")
# Build pipeline
pipeline = Pipeline()
source = TimeSeriesSource(name="Source", timeseries=ts, buffer_duration=1.0)
pipeline.insert(source)
sink = TimeSeriesSink(
name="Collector",
sink_pad_names=("in",),
channel="H1:PROCESSED",
unit="strain",
collect_all=True, # Concatenate all buffers
)
pipeline.insert(sink, link_map={"Collector:snk:in": "Source:src:TEST"})
# Run and collect output
pipeline.run()
result = sink.get_result() # Single TimeSeries
result_dict = sink.get_result_dict() # TimeSeriesDict
# Check collection status
print(f"Complete: {sink.is_complete}")
print(f"Samples: {sink.samples_collected}")
print(f"Duration: {sink.duration_collected} seconds")
Reusing the Sink¶
from sgnligo.gwpy.sinks import TimeSeriesSink
sink = TimeSeriesSink(name="Sink", sink_pad_names=("in",), channel="DATA")
# After first pipeline run, clear for reuse:
sink.clear()
GWpyPlotSink: Streaming Plot Generation¶
Generate plots at fixed intervals during pipeline execution using the audio adapter pattern.
Basic Usage¶
import numpy as np
from gwpy.timeseries import TimeSeries
from sgn.apps import Pipeline
from sgnligo.gwpy.sources import TimeSeriesSource
from sgnligo.gwpy.sinks import GWpyPlotSink
# Create sample data
ts = TimeSeries(
np.random.randn(4096 * 10),
t0=1000000000,
sample_rate=4096,
channel="H1:STRAIN",
)
# Build pipeline
pipeline = Pipeline()
source = TimeSeriesSource(name="Source", timeseries=ts, buffer_duration=1.0)
pipeline.insert(source)
# Create plot sink - generates one plot per stride
plot_sink = GWpyPlotSink(
name="Plotter",
sink_pad_names=("in",),
ifo="H1",
description="STRAIN",
plot_type="timeseries",
output_dir="./plots",
plot_stride=2.0, # Generate plot every 2 seconds
overlap_before=0.0, # No overlap before
overlap_after=0.0, # No overlap after
input_sample_rate=4096,
)
pipeline.insert(plot_sink, link_map={"Plotter:snk:in": "Source:src:H1:STRAIN"})
pipeline.run()
print(f"Generated {plot_sink.plots_generated} plots")
# Generates: H1-STRAIN-1000000000-2.png, H1-STRAIN-1000000002-2.png, etc.
Plot Types¶
GWpyPlotSink supports three plot types:
Time Series (default):
plot_sink = GWpyPlotSink(
name="Plotter",
sink_pad_names=("in",),
plot_type="timeseries",
...
)
Spectrogram:
plot_sink = GWpyPlotSink(
name="Plotter",
sink_pad_names=("in",),
plot_type="spectrogram",
fft_length=0.5, # FFT length in seconds
...
)
Q-Transform:
plot_sink = GWpyPlotSink(
name="Plotter",
sink_pad_names=("in",),
plot_type="qtransform",
qrange=(4, 64), # Q range
frange=(20, 500), # Frequency range in Hz
...
)
Stride and Overlap¶
Control plot timing using plot_stride, overlap_before, and overlap_after:
plot_stride: Time between consecutive plot startsoverlap_before: Data to include before the stride (left overlap)overlap_after: Data to include after the stride (right overlap)- Total plot duration =
overlap_before+plot_stride+overlap_after
# Generate 4-second plots every 2 seconds
plot_sink = GWpyPlotSink(
name="Plotter",
sink_pad_names=("in",),
plot_stride=2.0, # New plot every 2 seconds
overlap_before=1.0, # Include 1 second before stride
overlap_after=1.0, # Include 1 second after stride
# Total duration = 1 + 2 + 1 = 4 seconds per plot
...
)
# Produces: GPS 999999999 (4s), GPS 1000000001 (4s), GPS 1000000003 (4s), ...
File Naming Convention¶
Files follow LIGO convention: <IFO>-<DESCRIPTION>-<GPS_START>-<DURATION>.png
- Dashes are used only as delimiters
- Any dashes in
descriptionare replaced with underscores - GPS start and duration are integers
Examples:
- H1-STRAIN-1000000000-2.png
- L1-STRAIN_FILTERED-1126259462-4.png
Custom Titles¶
Override the default title using a template:
plot_sink = GWpyPlotSink(
name="Plotter",
sink_pad_names=("in",),
title_template="{ifo} Analysis - GPS {gps_start:.0f} ({duration:.1f}s)",
...
)
# Produces title: "H1 Analysis - GPS 1000000000 (2.0s)"
Available placeholders: {ifo}, {gps_start}, {duration}
GWpyPlotSink Parameters¶
| Parameter | Description | Default |
|---|---|---|
ifo |
Interferometer name (H1, L1, V1) | "H1" |
description |
Description for filename | "STRAIN" |
plot_type |
"timeseries", "spectrogram", or "qtransform" | "timeseries" |
output_dir |
Directory for output files | "." |
plot_stride |
Time between plot starts (seconds) | 1.0 |
overlap_before |
Data before stride (seconds) | 0.0 |
overlap_after |
Data after stride (seconds). Duration = before + stride + after | 0.0 |
input_sample_rate |
Expected input sample rate (Hz) | 4096 |
fft_length |
FFT length for spectrogram (seconds) | 0.5 |
qrange |
Q range for Q-transform | (4, 64) |
frange |
Frequency range for Q-transform (Hz) | (20, 500) |
figsize |
Figure size (width, height) inches | (12, 4) |
dpi |
Resolution for saved plots | 150 |
title_template |
Custom title template | None |
Complete Pipeline Examples¶
Example 1: Bandpass and Q-Transform¶
A complete pipeline that filters data and computes a Q-transform:
from sgn.apps import Pipeline
from sgnligo.gwpy.sources import TimeSeriesSource
from sgnligo.gwpy.transforms import GWpyFilter, GWpyQTransform
from sgnligo.gwpy.sinks import TimeSeriesSink
from gwpy.timeseries import TimeSeries
import numpy as np
# Generate test data with a burst signal
sample_rate = 4096
duration = 8
t = np.arange(0, duration, 1/sample_rate)
# Gaussian-modulated sinusoid (burst)
burst_time = 4.0
sigma = 0.1
burst = np.exp(-((t - burst_time)**2) / (2 * sigma**2)) * np.sin(2 * np.pi * 150 * t)
noise = 0.1 * np.random.randn(len(t))
data = burst + noise
ts = TimeSeries(data, t0=1000000000, sample_rate=sample_rate, channel="TEST")
# Build pipeline
pipeline = Pipeline()
# Source
source = TimeSeriesSource(name="Source", timeseries=ts, buffer_duration=1.0)
pipeline.insert(source)
# Bandpass 30-300 Hz
bandpass = GWpyFilter(
name="Bandpass",
sink_pad_names=("in",),
source_pad_names=("out",),
filter_type="bandpass",
low_freq=30,
high_freq=300,
)
pipeline.insert(bandpass, link_map={"Bandpass:snk:in": "Source:src:TEST"})
# Q-transform
qtrans = GWpyQTransform(
name="QTrans",
sink_pad_names=("in",),
source_pad_names=("out",),
qrange=(4, 64),
frange=(20, 400),
output_rate=64,
input_sample_rate=sample_rate,
)
pipeline.insert(qtrans, link_map={"QTrans:snk:in": "Bandpass:src:out"})
# Collect Q-transform output
sink = TimeSeriesSink(name="Sink", sink_pad_names=("in",), channel="QTRANS")
pipeline.insert(sink, link_map={"Sink:snk:in": "QTrans:src:out"})
pipeline.run()
print("Bandpass + Q-transform pipeline complete!")
Best Practices¶
1. Match Sample Rates¶
Ensure input sample rates match transform expectations:
# Correct: Specify input_sample_rate matching your data
qtrans = GWpyQTransform(
input_sample_rate=4096, # Must match source data rate
...
)
2. Use Power-of-2 Output Rates¶
For Q-transform and spectrogram, use power-of-2 output rates:
# Good: Power-of-2 rate
qtrans = GWpyQTransform(output_rate=64, ...)
# Bad: Arbitrary rate (will raise ValueError)
qtrans = GWpyQTransform(output_rate=100, ...) # Error!
3. Handle Startup Transients¶
Transforms with accumulation (PSD, Whiten) have startup delays:
# PSD needs fft_length of data before producing valid output
# Whiten needs 2*fft_length before producing output
# Plan for this in your analysis
4. Buffer Duration Selection¶
Choose buffer duration based on your use case:
# Smaller buffers: More responsive, higher overhead
source = TimeSeriesSource(buffer_duration=0.1, ...)
# Larger buffers: More efficient, higher latency
source = TimeSeriesSource(buffer_duration=1.0, ...)
5. Check for Gaps¶
When using TimeSeriesSink, gaps appear as NaN:
result = sink.get_result()
if np.any(np.isnan(result.value)):
print("Warning: Result contains gaps")
# Use np.nanmean, np.nanstd, etc. for statistics
Summary¶
| Component | Purpose | Key Parameters |
|---|---|---|
seriesbuffer_to_timeseries |
Convert buffer to GWpy | channel, unit |
timeseries_to_seriesbuffer |
Convert GWpy to buffer | - |
GWpyFilter |
Bandpass/lowpass/highpass/notch | filter_type, frequencies |
GWpyQTransform |
Q-transform | qrange, frange, output_rate |
GWpySpectrogram |
FFT spectrogram | spec_stride, fft_length |
TimeSeriesSource |
GWpy data to pipeline | timeseries, buffer_duration |
TimeSeriesSink |
Collect to GWpy | channel, collect_all |
GWpyPlotSink |
Streaming plot generation | plot_type, plot_stride, output_dir |
PSD and Whitening
For power spectral density computation and whitening, use the native SGN-LIGO transforms in sgnligo.transforms and sgnligo.psd which provide optimized streaming implementations.
For more information:
- GWpy Documentation
- DataSource Tutorial - For using simulated/real detector data
- GWDataNoiseSource Tutorial - For realistic noise simulation