DataSource V2 API Reference
The datasource_v2 package provides a composable, dataclass-based API for creating gravitational wave data sources.
Main Entry Points
DataSource (Dispatcher)
sgnligo.sources.datasource_v2.DataSource
Unified data source that dispatches to the appropriate source class.
This is the main entry point for CLI-based pipelines. It accepts a data_source type string and forwards all other parameters to the appropriate source class.
For programmatic use, you can also instantiate source classes directly (e.g., WhiteComposedSource, GWDataNoiseComposedSource).
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| data_source | str | Type of source ("white", "gwdata-noise", etc.) | required |
| name | str | Name for the composed element | 'datasource' |
| **kwargs | Any | Parameters forwarded to the specific source class | {} |
Example
    # Direct instantiation
    source = DataSource(
        data_source="white",
        name="test",
        ifos=["H1"],
        sample_rate=4096,
        t0=1000,
        end=1010,
    )

    # From CLI
    source = DataSource.from_argv(name="data_source")

    # Use in pipeline
    pipeline.connect(source.element, sink)
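The dispatch behaviour described above (look up a class by its data_source string, then forward the remaining keyword arguments) can be sketched without sgnligo installed. This is only an illustration under stated assumptions: ToyDataSource, ToyWhiteSource, and the _SOURCES dict are hypothetical stand-ins, not the package's actual implementation.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Type

# Hypothetical stand-in for the package's source registry.
_SOURCES: Dict[str, Type] = {}


@dataclass
class ToyWhiteSource:
    """Hypothetical source class used only for this illustration."""

    name: str
    ifos: List[str] = field(default_factory=list)
    sample_rate: int = 2048


_SOURCES["white"] = ToyWhiteSource


class ToyDataSource:
    """Looks up the class for data_source and forwards all other kwargs."""

    def __init__(self, data_source: str, name: str = "datasource", **kwargs: Any):
        try:
            cls = _SOURCES[data_source]
        except KeyError:
            raise ValueError(f"Unknown data source: {data_source!r}") from None
        # Everything except data_source goes straight to the selected class.
        self._inner = cls(name=name, **kwargs)

    @property
    def inner(self):
        return self._inner


source = ToyDataSource(data_source="white", name="test", ifos=["H1"], sample_rate=4096)
print(type(source.inner).__name__, source.inner.sample_rate)
```

Keeping the dispatcher thin means each source class stays usable on its own, which matches the note that source classes can be instantiated directly.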
Source code in sgnligo/sources/datasource_v2/datasource.py
element
property
The underlying TSComposedSourceElement for pipeline integration.
srcs
property
Source pads of the composed element.
create_cli_parser(prog=None, description=None)
classmethod
Create CLI argument parser with options for all registered sources.
Use this when you need to add custom arguments to the parser.
Example
    parser = DataSource.create_cli_parser()
    parser.add_argument("--snr-threshold", type=float, default=8.0)
    source = DataSource.from_parser(parser, name="pipeline")
Returns:
| Type | Description |
|---|---|
| ArgumentParser | ArgumentParser configured with --data-source and all source options |
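Extending a pre-built parser with pipeline-specific options can be tried with plain argparse. In this sketch make_parser() is a hypothetical stand-in for DataSource.create_cli_parser(), and the --sample-rate option is an assumed example of a source option; only --data-source and --snr-threshold come from the text above.

```python
import argparse


def make_parser() -> argparse.ArgumentParser:
    # Hypothetical stand-in for DataSource.create_cli_parser().
    parser = argparse.ArgumentParser(prog="pipeline")
    parser.add_argument("--data-source", required=True)
    parser.add_argument("--sample-rate", type=int, default=2048)
    return parser


parser = make_parser()
# Custom, pipeline-level argument added on top of the source options.
parser.add_argument("--snr-threshold", type=float, default=8.0)

args = parser.parse_args(["--data-source", "white", "--snr-threshold", "5.5"])

# Separate the pipeline-level option from the options meant for the source.
pipeline_opts = {"snr_threshold": args.snr_threshold}
source_opts = {k: v for k, v in vars(args).items() if k not in pipeline_opts}
print(pipeline_opts, source_opts)
```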
Source code in sgnligo/sources/datasource_v2/datasource.py
list_sources()
staticmethod
List all available source types.
Source code in sgnligo/sources/datasource_v2/datasource.py
get_source_class(source_type)
staticmethod
Get the source class for a given type.
Source code in sgnligo/sources/datasource_v2/datasource.py
CLI Support
sgnligo.sources.datasource_v2.cli
CLI support for composed data sources.
This module provides CLI argument parsing and help generation for the dataclass-based composed source classes.
CLI arguments are defined by mixin classes that sources inherit from.
The build_composed_cli_parser() function aggregates CLI arguments from all registered sources by walking their MRO and collecting arguments from mixins.
Example
    import sys

    from sgnligo.sources.datasource_v2.cli import (
        build_composed_cli_parser,
        check_composed_help_options,
    )

    # Handle --list-sources / --help-source before full parsing
    if check_composed_help_options():
        sys.exit(0)

    parser = build_composed_cli_parser()
    args = parser.parse_args()
build_composed_cli_parser(prog=None, description=None)
Build CLI parser by aggregating arguments from source mixins.
This function walks the MRO of all registered source classes and collects CLI arguments from mixins that implement the CLIMixinProtocol. Duplicate arguments (same arg defined by multiple mixins) raise an error.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| prog | Optional[str] | Program name for help text | None |
| description | Optional[str] | Description for help text | None |

Returns:
| Type | Description |
|---|---|
| ArgumentParser | ArgumentParser configured with all source options |

Raises:
| Type | Description |
|---|---|
| ValueError | If duplicate CLI arguments are detected |
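The MRO walk described for build_composed_cli_parser() can be illustrated with a small stand-alone sketch. The hook name add_cli_arguments and the toy mixins are assumptions made for this example; the source only states that mixins implement CLIMixinProtocol, not what its methods are called. The sketch relies on argparse's default conflict handling, which raises ArgumentError when an option string is registered twice.

```python
import argparse
from typing import Iterable, Set, Type


class IfosMixin:
    @classmethod
    def add_cli_arguments(cls, parser: argparse.ArgumentParser) -> None:
        parser.add_argument("--ifos", nargs="+")


class SampleRateMixin:
    @classmethod
    def add_cli_arguments(cls, parser: argparse.ArgumentParser) -> None:
        parser.add_argument("--sample-rate", type=int)


class ToySource(IfosMixin, SampleRateMixin):
    """Hypothetical source that inherits its CLI options from mixins."""


def build_parser(source_classes: Iterable[Type]) -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser()
    seen: Set[Type] = set()
    for source_cls in source_classes:
        for klass in source_cls.__mro__:
            # Only classes that define the hook themselves contribute,
            # and a mixin shared by several sources is visited once.
            if "add_cli_arguments" not in vars(klass) or klass in seen:
                continue
            seen.add(klass)
            try:
                klass.add_cli_arguments(parser)
            except argparse.ArgumentError as exc:
                raise ValueError(
                    f"Duplicate CLI argument from {klass.__name__}: {exc}"
                ) from exc
    return parser


parser = build_parser([ToySource])
args = parser.parse_args(["--ifos", "H1", "L1", "--sample-rate", "4096"])
print(args.ifos, args.sample_rate)
```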
Source code in sgnligo/sources/datasource_v2/cli.py
check_composed_help_options(argv=None)
Check for --list-sources and --help-source before full parsing.
Call this before parse_args() to handle help options that don't require --data-source to be specified.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| argv | Optional[List[str]] | Command line arguments (defaults to sys.argv[1:]) | None |

Returns:
| Type | Description |
|---|---|
| bool | True if help was handled (caller should exit), False otherwise. |
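A pre-parse check of this kind can be sketched as follows. The flag names --list-sources and --help-source come from the description above; the SOURCE_HELP table, the function name check_help_options, and the assumption that --help-source takes the source type as its next argument are illustrative only.

```python
import sys
from typing import Dict, List, Optional

# Hypothetical help text keyed by source type.
SOURCE_HELP: Dict[str, str] = {
    "white": "Gaussian white noise source.",
    "sin": "Sinusoidal test signal source.",
}


def check_help_options(argv: Optional[List[str]] = None) -> bool:
    """Return True if a help option was handled and the caller should exit."""
    argv = sys.argv[1:] if argv is None else argv
    if "--list-sources" in argv:
        print("\n".join(sorted(SOURCE_HELP)))
        return True
    if "--help-source" in argv:
        idx = argv.index("--help-source")
        if idx + 1 < len(argv):
            source_type = argv[idx + 1]
            print(SOURCE_HELP.get(source_type, f"Unknown source type: {source_type}"))
        else:
            print("--help-source requires a source type")
        return True
    return False


handled = check_help_options(["--help-source", "white"])
```

Running the check before parse_args() matters because a parser that marks --data-source as required would otherwise reject a bare --list-sources invocation.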
Source code in sgnligo/sources/datasource_v2/cli.py
namespace_to_datasource_kwargs(args)
Convert argparse namespace to DataSource kwargs.
This function walks the MRO of the selected source class and calls process_cli_args() on each mixin to convert CLI arguments to field values.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| args | Namespace | Parsed argparse namespace | required |

Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dict of kwargs for DataSource |
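The conversion step can be pictured with a minimal sketch in which each mixin turns its own CLI values into dataclass field values. The process_cli_args() name comes from the description above, but its signature here (a static method taking the namespace and returning a dict) and the gps_start_time / gps_end_time attribute names are assumptions for illustration.

```python
import argparse
from typing import Any, Dict, Type


class IfosMixin:
    @staticmethod
    def process_cli_args(args: argparse.Namespace) -> Dict[str, Any]:
        return {"ifos": list(args.ifos)}


class GPSMixin:
    @staticmethod
    def process_cli_args(args: argparse.Namespace) -> Dict[str, Any]:
        # CLI names differ from field names, so the mixin does the mapping.
        return {"t0": args.gps_start_time, "end": args.gps_end_time}


class ToySource(IfosMixin, GPSMixin):
    """Hypothetical source class selected by --data-source."""


def namespace_to_kwargs(args: argparse.Namespace, source_cls: Type) -> Dict[str, Any]:
    kwargs: Dict[str, Any] = {"data_source": args.data_source}
    for klass in source_cls.__mro__:
        # Call the hook only on classes that define it themselves.
        if "process_cli_args" in vars(klass):
            kwargs.update(klass.process_cli_args(args))
    return kwargs


ns = argparse.Namespace(
    data_source="white", ifos=["H1"], gps_start_time=1000, gps_end_time=1010
)
kwargs = namespace_to_kwargs(ns, ToySource)
print(kwargs)
```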
Source code in sgnligo/sources/datasource_v2/cli.py
format_composed_source_help(source_type)
Generate detailed help for a specific source type.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_type | str | The source type to show help for | required |

Returns:
| Type | Description |
|---|---|
| str | Formatted help string |
Source code in sgnligo/sources/datasource_v2/cli.py
format_composed_source_list()
Generate list of all available sources grouped by type.
Returns:
| Type | Description |
|---|---|
| str | Formatted string listing all sources |
Source code in sgnligo/sources/datasource_v2/cli.py
Registry
sgnligo.sources.datasource_v2.composed_registry
Registry for dataclass-based composed source classes.
This module provides registration and lookup for the new dataclass-based source classes that inherit from ComposedSourceBase.
Example
    from dataclasses import dataclass
    from typing import ClassVar

    from sgnligo.sources.composed_base import ComposedSourceBase
    from sgnligo.sources.datasource_v2.composed_registry import (
        register_composed_source,
        get_composed_source_class,
    )

    @register_composed_source
    @dataclass
    class MySource(ComposedSourceBase):
        source_type: ClassVar[str] = "my-source"
        ...
register_composed_source(cls)
Decorator to register a composed source class.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| cls | Type[ComposedSourceBase] | The composed source class to register | required |

Returns:
| Type | Description |
|---|---|
| Type[ComposedSourceBase] | The same class (unchanged) |

Raises:
| Type | Description |
|---|---|
| ValueError | If the source_type is empty or already registered |
Example
    @register_composed_source
    @dataclass
    class WhiteSource(ComposedSourceBase):
        source_type: ClassVar[str] = "white"
        ...
Source code in sgnligo/sources/datasource_v2/composed_registry.py
get_composed_source_class(source_type)
Get the composed source class for a given type string.
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| source_type | str | The source type identifier | required |

Returns:
| Type | Description |
|---|---|
| Type[ComposedSourceBase] | The composed source class |

Raises:
| Type | Description |
|---|---|
| ValueError | If the source type is not registered |
Source code in sgnligo/sources/datasource_v2/composed_registry.py
list_composed_source_types()
List all registered composed source types.
Returns:
| Type | Description |
|---|---|
| List[str] | Sorted list of registered source type names |
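Lookup and listing over such a registry might look like the sketch below. The function and class names are hypothetical, and including the available types in the error message is a design choice of this example rather than documented behaviour.

```python
from typing import Dict, List, Type


class ToyWhite:
    """Hypothetical registered class."""


class ToySin:
    """Hypothetical registered class."""


_REGISTRY: Dict[str, Type] = {"white": ToyWhite, "sin": ToySin}


def list_source_types() -> List[str]:
    # Sorted so that help output and error messages are stable.
    return sorted(_REGISTRY)


def get_source_class(source_type: str) -> Type:
    try:
        return _REGISTRY[source_type]
    except KeyError:
        available = ", ".join(list_source_types())
        raise ValueError(
            f"Unknown source type {source_type!r}. Available: {available}"
        ) from None


print(list_source_types())
print(get_source_class("white").__name__)
```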
Source code in sgnligo/sources/datasource_v2/composed_registry.py
get_composed_registry()
Get the full composed source registry.
Returns:
| Type | Description |
|---|---|
| Dict[str, Type[ComposedSourceBase]] | Dict mapping source type to class |
Source code in sgnligo/sources/datasource_v2/composed_registry.py
Source Classes
Fake Sources
sgnligo.sources.datasource_v2.sources.fake
Fake signal source classes (white, sin, impulse).
These sources generate synthetic test signals for pipeline development and testing without requiring real detector data.
Example
    source = WhiteComposedSource(
        name="noise",
        ifos=["H1", "L1"],
        sample_rate=4096,
        t0=1000,
        end=1010,
    )
    pipeline.connect(source.element, sink)
WhiteComposedSource
dataclass
Bases: FakeSourceBase
Inheritance: WhiteComposedSource inherits from FakeSourceBase, which in turn inherits from ComposedSourceBase, IfosOnlyMixin, SampleRateOptionsMixin, GPSOptionsMixin, SegmentsOptionsMixin, and VerboseOptionsMixin.
Gaussian white noise source.
Generates uncorrelated Gaussian white noise for each IFO channel. Useful for basic pipeline testing where spectral characteristics don't matter.
Example
    source = WhiteComposedSource(
        name="noise",
        ifos=["H1", "L1"],
        sample_rate=4096,
        t0=1000,
        end=1010,
    )
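To make the parameters concrete, here is a standard-library sketch of what "uncorrelated Gaussian white noise for each IFO channel" amounts to numerically. It is not how WhiteComposedSource is implemented; the function white_noise, the zero-mean unit-variance choice, and the seed argument are assumptions of this example.

```python
import random
from typing import Dict, List, Optional


def white_noise(
    ifos: List[str],
    sample_rate: int,
    t0: float,
    end: float,
    seed: Optional[int] = None,
) -> Dict[str, List[float]]:
    """Draw independent N(0, 1) samples for every detector channel."""
    rng = random.Random(seed)
    n_samples = int(round((end - t0) * sample_rate))
    # Each channel gets its own draws, so channels are uncorrelated.
    return {ifo: [rng.gauss(0.0, 1.0) for _ in range(n_samples)] for ifo in ifos}


data = white_noise(["H1", "L1"], sample_rate=4096, t0=1000, end=1010, seed=0)
print({ifo: len(samples) for ifo, samples in data.items()})
```

With t0=1000, end=1010, and sample_rate=4096, each channel holds 10 × 4096 = 40960 samples.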
Source code in sgnligo/sources/datasource_v2/sources/fake.py
SinComposedSource
dataclass
Bases: FakeSourceBase
Inheritance: SinComposedSource inherits from FakeSourceBase, which in turn inherits from ComposedSourceBase, IfosOnlyMixin, SampleRateOptionsMixin, GPSOptionsMixin, SegmentsOptionsMixin, and VerboseOptionsMixin.
Sinusoidal test signal source.
Generates a sinusoidal signal for each IFO channel. Useful for testing frequency-domain processing.
Example
    source = SinComposedSource(
        name="sine",
        ifos=["H1"],
        sample_rate=4096,
        t0=1000,
        end=1010,
    )
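A sampled sinusoid over the same time span can be sketched with the standard library. The frequency and amplitude parameters, and the choice to reference the phase to t0, are assumptions of this example; the source does not say how SinComposedSource is configured.

```python
import math
from typing import List


def sine_wave(
    sample_rate: int,
    t0: float,
    end: float,
    frequency: float = 16.0,
    amplitude: float = 1.0,
) -> List[float]:
    """Sample amplitude * sin(2*pi*f*t), with t measured from t0."""
    n_samples = int(round((end - t0) * sample_rate))
    return [
        amplitude * math.sin(2.0 * math.pi * frequency * (i / sample_rate))
        for i in range(n_samples)
    ]


samples = sine_wave(sample_rate=4096, t0=1000, end=1010, frequency=16.0)
# At 16 Hz and 4096 samples per second one period spans 256 samples,
# so the first peak falls at sample 64.
print(len(samples), samples[0], samples[64])
```

A pure tone like this is convenient for frequency-domain checks because its spectrum should show a single line at the chosen frequency.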
Source code in sgnligo/sources/datasource_v2/sources/fake.py
ImpulseComposedSource
dataclass
Bases: FakeSourceBase, ImpulsePositionOptionsMixin
Inheritance: ImpulseComposedSource inherits from FakeSourceBase and ImpulsePositionOptionsMixin; FakeSourceBase in turn inherits from ComposedSourceBase, IfosOnlyMixin, SampleRateOptionsMixin, GPSOptionsMixin, SegmentsOptionsMixin, and VerboseOptionsMixin.
Impulse test signal source.
Generates an impulse signal (single spike) for each IFO channel. Useful for testing impulse response.
Fields inherited from mixins
impulse_position: Sample index for impulse (-1 for random)
Example
    source = ImpulseComposedSource(
        name="impulse",
        ifos=["H1"],
        sample_rate=4096,
        t0=1000,
        end=1010,
        impulse_position=100,
    )
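The meaning of impulse_position, including the -1 sentinel for a random position, can be illustrated as below. The function impulse, the unit spike height, and the seed argument are assumptions of this sketch rather than details taken from the library.

```python
import random
from typing import List, Optional


def impulse(
    n_samples: int, impulse_position: int = -1, seed: Optional[int] = None
) -> List[float]:
    """Return zeros with a single unit spike at impulse_position."""
    if impulse_position == -1:
        # -1 is the sentinel for "pick a random sample index".
        impulse_position = random.Random(seed).randrange(n_samples)
    if not 0 <= impulse_position < n_samples:
        raise ValueError(f"impulse_position {impulse_position} is out of range")
    samples = [0.0] * n_samples
    samples[impulse_position] = 1.0
    return samples


fixed = impulse(1000, impulse_position=100)
randomized = impulse(1000, impulse_position=-1, seed=1)
print(fixed.index(1.0), randomized.index(1.0))
```

Feeding a single spike through a filter yields that filter's impulse response directly, which is why this signal is useful for testing.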
Source code in sgnligo/sources/datasource_v2/sources/fake.py
WhiteRealtimeComposedSource
dataclass
Bases: RealtimeFakeSourceBase
Inheritance: WhiteRealtimeComposedSource inherits from RealtimeFakeSourceBase, which in turn inherits from ComposedSourceBase, IfosOnlyMixin, SampleRateOptionsMixin, and VerboseOptionsMixin.
Real-time Gaussian white noise source.
Generates white noise synchronized with wall clock time.
Example
    source = WhiteRealtimeComposedSource(
        name="realtime_noise",
        ifos=["H1"],
        sample_rate=4096,
    )
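One plausible reading of "synchronized with wall clock time" is that a buffer is released only once the clock has reached the end of the interval it covers. The sketch below illustrates that pacing idea under this assumption; paced_buffers and its clock/sleep parameters are hypothetical, and the real element may pace its output differently. A simulated clock is used so the example finishes instantly.

```python
import time
from typing import Callable, Iterator, List, Tuple


def paced_buffers(
    start: float,
    buffer_duration: float,
    n_buffers: int,
    clock: Callable[[], float] = time.time,
    sleep: Callable[[float], None] = time.sleep,
) -> Iterator[Tuple[float, float]]:
    """Yield (start, end) spans, each no earlier than its end time."""
    for i in range(n_buffers):
        buf_start = start + i * buffer_duration
        buf_end = buf_start + buffer_duration
        delay = buf_end - clock()
        if delay > 0:
            sleep(delay)  # wait until the data "exists" in real time
        yield buf_start, buf_end


# Simulated clock: sleeping just advances the fake time.
sim_now: List[float] = [100.0]
waits: List[float] = []


def sim_sleep(seconds: float) -> None:
    waits.append(seconds)
    sim_now[0] += seconds


spans = list(paced_buffers(100.0, 1.0, 3, clock=lambda: sim_now[0], sleep=sim_sleep))
print(spans)
print(waits)
```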
Source code in sgnligo/sources/datasource_v2/sources/fake.py
SinRealtimeComposedSource
dataclass
Bases: RealtimeFakeSourceBase
Inheritance: SinRealtimeComposedSource inherits from RealtimeFakeSourceBase, which in turn inherits from ComposedSourceBase, IfosOnlyMixin, SampleRateOptionsMixin, and VerboseOptionsMixin.
Real-time sinusoidal test signal source.
Generates sinusoidal signal synchronized with wall clock time.
Example
    source = SinRealtimeComposedSource(
        name="realtime_sin",
        ifos=["H1"],
        sample_rate=4096,
    )
Source code in sgnligo/sources/datasource_v2/sources/fake.py
ImpulseRealtimeComposedSource
dataclass
Bases: RealtimeFakeSourceBase, ImpulsePositionOptionsMixin
Inheritance: ImpulseRealtimeComposedSource inherits from RealtimeFakeSourceBase and ImpulsePositionOptionsMixin; RealtimeFakeSourceBase in turn inherits from ComposedSourceBase, IfosOnlyMixin, SampleRateOptionsMixin, and VerboseOptionsMixin.
Real-time impulse test signal source.
Generates impulse signals synchronized with wall clock time.
Fields inherited from mixins
impulse_position: Sample index for impulse (-1 for random)
Example
    source = ImpulseRealtimeComposedSource(
        name="realtime_impulse",
        ifos=["H1"],
        sample_rate=4096,
        impulse_position=100,
    )
Source code in sgnligo/sources/datasource_v2/sources/fake.py
GWData Noise Sources
sgnligo.sources.datasource_v2.sources.gwdata_noise
GWData noise composed source classes.
These sources generate colored Gaussian noise with realistic LIGO PSDs, suitable for testing and development without real detector data.
Example
    source = GWDataNoiseComposedSource(
        name="noise",
        ifos=["H1", "L1"],
        t0=1000,
        end=1010,
    )
    pipeline.connect(source.element, sink)
GWDataNoiseComposedSource
dataclass
Bases: ComposedSourceBase, IfosOnlyMixin, GPSOptionsMixin, ChannelPatternOptionsMixin, StateVectorOnDictOnlyMixin, VerboseOptionsMixin
Colored Gaussian noise source with optional state vector gating.
Generates colored Gaussian noise with LIGO PSD for offline analysis. Supports optional segment-based state vector gating.
Fields inherited from mixins
ifos: List of detector prefixes (from IfosOnlyMixin)
t0: GPS start time (from GPSOptionsMixin)
end: GPS end time (from GPSOptionsMixin)
channel_pattern: Channel naming pattern (from ChannelPatternOptionsMixin)
state_vector_on_dict: Bitmask dict (from StateVectorOnDictOnlyMixin)
state_segments_file: State segments file (from StateVectorOnDictOnlyMixin)
state_sample_rate: State vector sample rate (from StateVectorOnDictOnlyMixin)
verbose: Enable verbose output (from VerboseOptionsMixin)
Example
    source = GWDataNoiseComposedSource(
        name="noise",
        ifos=["H1", "L1"],
        t0=1000,
        end=1010,
    )
    pipeline.connect(source.element, sink)
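Segment-based state vector gating can be pictured as turning a list of "on" segments into a sampled state series. The sketch below is an assumption-laden illustration: state_from_segments, the half-open [start, end) segment convention, and the on_value bit are hypothetical, and the file format read via state_segments_file is not modelled here.

```python
from typing import List, Tuple


def state_from_segments(
    segments: List[Tuple[float, float]],
    t0: float,
    end: float,
    state_sample_rate: int = 16,
    on_value: int = 1,
) -> List[int]:
    """Sample a state vector that is on_value inside any segment, else 0."""
    n_samples = int(round((end - t0) * state_sample_rate))
    state = []
    for i in range(n_samples):
        t = t0 + i / state_sample_rate
        # Segments are treated as half-open intervals [start, end).
        inside = any(seg_start <= t < seg_end for seg_start, seg_end in segments)
        state.append(on_value if inside else 0)
    return state


state = state_from_segments([(1002, 1004)], t0=1000, end=1010, state_sample_rate=1)
print(state)
```

A downstream gate could then keep only the strain samples whose corresponding state sample has the required bits set.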
Source code in sgnligo/sources/datasource_v2/sources/gwdata_noise.py
GWDataNoiseRealtimeComposedSource
dataclass
Bases: ComposedSourceBase, IfosOnlyMixin, GPSOptionsOptionalMixin, ChannelPatternOptionsMixin, StateVectorOnDictOnlyMixin, VerboseOptionsMixin
Real-time colored Gaussian noise source.
Generates colored Gaussian noise synchronized with wall clock time. Time range is optional for indefinite operation.
Fields inherited from mixins
- ifos: List of detector prefixes (from IfosOnlyMixin)
- t0: GPS start time (optional, from GPSOptionsOptionalMixin)
- end: GPS end time (optional, from GPSOptionsOptionalMixin)
- channel_pattern: Channel naming pattern (from ChannelPatternOptionsMixin)
- state_vector_on_dict: Bitmask dict (from StateVectorOnDictOnlyMixin)
- state_segments_file: State segments file (from StateVectorOnDictOnlyMixin)
- state_sample_rate: State vector sample rate (from StateVectorOnDictOnlyMixin)
- verbose: Enable verbose output (from VerboseOptionsMixin)
Example
source = GWDataNoiseRealtimeComposedSource(
    name="realtime_noise",
    ifos=["H1"],
)
pipeline.connect(source.element, sink)
Source code in sgnligo/sources/datasource_v2/sources/gwdata_noise.py
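A real-time source that is synchronized with wall clock time has to map the system clock onto GPS seconds. The sketch below shows one way that mapping could look; it is not taken from sgnligo. The helper names `unix_to_gps` and `gps_now` are hypothetical, and the fixed 18-second leap-second offset is an assumption that only holds for dates from 2017-01-01 until the next leap second is announced.

```python
import time

# Unix timestamp of the GPS epoch, 1980-01-06 00:00:00 UTC.
GPS_EPOCH_UNIX = 315964800
# GPS-UTC offset in seconds (assumption: valid from 2017-01-01 onward,
# until a new leap second is introduced).
GPS_UTC_LEAP_SECONDS = 18


def unix_to_gps(unix_seconds: float) -> float:
    """Convert a Unix timestamp (2017 or later) to GPS seconds."""
    return unix_seconds - GPS_EPOCH_UNIX + GPS_UTC_LEAP_SECONDS


def gps_now() -> float:
    """Current GPS time derived from the system wall clock."""
    return unix_to_gps(time.time())


if __name__ == "__main__":
    # A real-time source with no explicit t0 could start from "now".
    print(f"current GPS time is approximately {gps_now():.0f}")
```

A production pipeline would normally take the leap-second table from a maintained library rather than hard-coding the offset.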
Frame Sources
sgnligo.sources.datasource_v2.sources.frames
Frame file composed source classes.
These sources read gravitational wave data from GWF frame files, the standard format for LIGO/Virgo data.
Example
source = FramesComposedSource(
    name="data",
    ifos=["H1", "L1"],
    frame_cache="/path/to/frames.cache",
    channel_dict={"H1": "GDS-CALIB_STRAIN", "L1": "GDS-CALIB_STRAIN"},
    t0=1000000000,
    end=1000000100,
)
pipeline.connect(source.element, sink)
FramesComposedSource
dataclass
Bases: ComposedSourceBase, IfosFromChannelMixin, FrameCacheOptionsMixin, ChannelOptionsMixin, GPSOptionsMixin, SegmentsOptionsMixin, InjectionOptionsMixin, VerboseOptionsMixin
Frame file source for offline analysis.
Reads strain data from GWF frame files specified in a LAL cache file. Supports optional noiseless injections and segment-based gating.
Fields inherited from mixins
- ifos: List of detector prefixes (from IfosFromChannelMixin)
- frame_cache: Path to LAL cache file (from FrameCacheOptionsMixin)
- channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
- t0: GPS start time (from GPSOptionsMixin)
- end: GPS end time (from GPSOptionsMixin)
- segments_file: Path to LIGO XML segments file (from SegmentsOptionsMixin)
- segments_name: Segment name in XML (from SegmentsOptionsMixin)
- noiseless_inj_frame_cache: Injection frame cache (from InjectionOptionsMixin)
- noiseless_inj_channel_dict: Injection channels (from InjectionOptionsMixin)
- verbose: Enable verbose output (from VerboseOptionsMixin)
Example
source = FramesComposedSource(
    name="data",
    ifos=["H1"],
    frame_cache="/path/to/frames.cache",
    channel_dict={"H1": "GDS-CALIB_STRAIN"},
    t0=1000000000,
    end=1000000100,
)
pipeline.connect(source.element, sink)
Source code in sgnligo/sources/datasource_v2/sources/frames.py
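The frame_cache field points at a LAL cache file, which lists one frame file per line in five whitespace-separated columns: observatory, description, GPS start, duration, and URL. The sketch below illustrates how such a file relates to the t0 and end fields by selecting the entries that overlap the requested span. It uses only the standard library; `CacheEntry`, `parse_cache_line`, and `overlapping` are hypothetical names and not part of sgnligo, and the file paths are made up.

```python
from dataclasses import dataclass
from urllib.parse import urlparse


@dataclass(frozen=True)
class CacheEntry:
    observatory: str
    description: str
    gps_start: int
    duration: int
    path: str

    @property
    def gps_end(self) -> int:
        return self.gps_start + self.duration


def parse_cache_line(line: str) -> CacheEntry:
    """Parse one five-column LAL cache line."""
    obs, desc, start, dur, url = line.split()
    return CacheEntry(obs, desc, int(start), int(dur), urlparse(url).path)


def overlapping(entries, t0, end):
    """Keep entries whose [gps_start, gps_end) span intersects [t0, end)."""
    return [e for e in entries if e.gps_start < end and e.gps_end > t0]


lines = [
    "H H1_HOFT 1000000000 64 file://localhost/data/H-H1_HOFT-1000000000-64.gwf",
    "H H1_HOFT 1000000064 64 file://localhost/data/H-H1_HOFT-1000000064-64.gwf",
    "H H1_HOFT 1000000128 64 file://localhost/data/H-H1_HOFT-1000000128-64.gwf",
]
entries = [parse_cache_line(line) for line in lines]
needed = overlapping(entries, t0=1000000000, end=1000000100)
print([e.path for e in needed])
```

With t0=1000000000 and end=1000000100, only the first two files are needed, since the third starts after the requested end time.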
DevShm Sources
sgnligo.sources.datasource_v2.sources.devshm
Shared memory (devshm) composed source classes.
These sources read low-latency data from shared memory for online gravitational wave analysis.
Example
source = DevShmComposedSource(
    name="low_latency",
    ifos=["H1"],
    channel_dict={"H1": "GDS-CALIB_STRAIN"},
    shared_memory_dict={"H1": "/dev/shm/kafka/H1_O4Replay"},
    state_channel_dict={"H1": "GDS-CALIB_STATE_VECTOR"},
    state_vector_on_dict={"H1": 3},
)
pipeline.connect(source.element, sink)
DevShmComposedSource
dataclass
Bases: ComposedSourceBase, IfosFromChannelMixin, ChannelOptionsMixin, DevShmOptionsMixin, QueueTimeoutOptionsMixin, VerboseOptionsMixin
Shared memory source with state vector gating.
Reads low-latency strain data from shared memory and applies state vector gating to ensure only valid data is processed.
Fields inherited from mixins
- ifos: List of detector prefixes (from IfosFromChannelMixin)
- channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
- shared_memory_dict: Dict mapping IFO to shm path (from DevShmOptionsMixin)
- discont_wait_time: Discontinuity wait time (from DevShmOptionsMixin)
- queue_timeout: Queue timeout (from QueueTimeoutOptionsMixin)
- verbose: Enable verbose output (from VerboseOptionsMixin)
Additional required fields
- state_channel_dict: Dict mapping IFO to state vector channel name
- state_vector_on_dict: Dict mapping IFO to bitmask for state vector
Example
source = DevShmComposedSource(
    name="low_latency",
    ifos=["H1"],
    channel_dict={"H1": "GDS-CALIB_STRAIN"},
    shared_memory_dict={"H1": "/dev/shm/kafka/H1_O4Replay"},
    state_channel_dict={"H1": "GDS-CALIB_STATE_VECTOR"},
    state_vector_on_dict={"H1": 3},
)
pipeline.connect(source.element, sink)
Source code in sgnligo/sources/datasource_v2/sources/devshm.py
Arrakis Sources
sgnligo.sources.datasource_v2.sources.arrakis
Arrakis composed source classes.
These sources read streaming data from Arrakis for online gravitational wave analysis.
Example
source = ArrakisComposedSource(
    name="kafka_data",
    ifos=["H1", "L1"],
    channel_dict={"H1": "GDS-CALIB_STRAIN", "L1": "GDS-CALIB_STRAIN"},
)
pipeline.connect(source.element, sink)
ArrakisComposedSource
dataclass
Bases: ComposedSourceBase, IfosFromChannelMixin, ChannelOptionsMixin, GPSOptionsOptionalMixin, QueueTimeoutOptionsMixin, StateVectorOptionsMixin, VerboseOptionsMixin
Arrakis source for streaming data.
Reads streaming gravitational wave data from topics. Optionally supports state vector gating.
Fields inherited from mixins
- ifos: List of detector prefixes (from IfosFromChannelMixin)
- channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
- t0: GPS start time (optional, from GPSOptionsOptionalMixin)
- end: GPS end time (optional, from GPSOptionsOptionalMixin)
- queue_timeout: Queue timeout (from QueueTimeoutOptionsMixin)
- state_channel_dict: State channel dict (from StateVectorOptionsMixin)
- state_vector_on_dict: Bitmask dict (from StateVectorOptionsMixin)
- state_segments_file: State segments file (from StateVectorOptionsMixin)
- state_sample_rate: State vector sample rate (from StateVectorOptionsMixin)
- verbose: Enable verbose output (from VerboseOptionsMixin)
Example
source = ArrakisComposedSource(
    name="kafka_data",
    ifos=["H1"],
    channel_dict={"H1": "GDS-CALIB_STRAIN"},
)
pipeline.connect(source.element, sink)
Source code in sgnligo/sources/datasource_v2/sources/arrakis.py
Base Classes
sgnligo.sources.composed_base
Base class for composed source elements.
This module provides an abstract base class for creating composed source elements that combine multiple internal elements into a single source. Subclasses define their parameters as dataclass fields and implement _build() to wire up the internal elements.
Usage with pipelines
Composed sources wrap a TSComposedSourceElement internally. To use them with Pipeline.connect(), access the inner element via the .element property:
pipeline = Pipeline()
pipeline.connect(source.element, sink)
Example
from dataclasses import dataclass
from sgnts.compose import TSCompose
from sgnligo.sources.composed_base import ComposedSourceBase

@dataclass
class MySource(ComposedSourceBase):
    source_type = "my-source"
    description = "My custom source"

    ifos: list[str]
    sample_rate: int
    t0: float
    end: float

    def _build(self):
        compose = TSCompose()
        # ... wire up elements
        return compose.as_source(name=self.name)

source = MySource(name="test", ifos=["H1"], sample_rate=4096, t0=0, end=10)
print(source.srcs)  # Access source pads
ComposedSourceBase
dataclass
Abstract base class for composed source elements.
Subclasses define their parameters as dataclass fields and implement _build() to create the internal composed element. Composition happens automatically in __post_init__.
The resulting object behaves like a TSComposedSourceElement: it exposes .srcs for source pads, and its inner element can be connected to pipelines via pipeline.connect(source.element, downstream).
Class Attributes
- source_type: String identifier for registry (e.g., "white", "frames"). Leave empty if the source should not be registered.
- description: Human-readable description for help text and documentation.
Example
from dataclasses import dataclass
from typing import ClassVar, List
from sgnts.compose import TSCompose, TSComposedSourceElement
from sgnts.sources import FakeSeriesSource
from sgnligo.sources.composed_base import ComposedSourceBase

@dataclass(kw_only=True)
class WhiteSource(ComposedSourceBase):
    source_type: ClassVar[str] = "white"
    description: ClassVar[str] = "Gaussian white noise"

    ifos: List[str]
    sample_rate: int
    t0: float
    end: float

    # The base class calls _build() from __post_init__.
    def _build(self) -> TSComposedSourceElement:
        compose = TSCompose()
        for ifo in self.ifos:
            fake = FakeSeriesSource(
                name=f"{self.name}{ifo}",
                source_pad_names=(f"{ifo}:STRAIN",),
                rate=self.sample_rate,
                t0=self.t0,
                end=self.end,
                signal_type="white",
            )
            compose.insert(fake)
        return compose.as_source(name=self.name)

source = WhiteSource(
    name="noise",
    ifos=["H1", "L1"],
    sample_rate=4096,
    t0=1000,
    end=1010,
)
print(list(source.srcs.keys()))  # ['H1:STRAIN', 'L1:STRAIN']
Source code in sgnligo/sources/composed_base.py
element
property
The underlying TSComposedSourceElement for pipeline integration.
Use this when passing to Pipeline.connect() or other SGN operations that require a proper element type.
Returns:
| Type | Description |
|---|---|
| TSComposedSourceElement | The inner composed element |
Example
pipeline = Pipeline()
pipeline.connect(source.element, sink)
srcs
property
Source pads of the composed element.
Returns:
| Type | Description |
|---|---|
| Dict[str, Any] | Dictionary mapping pad names to source pad objects |
__getattr__(name)
Delegate unknown attributes to the inner composed element.
This allows composed sources to be used anywhere a TSComposedSourceElement is expected, supporting any additional methods or properties.
Source code in sgnligo/sources/composed_base.py
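The delegation described above can be illustrated with a small stand-alone sketch. It is not the sgnligo implementation: `Inner`, `Wrapper`, and the `_composed` attribute name are hypothetical, and the guard on underscore-prefixed names is one possible way to avoid infinite recursion when the inner object has not been set yet.

```python
class Inner:
    """Stand-in for the inner composed element (hypothetical)."""

    def __init__(self):
        self.srcs = {"H1:STRAIN": object()}

    def describe(self) -> str:
        return "inner element"


class Wrapper:
    """Forwards unknown attribute lookups to the wrapped object."""

    def __init__(self, inner):
        self._composed = inner

    @property
    def element(self):
        return self._composed

    def __getattr__(self, name):
        # __getattr__ only runs after normal attribute lookup has failed.
        # Refusing private names prevents recursion if _composed is missing.
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._composed, name)


w = Wrapper(Inner())
print(w.describe())   # resolved on the inner object
print(list(w.srcs))   # also resolved on the inner object
```

Because `__getattr__` is a fallback, attributes defined on the wrapper itself (such as `element`) always take precedence over those on the inner object.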
__post_init__()
Validate parameters and build the composed element.
Source code in sgnligo/sources/composed_base.py
add_cli_arguments(parser)
classmethod
Add CLI arguments for latency tracking.
Source code in sgnligo/sources/composed_base.py
get_cli_arg_names()
classmethod
Return set of CLI argument names defined by this class.
Source code in sgnligo/sources/composed_base.py
process_cli_args(args)
classmethod
Convert CLI args to field values.
Source code in sgnligo/sources/composed_base.py
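The three classmethods add_cli_arguments, get_cli_arg_names, and process_cli_args together form a round trip from command-line flags to dataclass fields. The sketch below shows one plausible shape for that round trip using only argparse and dataclasses. `ToyOptions` and its two options are hypothetical and chosen for illustration; the real sgnligo classes may map names and defaults differently.

```python
import argparse
from dataclasses import dataclass, fields


@dataclass
class ToyOptions:
    verbose: bool = False
    queue_timeout: float = 1.0

    @classmethod
    def add_cli_arguments(cls, parser: argparse.ArgumentParser) -> None:
        parser.add_argument("--verbose", action="store_true")
        parser.add_argument("--queue-timeout", type=float, default=1.0)

    @classmethod
    def get_cli_arg_names(cls) -> set:
        # argparse stores "--queue-timeout" as "queue_timeout", which
        # matches the dataclass field name.
        return {f.name for f in fields(cls)}

    @classmethod
    def process_cli_args(cls, args: argparse.Namespace) -> dict:
        return {
            name: getattr(args, name)
            for name in cls.get_cli_arg_names()
            if hasattr(args, name)
        }


parser = argparse.ArgumentParser()
ToyOptions.add_cli_arguments(parser)
args = parser.parse_args(["--queue-timeout", "2.5"])
kwargs = ToyOptions.process_cli_args(args)
options = ToyOptions(**kwargs)
print(options)
```

Keeping the argument names derivable from the dataclass fields means a dispatcher can collect the relevant subset of a shared Namespace for each source class.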
Utilities
sgnligo.sources.datasource_v2.sources.utils
Utility functions for composed sources.
This module contains reusable building blocks that are shared across multiple composed source classes.
add_state_vector_gating(compose, strain_source, state_source, ifo, bit_mask, strain_pad, state_pad, output_pad)
Add BitMask + Gate to a compose for state vector gating.
This is the common pattern used by devshm, arrakis, and gwdata-noise sources. It applies a bitmask to the state vector channel, then uses a Gate to control the strain data based on the masked state vector.
The pattern is:
strain_source[strain_pad] ─────────────────┐
                                           ├─> Gate[output_pad]
state_source[state_pad] -> BitMask[state] ─┘
Parameters:
| Name | Type | Description | Default |
|---|---|---|---|
| compose | TSCompose | TSCompose to add elements to (modified in-place) | required |
| strain_source | | Source element providing strain data | required |
| state_source | | Source element providing state vector data | required |
| ifo | str | Interferometer prefix (e.g., "H1") | required |
| bit_mask | int | Bitmask to apply to state vector | required |
| strain_pad | str | Name of the strain output pad on strain_source | required |
| state_pad | str | Name of the state vector output pad on state_source | required |
| output_pad | str | Name for the gated output pad | required |
Returns:
| Type | Description |
|---|---|
| Gate | The Gate element for downstream use (e.g., latency tracking) |
Source code in sgnligo/sources/datasource_v2/sources/utils.py
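The BitMask + Gate pattern can be illustrated on plain Python lists. This is a conceptual sketch only, not the sgnligo elements: `passes_bitmask` and `gate` are hypothetical helpers, and two assumptions are made for simplicity, namely that a sample passes only when every bit in the mask is set, and that the strain and state vector arrive at the same sample rate.

```python
def passes_bitmask(state: int, bit_mask: int) -> bool:
    """True when all bits of bit_mask are set in state (assumed semantics)."""
    return (state & bit_mask) == bit_mask


def gate(strain, state, bit_mask):
    """Replace strain samples with None wherever the state check fails."""
    return [
        x if passes_bitmask(s, bit_mask) else None
        for x, s in zip(strain, state)
    ]


strain = [0.1, 0.2, 0.3]
state = [3, 1, 7]  # binary 011, 001, 111
gated = gate(strain, state, bit_mask=3)
print(gated)
```

With bit_mask=3, the middle sample is dropped because only bit 0 is set in its state word, while the third passes because extra bits beyond the mask are ignored.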