Skip to content

DataSource V2 API Reference

The datasource_v2 package provides a composable, dataclass-based API for creating gravitational wave data sources.

Main Entry Points

DataSource (Dispatcher)

sgnligo.sources.datasource_v2.DataSource

Unified data source that dispatches to the appropriate source class.

This is the main entry point for CLI-based pipelines. It accepts a data_source type string and forwards all other parameters to the appropriate source class.

For programmatic use, you can also instantiate source classes directly (e.g., WhiteSource, GWDataNoiseComposedSource).

Parameters:

Name Type Description Default
data_source str

Type of source ("white", "gwdata-noise", etc.)

required
name str

Name for the composed element

'datasource'
**kwargs Any

Parameters forwarded to the specific source class

{}
Example

Direct instantiation

source = DataSource( ... data_source="white", ... name="test", ... ifos=["H1"], ... sample_rate=4096, ... t0=1000, ... end=1010, ... )

From CLI

source = DataSource.from_argv(name="data_source")

Use in pipeline

pipeline.connect(source.element, sink)

Source code in sgnligo/sources/datasource_v2/datasource.py
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
class DataSource:
    """Unified data source that dispatches to the appropriate source class.

    This is the main entry point for CLI-based pipelines. It accepts a
    data_source type string and forwards all other parameters to the
    appropriate source class.

    For programmatic use, you can also instantiate source classes directly
    (e.g., WhiteSource, GWDataNoiseComposedSource).

    Args:
        data_source: Type of source ("white", "gwdata-noise", etc.)
        name: Name for the composed element
        **kwargs: Parameters forwarded to the specific source class

    Example:
        >>> # Direct instantiation
        >>> source = DataSource(
        ...     data_source="white",
        ...     name="test",
        ...     ifos=["H1"],
        ...     sample_rate=4096,
        ...     t0=1000,
        ...     end=1010,
        ... )
        >>>
        >>> # From CLI
        >>> source = DataSource.from_argv(name="data_source")
        >>>
        >>> # Use in pipeline
        >>> pipeline.connect(source.element, sink)
    """

    # Class metadata
    source_type: ClassVar[str] = "datasource"
    description: ClassVar[str] = "Unified data source dispatcher"

    def __init__(
        self,
        data_source: str,
        name: str = "datasource",
        **kwargs: Any,
    ) -> None:
        """Initialize DataSource.

        Args:
            data_source: Type of source ("white", "gwdata-noise", etc.)
            name: Name for the composed element
            **kwargs: Parameters forwarded to the specific source class
        """
        self.data_source = data_source
        self.name = name
        self._kwargs = kwargs

        # Look up and instantiate the inner source
        cls = get_composed_source_class(data_source)
        self._inner: ComposedSourceBase = cls(name=name, **kwargs)

    # --- Expose inner element for pipeline connection ---

    @property
    def element(self) -> TSComposedSourceElement:
        """The underlying TSComposedSourceElement for pipeline integration."""
        return self._inner.element

    @property
    def srcs(self) -> Dict[str, Any]:
        """Source pads of the composed element."""
        return self._inner.srcs

    def __getattr__(self, name: str) -> Any:
        """Delegate unknown attributes to the inner source."""
        if name.startswith("_"):
            raise AttributeError(name)
        return getattr(self._inner, name)

    # --- CLI Support ---

    @classmethod
    def create_cli_parser(
        cls,
        prog: Optional[str] = None,
        description: Optional[str] = None,
    ) -> argparse.ArgumentParser:
        """Create CLI argument parser with options for all registered sources.

        Use this when you need to add custom arguments to the parser.

        Example:
            >>> parser = DataSource.create_cli_parser()
            >>> parser.add_argument("--snr-threshold", type=float, default=8.0)
            >>> source = DataSource.from_parser(parser, name="pipeline")

        Returns:
            ArgumentParser configured with --data-source and all source options
        """
        from sgnligo.sources.datasource_v2.cli import build_composed_cli_parser

        return build_composed_cli_parser(prog=prog, description=description)

    @classmethod
    def from_argv(
        cls,
        name: str = "datasource",
        argv: Optional[List[str]] = None,
    ) -> "DataSource":
        """Create DataSource from command line arguments.

        Simple interface for when you don't need custom CLI arguments.
        Parses sys.argv (or provided argv) directly.

        Example:
            >>> # In a script called with:
            >>> # python script.py --data-source white --ifos H1
            >>> source = DataSource.from_argv(name="my_source")

        Args:
            name: Name for the composed element
            argv: Command line arguments (defaults to sys.argv[1:])

        Returns:
            DataSource instance
        """
        import sys

        from sgnligo.sources.datasource_v2.cli import (
            build_composed_cli_parser,
            check_composed_help_options,
            namespace_to_datasource_kwargs,
        )

        # Handle --list-sources and --help-source before parsing
        if check_composed_help_options(argv):
            sys.exit(0)

        parser = build_composed_cli_parser()
        args = parser.parse_args(argv)
        kwargs = namespace_to_datasource_kwargs(args)
        return cls(name=name, **kwargs)

    @classmethod
    def from_parser(
        cls,
        parser: argparse.ArgumentParser,
        name: str = "datasource",
        argv: Optional[List[str]] = None,
    ) -> tuple["DataSource", argparse.Namespace]:
        """Create DataSource from a custom argument parser.

        Use this when you've added custom arguments to the parser.
        Returns both the DataSource and the parsed args so you can
        access your custom arguments.

        Example:
            >>> parser = DataSource.create_cli_parser()
            >>> parser.add_argument("--snr-threshold", type=float, default=8.0)
            >>> source, args = DataSource.from_parser(parser, name="pipeline")
            >>> print(f"SNR threshold: {args.snr_threshold}")

        Args:
            parser: ArgumentParser (from create_cli_parser with optional additions)
            name: Name for the composed element
            argv: Command line arguments (defaults to sys.argv[1:])

        Returns:
            Tuple of (DataSource instance, parsed args namespace)
        """
        import sys

        from sgnligo.sources.datasource_v2.cli import (
            check_composed_help_options,
            namespace_to_datasource_kwargs,
        )

        # Handle --list-sources and --help-source before parsing
        if check_composed_help_options(argv):
            sys.exit(0)

        args = parser.parse_args(argv)
        kwargs = namespace_to_datasource_kwargs(args)
        return cls(name=name, **kwargs), args

    @staticmethod
    def list_sources() -> List[str]:
        """List all available source types."""
        return list_composed_source_types()

    @staticmethod
    def get_source_class(source_type: str) -> Type[ComposedSourceBase]:
        """Get the source class for a given type."""
        return get_composed_source_class(source_type)

element property

The underlying TSComposedSourceElement for pipeline integration.

srcs property

Source pads of the composed element.

create_cli_parser(prog=None, description=None) classmethod

Create CLI argument parser with options for all registered sources.

Use this when you need to add custom arguments to the parser.

Example

parser = DataSource.create_cli_parser() parser.add_argument("--snr-threshold", type=float, default=8.0) source = DataSource.from_parser(parser, name="pipeline")

Returns:

Type Description
ArgumentParser

ArgumentParser configured with --data-source and all source options

Source code in sgnligo/sources/datasource_v2/datasource.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
@classmethod
def create_cli_parser(
    cls,
    prog: Optional[str] = None,
    description: Optional[str] = None,
) -> argparse.ArgumentParser:
    """Create CLI argument parser with options for all registered sources.

    Use this when you need to add custom arguments to the parser.

    Example:
        >>> parser = DataSource.create_cli_parser()
        >>> parser.add_argument("--snr-threshold", type=float, default=8.0)
        >>> source = DataSource.from_parser(parser, name="pipeline")

    Returns:
        ArgumentParser configured with --data-source and all source options
    """
    from sgnligo.sources.datasource_v2.cli import build_composed_cli_parser

    return build_composed_cli_parser(prog=prog, description=description)

list_sources() staticmethod

List all available source types.

Source code in sgnligo/sources/datasource_v2/datasource.py
234
235
236
237
@staticmethod
def list_sources() -> List[str]:
    """List all available source types."""
    return list_composed_source_types()

get_source_class(source_type) staticmethod

Get the source class for a given type.

Source code in sgnligo/sources/datasource_v2/datasource.py
239
240
241
242
@staticmethod
def get_source_class(source_type: str) -> Type[ComposedSourceBase]:
    """Get the source class for a given type."""
    return get_composed_source_class(source_type)

CLI Support

sgnligo.sources.datasource_v2.cli

CLI support for composed data sources.

This module provides CLI argument parsing and help generation for the dataclass-based composed source classes.

CLI arguments are defined by mixin classes that sources inherit from. The build_composed_cli_parser() function aggregates CLI arguments from all registered sources by walking their MRO and collecting arguments from mixins.

Example

from sgnligo.sources.datasource_v2.cli import ( ... build_composed_cli_parser, ... check_composed_help_options, ... )

if check_composed_help_options(): ... sys.exit(0)

parser = build_composed_cli_parser() args = parser.parse_args()

build_composed_cli_parser(prog=None, description=None)

Build CLI parser by aggregating arguments from source mixins.

This function walks the MRO of all registered source classes and collects CLI arguments from mixins that implement the CLIMixinProtocol. Duplicate arguments (same arg defined by multiple mixins) raise an error.

Parameters:

Name Type Description Default
prog Optional[str]

Program name for help text

None
description Optional[str]

Description for help text

None

Returns:

Type Description
ArgumentParser

ArgumentParser configured with all source options

Raises:

Type Description
ValueError

If duplicate CLI arguments are detected

Source code in sgnligo/sources/datasource_v2/cli.py
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
def build_composed_cli_parser(
    prog: Optional[str] = None,
    description: Optional[str] = None,
) -> argparse.ArgumentParser:
    """Build CLI parser by aggregating arguments from source mixins.

    This function walks the MRO of all registered source classes and
    collects CLI arguments from mixins that implement the CLIMixinProtocol.
    Duplicate arguments (same arg defined by multiple mixins) raise an error.

    Args:
        prog: Program name for help text
        description: Description for help text

    Returns:
        ArgumentParser configured with all source options

    Raises:
        ValueError: If duplicate CLI arguments are detected
    """
    parser = argparse.ArgumentParser(
        prog=prog,
        description=description or "Process gravitational wave data",
    )

    # Main dispatch option
    source_types = list_composed_source_types()
    parser.add_argument(
        "--data-source",
        required=True,
        choices=source_types,
        help="Type of data source to use",
    )

    # Help options (always available)
    parser.add_argument(
        "--list-sources",
        action="store_true",
        help="List available source types",
    )
    parser.add_argument(
        "--help-source",
        metavar="SOURCE",
        help="Show help for a specific source type",
    )

    # First pass: collect all CLI mixins from all registered sources
    # We need to process mixins with MORE args first (supersets before subsets)
    # to ensure variant mixins like StateVectorOptionsMixin (4 args) are processed
    # before StateVectorOnDictOnlyMixin (3 args), so all args get registered.
    cli_mixins: List[Type] = []
    seen_mixins: Set[Type] = set()

    for _source_type, cls in _COMPOSED_REGISTRY.items():
        for base in cls.__mro__:
            if base in seen_mixins:
                continue

            # Skip if not a CLI mixin (doesn't define add_cli_arguments directly)
            if "add_cli_arguments" not in base.__dict__:
                continue
            if "get_cli_arg_names" not in base.__dict__:
                continue  # pragma: no cover

            # Skip the protocol class itself
            if base is CLIMixinProtocol:
                continue  # pragma: no cover

            cli_mixins.append(base)
            seen_mixins.add(base)

    # Sort mixins by arg count descending - supersets first
    cli_mixins.sort(key=lambda m: len(m.get_cli_arg_names()), reverse=True)

    # Second pass: add arguments, skipping mixins with overlapping args
    added_args: Dict[str, Type] = {}
    for mixin in cli_mixins:
        new_args = mixin.get_cli_arg_names()

        # Check if any args from this mixin are already added
        # If so, skip this mixin entirely - it's a variant mixin that shares
        # some args with another (e.g., GPSOptionsMixin vs GPSOptionsOptionalMixin,
        # or StateVectorOptionsMixin vs StateVectorOnDictOnlyMixin).
        # Since we sorted by arg count, the superset was already processed.
        any_args_exist = any(arg in added_args for arg in new_args)
        if any_args_exist:
            continue

        # Register all args from this mixin
        for arg in new_args:
            added_args[arg] = mixin

        # Add the arguments to the parser
        mixin.add_cli_arguments(parser)

    return parser

check_composed_help_options(argv=None)

Check for --list-sources and --help-source before full parsing.

Call this before parse_args() to handle help options that don't require --data-source to be specified.

Parameters:

Name Type Description Default
argv Optional[List[str]]

Command line arguments (defaults to sys.argv[1:])

None

Returns:

Type Description
bool

True if help was handled (caller should exit), False otherwise.

Source code in sgnligo/sources/datasource_v2/cli.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
def check_composed_help_options(argv: Optional[List[str]] = None) -> bool:
    """Check for --list-sources and --help-source before full parsing.

    Call this before parse_args() to handle help options that don't require
    --data-source to be specified.

    Args:
        argv: Command line arguments (defaults to sys.argv[1:])

    Returns:
        True if help was handled (caller should exit), False otherwise.
    """
    if argv is None:
        argv = sys.argv[1:]

    if "--list-sources" in argv:
        print(format_composed_source_list())
        return True

    if "--help-source" in argv:
        try:
            idx = argv.index("--help-source")
            source_type = argv[idx + 1]
            if source_type in _COMPOSED_REGISTRY:
                print(format_composed_source_help(source_type))
                return True
            else:
                available = ", ".join(sorted(_COMPOSED_REGISTRY.keys()))
                print(f"Unknown source type '{source_type}'. Available: {available}")
                return True
        except IndexError:
            print("--help-source requires a source type argument")
            return True

    return False

namespace_to_datasource_kwargs(args)

Convert argparse namespace to DataSource kwargs.

This function walks the MRO of the selected source class and calls process_cli_args() on each mixin to convert CLI arguments to field values.

Parameters:

Name Type Description Default
args Namespace

Parsed argparse namespace

required

Returns:

Type Description
Dict[str, Any]

Dict of kwargs for DataSource

Source code in sgnligo/sources/datasource_v2/cli.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
def namespace_to_datasource_kwargs(args: argparse.Namespace) -> Dict[str, Any]:
    """Convert argparse namespace to DataSource kwargs.

    This function walks the MRO of the selected source class and calls
    `process_cli_args()` on each mixin to convert CLI arguments to field values.

    Args:
        args: Parsed argparse namespace

    Returns:
        Dict of kwargs for DataSource
    """
    kwargs: Dict[str, Any] = {
        "data_source": args.data_source,
    }

    # Get the source class to walk its MRO for process_cli_args
    source_type = args.data_source
    if source_type in _COMPOSED_REGISTRY:
        cls = _COMPOSED_REGISTRY[source_type]
        processed_classes: Set[Type] = set()

        # Walk MRO and call process_cli_args on each mixin
        for base in cls.__mro__:
            if base in processed_classes:
                continue  # pragma: no cover

            if not hasattr(base, "process_cli_args"):
                continue

            if base is CLIMixinProtocol:
                continue  # pragma: no cover

            mixin_kwargs = base.process_cli_args(args)
            kwargs.update(mixin_kwargs)

            processed_classes.add(base)

    return kwargs

format_composed_source_help(source_type)

Generate detailed help for a specific source type.

Parameters:

Name Type Description Default
source_type str

The source type to show help for

required

Returns:

Type Description
str

Formatted help string

Source code in sgnligo/sources/datasource_v2/cli.py
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def format_composed_source_help(source_type: str) -> str:
    """Generate detailed help for a specific source type.

    Args:
        source_type: The source type to show help for

    Returns:
        Formatted help string
    """
    cls = get_composed_source_class(source_type)

    lines = [
        f"usage: prog --data-source {source_type} [options]",
        "",
        f"{source_type}: {cls.description}",
        "",
    ]

    # Required fields
    required = get_source_required_fields(cls)
    if required:
        lines.append("Required Options:")
        for name in required:
            cli_name = name.replace("_", "-")
            lines.append(f"  --{cli_name}")
        lines.append("")

    # Optional fields
    optional = get_source_optional_fields(cls)
    if optional:
        lines.append("Optional Options:")
        for name, default in optional.items():
            cli_name = name.replace("_", "-")
            if default is False:
                lines.append(f"  --{cli_name}")
            elif default is not None:
                lines.append(f"  --{cli_name} (default: {default})")
            else:
                lines.append(f"  --{cli_name}")
        lines.append("")

    # Add docstring notes if available
    if cls.__doc__:
        # Extract just the description part (first paragraph)
        doc_lines = cls.__doc__.strip().split("\n\n")[0].split("\n")
        if doc_lines:
            lines.append("Description:")
            for doc_line in doc_lines:
                lines.append(f"  {doc_line.strip()}")

    return "\n".join(lines)

format_composed_source_list()

Generate list of all available sources grouped by type.

Returns:

Type Description
str

Formatted string listing all sources

Source code in sgnligo/sources/datasource_v2/cli.py
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
def format_composed_source_list() -> str:
    """Generate list of all available sources grouped by type.

    Returns:
        Formatted string listing all sources
    """
    lines = ["Available data sources:", ""]

    offline = []
    realtime = []

    for source_type in sorted(_COMPOSED_REGISTRY.keys()):
        cls = _COMPOSED_REGISTRY[source_type]
        desc = f"  {source_type:30} {cls.description}"

        if "realtime" in source_type.lower():
            realtime.append(desc)
        else:
            offline.append(desc)

    if offline:
        lines.append("Offline Sources:")
        lines.extend(offline)
        lines.append("")

    if realtime:
        lines.append("Real-time Sources:")
        lines.extend(realtime)
        lines.append("")

    lines.append("Use --help-source <name> for detailed options.")
    return "\n".join(lines)

Registry

sgnligo.sources.datasource_v2.composed_registry

Registry for dataclass-based composed source classes.

This module provides registration and lookup for the new dataclass-based source classes that inherit from ComposedSourceBase.

Example

from sgnligo.sources.datasource_v2.composed_registry import ( ... register_composed_source, ... get_composed_source_class, ... )

@register_composed_source @dataclass class MySource(ComposedSourceBase): ... source_type: ClassVar[str] = "my-source" ... ...

register_composed_source(cls)

Decorator to register a composed source class.

Parameters:

Name Type Description Default
cls Type[ComposedSourceBase]

The composed source class to register

required

Returns:

Type Description
Type[ComposedSourceBase]

The same class (unchanged)

Raises:

Type Description
ValueError

If the source_type is empty or already registered

Example

@register_composed_source @dataclass class WhiteSource(ComposedSourceBase): source_type: ClassVar[str] = "white" ...

Source code in sgnligo/sources/datasource_v2/composed_registry.py
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
def register_composed_source(
    cls: Type[ComposedSourceBase],
) -> Type[ComposedSourceBase]:
    """Decorator to register a composed source class.

    Args:
        cls: The composed source class to register

    Returns:
        The same class (unchanged)

    Raises:
        ValueError: If the source_type is empty or already registered

    Example:
        @register_composed_source
        @dataclass
        class WhiteSource(ComposedSourceBase):
            source_type: ClassVar[str] = "white"
            ...
    """
    source_type = cls.source_type
    if not source_type:
        raise ValueError(f"Class {cls.__name__} must define source_type")
    if source_type in _COMPOSED_REGISTRY:
        raise ValueError(
            f"Source type '{source_type}' is already registered "
            f"(by {_COMPOSED_REGISTRY[source_type].__name__})"
        )
    _COMPOSED_REGISTRY[source_type] = cls
    return cls

get_composed_source_class(source_type)

Get the composed source class for a given type string.

Parameters:

Name Type Description Default
source_type str

The source type identifier

required

Returns:

Type Description
Type[ComposedSourceBase]

The composed source class

Raises:

Type Description
ValueError

If the source type is not registered

Source code in sgnligo/sources/datasource_v2/composed_registry.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
def get_composed_source_class(source_type: str) -> Type[ComposedSourceBase]:
    """Get the composed source class for a given type string.

    Args:
        source_type: The source type identifier

    Returns:
        The composed source class

    Raises:
        ValueError: If the source type is not registered
    """
    if source_type not in _COMPOSED_REGISTRY:
        available = ", ".join(sorted(_COMPOSED_REGISTRY.keys()))
        raise ValueError(f"Unknown source type '{source_type}'. Available: {available}")
    return _COMPOSED_REGISTRY[source_type]

list_composed_source_types()

List all registered composed source types.

Returns:

Type Description
List[str]

Sorted list of registered source type names

Source code in sgnligo/sources/datasource_v2/composed_registry.py
81
82
83
84
85
86
87
def list_composed_source_types() -> List[str]:
    """List all registered composed source types.

    Returns:
        Sorted list of registered source type names
    """
    return sorted(_COMPOSED_REGISTRY.keys())

get_composed_registry()

Get the full composed source registry.

Returns:

Type Description
Dict[str, Type[ComposedSourceBase]]

Dict mapping source type to class

Source code in sgnligo/sources/datasource_v2/composed_registry.py
90
91
92
93
94
95
96
def get_composed_registry() -> Dict[str, Type[ComposedSourceBase]]:
    """Get the full composed source registry.

    Returns:
        Dict mapping source type to class
    """
    return _COMPOSED_REGISTRY.copy()

Source Classes

Fake Sources

sgnligo.sources.datasource_v2.sources.fake

Fake signal source classes (white, sin, impulse).

These sources generate synthetic test signals for pipeline development and testing without requiring real detector data.

Example

source = WhiteComposedSource( ... name="noise", ... ifos=["H1", "L1"], ... sample_rate=4096, ... t0=1000, ... end=1010, ... ) pipeline.connect(source.element, sink)

WhiteComposedSource dataclass

Bases: FakeSourceBase


              flowchart TD
              sgnligo.sources.datasource_v2.sources.fake.WhiteComposedSource[WhiteComposedSource]
              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase[FakeSourceBase]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin[IfosOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin[SampleRateOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin[GPSOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin[SegmentsOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase --> sgnligo.sources.datasource_v2.sources.fake.WhiteComposedSource
                                sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                



              click sgnligo.sources.datasource_v2.sources.fake.WhiteComposedSource href "" "sgnligo.sources.datasource_v2.sources.fake.WhiteComposedSource"
              click sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase href "" "sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Gaussian white noise source.

Generates uncorrelated Gaussian white noise for each IFO channel. Useful for basic pipeline testing where spectral characteristics don't matter.

Example

source = WhiteComposedSource( ... name="noise", ... ifos=["H1", "L1"], ... sample_rate=4096, ... t0=1000, ... end=1010, ... )

Source code in sgnligo/sources/datasource_v2/sources/fake.py
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
@register_composed_source
@dataclass(kw_only=True)
class WhiteComposedSource(FakeSourceBase):
    """Gaussian white noise source.

    Generates uncorrelated Gaussian white noise for each IFO channel.
    Useful for basic pipeline testing where spectral characteristics
    don't matter.

    Example:
        >>> source = WhiteComposedSource(
        ...     name="noise",
        ...     ifos=["H1", "L1"],
        ...     sample_rate=4096,
        ...     t0=1000,
        ...     end=1010,
        ... )
    """

    source_type: ClassVar[str] = "white"
    description: ClassVar[str] = "Gaussian white noise"
    signal_type: ClassVar[str] = "white"

SinComposedSource dataclass

Bases: FakeSourceBase


              flowchart TD
              sgnligo.sources.datasource_v2.sources.fake.SinComposedSource[SinComposedSource]
              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase[FakeSourceBase]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin[IfosOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin[SampleRateOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin[GPSOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin[SegmentsOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase --> sgnligo.sources.datasource_v2.sources.fake.SinComposedSource
                                sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                



              click sgnligo.sources.datasource_v2.sources.fake.SinComposedSource href "" "sgnligo.sources.datasource_v2.sources.fake.SinComposedSource"
              click sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase href "" "sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Sinusoidal test signal source.

Generates a sinusoidal signal for each IFO channel. Useful for testing frequency-domain processing.

Example

source = SinComposedSource( ... name="sine", ... ifos=["H1"], ... sample_rate=4096, ... t0=1000, ... end=1010, ... )

Source code in sgnligo/sources/datasource_v2/sources/fake.py
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
@register_composed_source
@dataclass(kw_only=True)
class SinComposedSource(FakeSourceBase):
    """Sinusoidal test signal source.

    Generates a sinusoidal signal for each IFO channel.
    Useful for testing frequency-domain processing.

    Example:
        >>> source = SinComposedSource(
        ...     name="sine",
        ...     ifos=["H1"],
        ...     sample_rate=4096,
        ...     t0=1000,
        ...     end=1010,
        ... )
    """

    source_type: ClassVar[str] = "sin"
    description: ClassVar[str] = "Sinusoidal test signal"
    signal_type: ClassVar[str] = "sin"

ImpulseComposedSource dataclass

Bases: FakeSourceBase, ImpulsePositionOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.fake.ImpulseComposedSource[ImpulseComposedSource]
              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase[FakeSourceBase]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin[IfosOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin[SampleRateOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin[GPSOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin[SegmentsOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin[ImpulsePositionOptionsMixin]

                              sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase --> sgnligo.sources.datasource_v2.sources.fake.ImpulseComposedSource
                                sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase
                

                sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.ImpulseComposedSource
                


              click sgnligo.sources.datasource_v2.sources.fake.ImpulseComposedSource href "" "sgnligo.sources.datasource_v2.sources.fake.ImpulseComposedSource"
              click sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase href "" "sgnligo.sources.datasource_v2.sources.fake.FakeSourceBase"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin"
            

Impulse test signal source.

Generates an impulse signal (single spike) for each IFO channel. Useful for testing impulse response.

Fields inherited from mixins

impulse_position: Sample index for impulse (-1 for random)

Example

source = ImpulseComposedSource( ... name="impulse", ... ifos=["H1"], ... sample_rate=4096, ... t0=1000, ... end=1010, ... impulse_position=100, ... )

Source code in sgnligo/sources/datasource_v2/sources/fake.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
@register_composed_source
@dataclass(kw_only=True)
class ImpulseComposedSource(FakeSourceBase, ImpulsePositionOptionsMixin):
    """Impulse test signal source.

    Generates an impulse signal (single spike) for each IFO channel.
    Useful for testing impulse response.

    Fields inherited from mixins:
        impulse_position: Sample index for impulse (-1 for random)

    Example:
        >>> source = ImpulseComposedSource(
        ...     name="impulse",
        ...     ifos=["H1"],
        ...     sample_rate=4096,
        ...     t0=1000,
        ...     end=1010,
        ...     impulse_position=100,
        ... )
    """

    source_type: ClassVar[str] = "impulse"
    description: ClassVar[str] = "Impulse test signal"
    signal_type: ClassVar[str] = "impulse"

WhiteRealtimeComposedSource dataclass

Bases: RealtimeFakeSourceBase


              flowchart TD
              sgnligo.sources.datasource_v2.sources.fake.WhiteRealtimeComposedSource[WhiteRealtimeComposedSource]
              sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase[RealtimeFakeSourceBase]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin[IfosOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin[SampleRateOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase --> sgnligo.sources.datasource_v2.sources.fake.WhiteRealtimeComposedSource
                                sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                



              click sgnligo.sources.datasource_v2.sources.fake.WhiteRealtimeComposedSource href "" "sgnligo.sources.datasource_v2.sources.fake.WhiteRealtimeComposedSource"
              click sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase href "" "sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Real-time Gaussian white noise source.

Generates white noise synchronized with wall clock time.

Example

source = WhiteRealtimeComposedSource( ... name="realtime_noise", ... ifos=["H1"], ... sample_rate=4096, ... )

Source code in sgnligo/sources/datasource_v2/sources/fake.py
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
@register_composed_source
@dataclass(kw_only=True)
class WhiteRealtimeComposedSource(RealtimeFakeSourceBase):
    """Real-time Gaussian white noise source.

    Generates white noise synchronized with wall clock time.

    Example:
        >>> source = WhiteRealtimeComposedSource(
        ...     name="realtime_noise",
        ...     ifos=["H1"],
        ...     sample_rate=4096,
        ... )
    """

    source_type: ClassVar[str] = "white-realtime"
    description: ClassVar[str] = "Real-time Gaussian white noise"
    signal_type: ClassVar[str] = "white"

SinRealtimeComposedSource dataclass

Bases: RealtimeFakeSourceBase


              flowchart TD
              sgnligo.sources.datasource_v2.sources.fake.SinRealtimeComposedSource[SinRealtimeComposedSource]
              sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase[RealtimeFakeSourceBase]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin[IfosOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin[SampleRateOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase --> sgnligo.sources.datasource_v2.sources.fake.SinRealtimeComposedSource
                                sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                



              click sgnligo.sources.datasource_v2.sources.fake.SinRealtimeComposedSource href "" "sgnligo.sources.datasource_v2.sources.fake.SinRealtimeComposedSource"
              click sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase href "" "sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Real-time sinusoidal test signal source.

Generates sinusoidal signal synchronized with wall clock time.

Example

source = SinRealtimeComposedSource( ... name="realtime_sin", ... ifos=["H1"], ... sample_rate=4096, ... )

Source code in sgnligo/sources/datasource_v2/sources/fake.py
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
@register_composed_source
@dataclass(kw_only=True)
class SinRealtimeComposedSource(RealtimeFakeSourceBase):
    """Real-time sinusoidal test signal source.

    Generates sinusoidal signal synchronized with wall clock time.

    Example:
        >>> source = SinRealtimeComposedSource(
        ...     name="realtime_sin",
        ...     ifos=["H1"],
        ...     sample_rate=4096,
        ... )
    """

    source_type: ClassVar[str] = "sin-realtime"
    description: ClassVar[str] = "Real-time sinusoidal test signal"
    signal_type: ClassVar[str] = "sin"

ImpulseRealtimeComposedSource dataclass

Bases: RealtimeFakeSourceBase, ImpulsePositionOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.fake.ImpulseRealtimeComposedSource[ImpulseRealtimeComposedSource]
              sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase[RealtimeFakeSourceBase]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin[IfosOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin[SampleRateOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin[ImpulsePositionOptionsMixin]

                              sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase --> sgnligo.sources.datasource_v2.sources.fake.ImpulseRealtimeComposedSource
                                sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase
                

                sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin --> sgnligo.sources.datasource_v2.sources.fake.ImpulseRealtimeComposedSource
                


              click sgnligo.sources.datasource_v2.sources.fake.ImpulseRealtimeComposedSource href "" "sgnligo.sources.datasource_v2.sources.fake.ImpulseRealtimeComposedSource"
              click sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase href "" "sgnligo.sources.datasource_v2.sources.fake.RealtimeFakeSourceBase"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SampleRateOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ImpulsePositionOptionsMixin"
            

Real-time impulse test signal source.

Generates impulse signals synchronized with wall clock time.

Fields inherited from mixins

impulse_position: Sample index for impulse (-1 for random)

Example

source = ImpulseRealtimeComposedSource( ... name="realtime_impulse", ... ifos=["H1"], ... sample_rate=4096, ... impulse_position=100, ... )

Source code in sgnligo/sources/datasource_v2/sources/fake.py
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
@register_composed_source
@dataclass(kw_only=True)
class ImpulseRealtimeComposedSource(
    RealtimeFakeSourceBase, ImpulsePositionOptionsMixin
):
    """Real-time impulse test signal source.

    Generates impulse signals synchronized with wall clock time.

    Fields inherited from mixins:
        impulse_position: Sample index for impulse (-1 for random)

    Example:
        >>> source = ImpulseRealtimeComposedSource(
        ...     name="realtime_impulse",
        ...     ifos=["H1"],
        ...     sample_rate=4096,
        ...     impulse_position=100,
        ... )
    """

    source_type: ClassVar[str] = "impulse-realtime"
    description: ClassVar[str] = "Real-time impulse test signal"
    signal_type: ClassVar[str] = "impulse"

GWData Noise Sources

sgnligo.sources.datasource_v2.sources.gwdata_noise

GWData noise composed source classes.

These sources generate colored Gaussian noise with realistic LIGO PSDs, suitable for testing and development without real detector data.

Example

source = GWDataNoiseComposedSource( ... name="noise", ... ifos=["H1", "L1"], ... t0=1000, ... end=1010, ... ) pipeline.connect(source.element, sink)

GWDataNoiseComposedSource dataclass

Bases: ComposedSourceBase, IfosOnlyMixin, GPSOptionsMixin, ChannelPatternOptionsMixin, StateVectorOnDictOnlyMixin, VerboseOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource[GWDataNoiseComposedSource]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin[IfosOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin[GPSOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelPatternOptionsMixin[ChannelPatternOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin[StateVectorOnDictOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelPatternOptionsMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource
                


              click sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource href "" "sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseComposedSource"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelPatternOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelPatternOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Colored Gaussian noise source with optional state vector gating.

Generates colored Gaussian noise with LIGO PSD for offline analysis. Supports optional segment-based state vector gating.

Fields inherited from mixins

ifos: List of detector prefixes (from IfosOnlyMixin) t0: GPS start time (from GPSOptionsMixin) end: GPS end time (from GPSOptionsMixin) channel_pattern: Channel naming pattern (from ChannelPatternOptionsMixin) state_vector_on_dict: Bitmask dict (from StateVectorOnDictOnlyMixin) state_segments_file: State segments file (from StateVectorOnDictOnlyMixin) state_sample_rate: State vector sample rate (from StateVectorOnDictOnlyMixin) verbose: Enable verbose output (from VerboseOptionsMixin)

Example

source = GWDataNoiseComposedSource( ... name="noise", ... ifos=["H1", "L1"], ... t0=1000, ... end=1010, ... ) pipeline.connect(source.element, sink)

Source code in sgnligo/sources/datasource_v2/sources/gwdata_noise.py
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
@register_composed_source
@dataclass(kw_only=True)
class GWDataNoiseComposedSource(
    ComposedSourceBase,
    IfosOnlyMixin,
    GPSOptionsMixin,
    ChannelPatternOptionsMixin,
    StateVectorOnDictOnlyMixin,
    VerboseOptionsMixin,
):
    """Colored Gaussian noise source with optional state vector gating.

    Generates colored Gaussian noise with LIGO PSD for offline analysis.
    Supports optional segment-based state vector gating.

    Fields inherited from mixins:
        ifos: List of detector prefixes (from IfosOnlyMixin)
        t0: GPS start time (from GPSOptionsMixin)
        end: GPS end time (from GPSOptionsMixin)
        channel_pattern: Channel naming pattern (from ChannelPatternOptionsMixin)
        state_vector_on_dict: Bitmask dict (from StateVectorOnDictOnlyMixin)
        state_segments_file: State segments file (from StateVectorOnDictOnlyMixin)
        state_sample_rate: State vector sample rate (from StateVectorOnDictOnlyMixin)
        verbose: Enable verbose output (from VerboseOptionsMixin)

    Example:
        >>> source = GWDataNoiseComposedSource(
        ...     name="noise",
        ...     ifos=["H1", "L1"],
        ...     t0=1000,
        ...     end=1010,
        ... )
        >>> pipeline.connect(source.element, sink)
    """

    # Class metadata
    source_type: ClassVar[str] = "gwdata-noise"
    description: ClassVar[str] = "Colored Gaussian noise with LIGO PSD"

    def _validate(self) -> None:
        """Validate parameters."""
        if self.t0 >= self.end:
            raise ValueError("t0 must be less than end")

        # Validate state segments file
        if self.state_segments_file is not None:
            if not os.path.exists(self.state_segments_file):
                raise ValueError(
                    f"State segments file does not exist: {self.state_segments_file}"
                )

        # Validate state_vector_on_dict
        if self.state_vector_on_dict is not None:
            if set(self.state_vector_on_dict.keys()) != set(self.ifos):
                raise ValueError("state_vector_on_dict keys must match ifos")

    def _build_channel_dict(self) -> Dict[str, str]:
        """Build channel dict from pattern."""
        return {ifo: self.channel_pattern.format(ifo=ifo) for ifo in self.ifos}

    def _load_state_segments(
        self,
    ) -> Tuple[Optional[Tuple], Optional[Tuple]]:
        """Load state segments from file or create defaults."""
        if self.state_vector_on_dict is None:
            return None, None

        if self.state_segments_file is not None:
            state_segments, state_values = read_segments_and_values_from_file(
                self.state_segments_file, self.verbose
            )
        else:
            # Default: single segment covering entire time range with value 3
            start_ns = int(self.t0 * 1e9)
            end_ns = int(self.end * 1e9)
            state_segments = ((start_ns, end_ns),)
            state_values = (3,)  # Default: bits 0 and 1 set
            if self.verbose:
                print("Using default state segments: single segment with value 3")

        return state_segments, state_values

    def _build(self) -> TSComposedSourceElement:
        """Build the GWData noise source."""
        channel_dict = self._build_channel_dict()

        # Create the noise source
        noise_source = GWDataNoiseSource(
            name=f"{self.name}_noise",
            channel_dict=channel_dict,
            t0=self.t0,
            end=self.end,
            real_time=False,
            verbose=self.verbose,
        )

        compose = TSCompose()

        # Check if we need state vector gating
        if self.state_vector_on_dict is not None:
            state_segments, state_values = self._load_state_segments()
            assert state_segments is not None
            assert state_values is not None

            for ifo in self.ifos:
                strain_channel = channel_dict[ifo]

                # Create segment source for state vector
                state_source = SegmentSource(
                    name=f"{self.name}_{ifo}_state",
                    source_pad_names=("state",),
                    rate=self.state_sample_rate,
                    t0=self.t0,
                    end=self.end,
                    segments=state_segments,
                    values=state_values,
                )

                gate = add_state_vector_gating(
                    compose=compose,
                    strain_source=noise_source,
                    state_source=state_source,
                    ifo=ifo,
                    bit_mask=self.state_vector_on_dict[ifo],
                    strain_pad=strain_channel,
                    state_pad="state",
                    output_pad=ifo,
                )

                # Add latency tracking if configured
                self._add_latency_tracking(compose, ifo, gate, ifo)

                if self.verbose:
                    print(
                        f"Added state vector gating for {ifo} with mask "
                        f"{self.state_vector_on_dict[ifo]}"
                    )
        else:
            # No gating - just expose noise source directly
            compose.insert(noise_source)

            # Add latency tracking for each IFO
            for ifo in self.ifos:
                strain_channel = channel_dict[ifo]
                self._add_latency_tracking(compose, ifo, noise_source, strain_channel)

        return compose.as_source(
            name=self.name,
            also_expose_source_pads=self._also_expose_pads,
        )

GWDataNoiseRealtimeComposedSource dataclass

Bases: ComposedSourceBase, IfosOnlyMixin, GPSOptionsOptionalMixin, ChannelPatternOptionsMixin, StateVectorOnDictOnlyMixin, VerboseOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseRealtimeComposedSource[GWDataNoiseRealtimeComposedSource]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin[IfosOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsOptionalMixin[GPSOptionsOptionalMixin]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelPatternOptionsMixin[ChannelPatternOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin[StateVectorOnDictOnlyMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseRealtimeComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseRealtimeComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsOptionalMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseRealtimeComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelPatternOptionsMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseRealtimeComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseRealtimeComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseRealtimeComposedSource
                


              click sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseRealtimeComposedSource href "" "sgnligo.sources.datasource_v2.sources.gwdata_noise.GWDataNoiseRealtimeComposedSource"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsOptionalMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsOptionalMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelPatternOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelPatternOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.StateVectorOnDictOnlyMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Real-time colored Gaussian noise source.

Generates colored Gaussian noise synchronized with wall clock time. Time range is optional for indefinite operation.

Fields inherited from mixins

ifos: List of detector prefixes (from IfosOnlyMixin) t0: GPS start time (optional, from GPSOptionsOptionalMixin) end: GPS end time (optional, from GPSOptionsOptionalMixin) channel_pattern: Channel naming pattern (from ChannelPatternOptionsMixin) state_vector_on_dict: Bitmask dict (from StateVectorOnDictOnlyMixin) state_segments_file: State segments file (from StateVectorOnDictOnlyMixin) state_sample_rate: State vector sample rate (from StateVectorOnDictOnlyMixin) verbose: Enable verbose output (from VerboseOptionsMixin)

Example

source = GWDataNoiseRealtimeComposedSource( ... name="realtime_noise", ... ifos=["H1"], ... ) pipeline.connect(source.element, sink)

Source code in sgnligo/sources/datasource_v2/sources/gwdata_noise.py
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
@register_composed_source
@dataclass(kw_only=True)
class GWDataNoiseRealtimeComposedSource(
    ComposedSourceBase,
    IfosOnlyMixin,
    GPSOptionsOptionalMixin,
    ChannelPatternOptionsMixin,
    StateVectorOnDictOnlyMixin,
    VerboseOptionsMixin,
):
    """Real-time colored Gaussian noise source.

    Generates colored Gaussian noise synchronized with wall clock time.
    Time range is optional for indefinite operation.

    Fields inherited from mixins:
        ifos: List of detector prefixes (from IfosOnlyMixin)
        t0: GPS start time (optional, from GPSOptionsOptionalMixin)
        end: GPS end time (optional, from GPSOptionsOptionalMixin)
        channel_pattern: Channel naming pattern (from ChannelPatternOptionsMixin)
        state_vector_on_dict: Bitmask dict (from StateVectorOnDictOnlyMixin)
        state_segments_file: State segments file (from StateVectorOnDictOnlyMixin)
        state_sample_rate: State vector sample rate (from StateVectorOnDictOnlyMixin)
        verbose: Enable verbose output (from VerboseOptionsMixin)

    Example:
        >>> source = GWDataNoiseRealtimeComposedSource(
        ...     name="realtime_noise",
        ...     ifos=["H1"],
        ... )
        >>> pipeline.connect(source.element, sink)
    """

    # Class metadata
    source_type: ClassVar[str] = "gwdata-noise-realtime"
    description: ClassVar[str] = "Real-time colored Gaussian noise with LIGO PSD"

    def _validate(self) -> None:
        """Validate parameters."""
        if self.t0 is not None and self.end is not None and self.t0 >= self.end:
            raise ValueError("t0 must be less than end")

        # Validate state segments file
        if self.state_segments_file is not None:
            if not os.path.exists(self.state_segments_file):
                raise ValueError(
                    f"State segments file does not exist: {self.state_segments_file}"
                )

        # Validate state_vector_on_dict
        if self.state_vector_on_dict is not None:
            if set(self.state_vector_on_dict.keys()) != set(self.ifos):
                raise ValueError("state_vector_on_dict keys must match ifos")

    def _build_channel_dict(self) -> Dict[str, str]:
        """Build channel dict from pattern."""
        return {ifo: self.channel_pattern.format(ifo=ifo) for ifo in self.ifos}

    def _load_state_segments(
        self,
    ) -> Tuple[Optional[Tuple], Optional[Tuple]]:
        """Load state segments from file or create defaults."""
        if self.state_vector_on_dict is None:
            return None, None

        if self.state_segments_file is not None:
            state_segments, state_values = read_segments_and_values_from_file(
                self.state_segments_file, self.verbose
            )
        else:
            # Default: single segment covering entire time range with value 3
            if self.t0 is not None:
                start_ns = int(self.t0 * 1e9)
                if self.end is not None:
                    end_ns = int(self.end * 1e9)
                else:
                    # For real-time mode without end time
                    end_ns = int(np.iinfo(np.int32).max * 1e9)
                state_segments = ((start_ns, end_ns),)
                state_values = (3,)  # Default: bits 0 and 1 set
                if self.verbose:
                    print("Using default state segments: single segment with value 3")
            else:
                raise ValueError(
                    "Must provide either state_segments_file or t0 "
                    "when using state vector gating"
                )

        return state_segments, state_values

    def _build(self) -> TSComposedSourceElement:
        """Build the real-time GWData noise source."""
        channel_dict = self._build_channel_dict()

        # Create the noise source
        noise_source = GWDataNoiseSource(
            name=f"{self.name}_noise",
            channel_dict=channel_dict,
            t0=self.t0,
            end=self.end,
            real_time=True,
            verbose=self.verbose,
        )

        compose = TSCompose()

        # Check if we need state vector gating
        if self.state_vector_on_dict is not None:
            state_segments, state_values = self._load_state_segments()
            assert state_segments is not None
            assert state_values is not None

            # Determine end time for SegmentSource (doesn't support None)
            seg_end = (
                self.end if self.end is not None else float(np.iinfo(np.int32).max)
            )

            for ifo in self.ifos:
                strain_channel = channel_dict[ifo]

                # Create segment source for state vector
                state_source = SegmentSource(
                    name=f"{self.name}_{ifo}_state",
                    source_pad_names=("state",),
                    rate=self.state_sample_rate,
                    t0=self.t0,
                    end=seg_end,
                    segments=state_segments,
                    values=state_values,
                )

                gate = add_state_vector_gating(
                    compose=compose,
                    strain_source=noise_source,
                    state_source=state_source,
                    ifo=ifo,
                    bit_mask=self.state_vector_on_dict[ifo],
                    strain_pad=strain_channel,
                    state_pad="state",
                    output_pad=ifo,
                )

                # Add latency tracking if configured
                self._add_latency_tracking(compose, ifo, gate, ifo)

                if self.verbose:
                    print(
                        f"Added state vector gating for {ifo} with mask "
                        f"{self.state_vector_on_dict[ifo]}"
                    )
        else:
            # No gating - just expose noise source directly
            compose.insert(noise_source)

            # Add latency tracking for each IFO
            for ifo in self.ifos:
                strain_channel = channel_dict[ifo]
                self._add_latency_tracking(compose, ifo, noise_source, strain_channel)

        return compose.as_source(
            name=self.name,
            also_expose_source_pads=self._also_expose_pads,
        )

Frame Sources

sgnligo.sources.datasource_v2.sources.frames

Frame file composed source classes.

These sources read gravitational wave data from GWF frame files, the standard format for LIGO/Virgo data.

Example

source = FramesComposedSource( ... name="data", ... ifos=["H1", "L1"], ... frame_cache="/path/to/frames.cache", ... channel_dict={"H1": "GDS-CALIB_STRAIN", "L1": "GDS-CALIB_STRAIN"}, ... t0=1000000000, ... end=1000000100, ... ) pipeline.connect(source.element, sink)

FramesComposedSource dataclass

Bases: ComposedSourceBase, IfosFromChannelMixin, FrameCacheOptionsMixin, ChannelOptionsMixin, GPSOptionsMixin, SegmentsOptionsMixin, InjectionOptionsMixin, VerboseOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource[FramesComposedSource]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin[IfosFromChannelMixin]
              sgnligo.sources.datasource_v2.cli_mixins.FrameCacheOptionsMixin[FrameCacheOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin[ChannelOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin[GPSOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin[SegmentsOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.InjectionOptionsMixin[InjectionOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.FrameCacheOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.InjectionOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource
                


              click sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource href "" "sgnligo.sources.datasource_v2.sources.frames.FramesComposedSource"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.FrameCacheOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.FrameCacheOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.SegmentsOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.InjectionOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.InjectionOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Frame file source for offline analysis.

Reads strain data from GWF frame files specified in a LAL cache file. Supports optional noiseless injections and segment-based gating.

Fields inherited from mixins

ifos: List of detector prefixes (from IfosFromChannelMixin) frame_cache: Path to LAL cache file (from FrameCacheOptionsMixin) channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin) t0: GPS start time (from GPSOptionsMixin) end: GPS end time (from GPSOptionsMixin) segments_file: Path to LIGO XML segments file (from SegmentsOptionsMixin) segments_name: Segment name in XML (from SegmentsOptionsMixin) noiseless_inj_frame_cache: Injection frame cache (from InjectionOptionsMixin) noiseless_inj_channel_dict: Injection channels (from InjectionOptionsMixin) verbose: Enable verbose output (from VerboseOptionsMixin)

Example

source = FramesComposedSource( ... name="data", ... ifos=["H1"], ... frame_cache="/path/to/frames.cache", ... channel_dict={"H1": "GDS-CALIB_STRAIN"}, ... t0=1000000000, ... end=1000000100, ... ) pipeline.connect(source.element, sink)

Source code in sgnligo/sources/datasource_v2/sources/frames.py
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
@register_composed_source
@dataclass(kw_only=True)
class FramesComposedSource(
    ComposedSourceBase,
    IfosFromChannelMixin,
    FrameCacheOptionsMixin,
    ChannelOptionsMixin,
    GPSOptionsMixin,
    SegmentsOptionsMixin,
    InjectionOptionsMixin,
    VerboseOptionsMixin,
):
    """Frame file source for offline analysis.

    Reads strain data from GWF frame files specified in a LAL cache file.
    Supports optional noiseless injections and segment-based gating.

    Fields inherited from mixins:
        ifos: List of detector prefixes (from IfosFromChannelMixin)
        frame_cache: Path to LAL cache file (from FrameCacheOptionsMixin)
        channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
        t0: GPS start time (from GPSOptionsMixin)
        end: GPS end time (from GPSOptionsMixin)
        segments_file: Path to LIGO XML segments file (from SegmentsOptionsMixin)
        segments_name: Segment name in XML (from SegmentsOptionsMixin)
        noiseless_inj_frame_cache: Injection frame cache (from InjectionOptionsMixin)
        noiseless_inj_channel_dict: Injection channels (from InjectionOptionsMixin)
        verbose: Enable verbose output (from VerboseOptionsMixin)

    Example:
        >>> source = FramesComposedSource(
        ...     name="data",
        ...     ifos=["H1"],
        ...     frame_cache="/path/to/frames.cache",
        ...     channel_dict={"H1": "GDS-CALIB_STRAIN"},
        ...     t0=1000000000,
        ...     end=1000000100,
        ... )
        >>> pipeline.connect(source.element, sink)
    """

    # Class metadata
    source_type: ClassVar[str] = "frames"
    description: ClassVar[str] = "Read from GWF frame files"

    def _validate(self) -> None:
        """Validate parameters."""
        if self.t0 >= self.end:
            raise ValueError("t0 must be less than end")

        # Validate frame cache
        if not os.path.exists(self.frame_cache):
            raise ValueError(f"Frame cache file does not exist: {self.frame_cache}")

        # Validate channel_dict
        if set(self.channel_dict.keys()) != set(self.ifos):
            raise ValueError("channel_dict keys must match ifos")

        # Validate segments options
        if self.segments_file is not None:
            if self.segments_name is None:
                raise ValueError("Must specify segments_name when segments_file is set")
            if not os.path.exists(self.segments_file):
                raise ValueError(f"Segments file does not exist: {self.segments_file}")

        # Validate injection options
        if self.noiseless_inj_frame_cache is not None:
            if not os.path.exists(self.noiseless_inj_frame_cache):
                raise ValueError(
                    f"Injection frame cache does not exist: "
                    f"{self.noiseless_inj_frame_cache}"
                )
            if self.noiseless_inj_channel_dict is None:
                raise ValueError(
                    "Must specify noiseless_inj_channel_dict when "
                    "noiseless_inj_frame_cache is set"
                )

    def _load_segments(self) -> Optional[Dict[str, list]]:
        """Load and process segments from XML file."""
        if self.segments_file is None or self.segments_name is None:
            return None

        loaded_segments = ligolw_segments.segmenttable_get_by_name(
            ligolw_utils.load_filename(
                self.segments_file,
                contenthandler=ligolw_segments.LIGOLWContentHandler,
            ),
            self.segments_name,
        ).coalesce()

        # Clip to requested time range
        seg = segments.segment(LIGOTimeGPS(self.t0), LIGOTimeGPS(self.end))
        clipped_segments = segments.segmentlistdict(
            (ifo, seglist & segments.segmentlist([seg]))
            for ifo, seglist in loaded_segments.items()
        )

        # Convert to nanoseconds
        segments_dict = {}
        for ifo, segs in clipped_segments.items():
            segments_dict[ifo] = [segments.segment(s[0].ns(), s[1].ns()) for s in segs]
        return segments_dict

    def _build(self) -> TSComposedSourceElement:
        """Build the frame file source."""
        compose = TSCompose()
        segments_dict = self._load_segments()

        # Determine sample rate from first frame reader (will be set after creation)
        sample_rate = None

        for ifo in self.ifos:
            channel_name = f"{ifo}:{self.channel_dict[ifo]}"

            # Create main frame reader
            frame_reader = FrameReader(
                name=f"{self.name}_{ifo}_frames",
                framecache=self.frame_cache,
                channel_names=[channel_name],
                instrument=ifo,
                t0=self.t0,
                end=self.end,
            )

            # Get sample rate from first frame reader
            if sample_rate is None:
                sample_rate = next(iter(frame_reader.rates.values()))

            # Track the current output element and pad for this IFO
            current_source = frame_reader
            current_pad = channel_name

            # Add injection if configured
            if self.noiseless_inj_frame_cache and self.noiseless_inj_channel_dict:
                if ifo in self.noiseless_inj_channel_dict:
                    inj_channel = f"{ifo}:{self.noiseless_inj_channel_dict[ifo]}"

                    inj_reader = FrameReader(
                        name=f"{self.name}_{ifo}_inj",
                        framecache=self.noiseless_inj_frame_cache,
                        channel_names=[inj_channel],
                        instrument=ifo,
                        t0=self.t0,
                        end=self.end,
                    )

                    # Add frames together
                    adder = Adder(
                        name=f"{self.name}_{ifo}_add",
                        sink_pad_names=("frame", "inj"),
                        source_pad_names=(ifo,),
                    )

                    compose.connect(
                        frame_reader,
                        adder,
                        link_map={"frame": channel_name},
                    )
                    compose.connect(
                        inj_reader,
                        adder,
                        link_map={"inj": inj_channel},
                    )

                    current_source = adder
                    current_pad = ifo

                    if self.verbose:
                        print(f"Added injection for {ifo} from {inj_channel}")
            else:
                # No injection - just insert the frame reader
                compose.insert(frame_reader)

            # Add segment gating if configured
            if segments_dict is not None and ifo in segments_dict:
                ifo_segments = segments_dict[ifo]

                if ifo_segments:  # Only add gating if there are segments
                    seg_source = SegmentSource(
                        name=f"{self.name}_{ifo}_seg",
                        source_pad_names=("control",),
                        rate=sample_rate,
                        t0=self.t0,
                        end=self.end,
                        segments=ifo_segments,
                    )

                    gate = Gate(
                        name=f"{self.name}_{ifo}_gate",
                        sink_pad_names=("strain", "control"),
                        control="control",
                        source_pad_names=(ifo,),
                    )

                    compose.connect(
                        current_source,
                        gate,
                        link_map={"strain": current_pad},
                    )
                    compose.connect(
                        seg_source,
                        gate,
                        link_map={"control": "control"},
                    )

                    current_source = gate
                    current_pad = ifo

                    if self.verbose:
                        print(f"Added segment gating for {ifo}")

            # Add latency tracking if configured
            self._add_latency_tracking(compose, ifo, current_source, current_pad)

        return compose.as_source(
            name=self.name,
            also_expose_source_pads=self._also_expose_pads,
        )

DevShm Sources

sgnligo.sources.datasource_v2.sources.devshm

Shared memory (devshm) composed source classes.

These sources read low-latency data from shared memory for online gravitational wave analysis.

Example

source = DevShmComposedSource( ... name="low_latency", ... ifos=["H1"], ... channel_dict={"H1": "GDS-CALIB_STRAIN"}, ... shared_memory_dict={"H1": "/dev/shm/kafka/H1_O4Replay"}, ... state_channel_dict={"H1": "GDS-CALIB_STATE_VECTOR"}, ... state_vector_on_dict={"H1": 3}, ... ) pipeline.connect(source.element, sink)

DevShmComposedSource dataclass

Bases: ComposedSourceBase, IfosFromChannelMixin, ChannelOptionsMixin, DevShmOptionsMixin, QueueTimeoutOptionsMixin, VerboseOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource[DevShmComposedSource]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin[IfosFromChannelMixin]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin[ChannelOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.DevShmOptionsMixin[DevShmOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin[QueueTimeoutOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.DevShmOptionsMixin --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource
                


              click sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource href "" "sgnligo.sources.datasource_v2.sources.devshm.DevShmComposedSource"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.DevShmOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.DevShmOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Shared memory source with state vector gating.

Reads low-latency strain data from shared memory and applies state vector gating to ensure only valid data is processed.

Fields inherited from mixins

ifos: List of detector prefixes (from IfosFromChannelMixin) channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin) shared_memory_dict: Dict mapping IFO to shm path (from DevShmOptionsMixin) discont_wait_time: Discontinuity wait time (from DevShmOptionsMixin) queue_timeout: Queue timeout (from QueueTimeoutOptionsMixin) verbose: Enable verbose output (from VerboseOptionsMixin)

Additional required fields

state_channel_dict: Dict mapping IFO to state vector channel name state_vector_on_dict: Dict mapping IFO to bitmask for state vector

Example

source = DevShmComposedSource( ... name="low_latency", ... ifos=["H1"], ... channel_dict={"H1": "GDS-CALIB_STRAIN"}, ... shared_memory_dict={"H1": "/dev/shm/kafka/H1_O4Replay"}, ... state_channel_dict={"H1": "GDS-CALIB_STATE_VECTOR"}, ... state_vector_on_dict={"H1": 3}, ... ) pipeline.connect(source.element, sink)

Source code in sgnligo/sources/datasource_v2/sources/devshm.py
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
@register_composed_source
@dataclass(kw_only=True)
class DevShmComposedSource(
    ComposedSourceBase,
    IfosFromChannelMixin,
    ChannelOptionsMixin,
    DevShmOptionsMixin,
    QueueTimeoutOptionsMixin,
    VerboseOptionsMixin,
):
    """Shared memory source with state vector gating.

    Reads low-latency strain data from shared memory and applies state
    vector gating to ensure only valid data is processed.

    Fields inherited from mixins:
        ifos: List of detector prefixes (from IfosFromChannelMixin)
        channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
        shared_memory_dict: Dict mapping IFO to shm path (from DevShmOptionsMixin)
        discont_wait_time: Discontinuity wait time (from DevShmOptionsMixin)
        queue_timeout: Queue timeout (from QueueTimeoutOptionsMixin)
        verbose: Enable verbose output (from VerboseOptionsMixin)

    Additional required fields:
        state_channel_dict: Dict mapping IFO to state vector channel name
        state_vector_on_dict: Dict mapping IFO to bitmask for state vector

    Example:
        >>> source = DevShmComposedSource(
        ...     name="low_latency",
        ...     ifos=["H1"],
        ...     channel_dict={"H1": "GDS-CALIB_STRAIN"},
        ...     shared_memory_dict={"H1": "/dev/shm/kafka/H1_O4Replay"},
        ...     state_channel_dict={"H1": "GDS-CALIB_STATE_VECTOR"},
        ...     state_vector_on_dict={"H1": 3},
        ... )
        >>> pipeline.connect(source.element, sink)
    """

    # Required state vector fields (not in mixin because they're required here)
    state_channel_dict: Dict[str, str]
    state_vector_on_dict: Dict[str, int]

    # Class metadata
    source_type: ClassVar[str] = "devshm"
    description: ClassVar[str] = "Read from shared memory"

    def _validate(self) -> None:
        """Validate parameters."""
        ifos_set = set(self.ifos)

        # Validate channel_dict
        if set(self.channel_dict.keys()) != ifos_set:
            raise ValueError("channel_dict keys must match ifos")

        # Validate shared_memory_dict
        if set(self.shared_memory_dict.keys()) != ifos_set:
            raise ValueError("shared_memory_dict keys must match ifos")

        # Validate state_channel_dict (required for devshm)
        if set(self.state_channel_dict.keys()) != ifos_set:
            raise ValueError("state_channel_dict keys must match ifos")

        # Validate state_vector_on_dict (required for devshm)
        if set(self.state_vector_on_dict.keys()) != ifos_set:
            raise ValueError("state_vector_on_dict keys must match ifos")

    def _build(self) -> TSComposedSourceElement:
        """Build the shared memory source with state vector gating."""
        # Build channel names for DevShmSource
        # DevShmSource expects: {ifo: [strain_channel, state_channel]}
        channel_names = {}
        for ifo in self.ifos:
            strain_channel = f"{ifo}:{self.channel_dict[ifo]}"
            state_channel = f"{ifo}:{self.state_channel_dict[ifo]}"
            channel_names[ifo] = [strain_channel, state_channel]

        # Create the shared memory source
        devshm = DevShmSource(
            name=f"{self.name}_devshm",
            channel_names=channel_names,
            shared_memory_dirs=self.shared_memory_dict,
            discont_wait_time=self.discont_wait_time,
            queue_timeout=self.queue_timeout,
            verbose=self.verbose,
        )

        compose = TSCompose()

        # Add state vector gating for each IFO
        for ifo in self.ifos:
            strain_channel = f"{ifo}:{self.channel_dict[ifo]}"
            state_channel = f"{ifo}:{self.state_channel_dict[ifo]}"

            gate = add_state_vector_gating(
                compose=compose,
                strain_source=devshm,
                state_source=devshm,
                ifo=ifo,
                bit_mask=self.state_vector_on_dict[ifo],
                strain_pad=strain_channel,
                state_pad=state_channel,
                output_pad=ifo,
            )

            # Add latency tracking if configured
            self._add_latency_tracking(compose, ifo, gate, ifo)

            if self.verbose:
                print(
                    f"Added state vector gating for {ifo} with mask "
                    f"{self.state_vector_on_dict[ifo]}"
                )

        return compose.as_source(
            name=self.name,
            also_expose_source_pads=self._also_expose_pads,
        )

Arrakis Sources

sgnligo.sources.datasource_v2.sources.arrakis

Arrakis composed source classes.

These sources read streaming data from for online gravitational wave analysis.

Example

source = ArrakisComposedSource( ... name="kafka_data", ... ifos=["H1", "L1"], ... channel_dict={"H1": "GDS-CALIB_STRAIN", "L1": "GDS-CALIB_STRAIN"}, ... ) pipeline.connect(source.element, sink)

ArrakisComposedSource dataclass

Bases: ComposedSourceBase, IfosFromChannelMixin, ChannelOptionsMixin, GPSOptionsOptionalMixin, QueueTimeoutOptionsMixin, StateVectorOptionsMixin, VerboseOptionsMixin


              flowchart TD
              sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource[ArrakisComposedSource]
              sgnligo.sources.composed_base.ComposedSourceBase[ComposedSourceBase]
              sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin[IfosFromChannelMixin]
              sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin[ChannelOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsOptionalMixin[GPSOptionsOptionalMixin]
              sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin[QueueTimeoutOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin[StateVectorOptionsMixin]
              sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin[VerboseOptionsMixin]

                              sgnligo.sources.composed_base.ComposedSourceBase --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsOptionalMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                
                sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin --> sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource
                


              click sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource href "" "sgnligo.sources.datasource_v2.sources.arrakis.ArrakisComposedSource"
              click sgnligo.sources.composed_base.ComposedSourceBase href "" "sgnligo.sources.composed_base.ComposedSourceBase"
              click sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.IfosFromChannelMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.ChannelOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsOptionalMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.GPSOptionsOptionalMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.QueueTimeoutOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.StateVectorOptionsMixin"
              click sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin href "" "sgnligo.sources.datasource_v2.cli_mixins.VerboseOptionsMixin"
            

Arrakis source for streaming data.

Reads streaming gravitational wave data from topics. Optionally supports state vector gating.

Fields inherited from mixins

ifos: List of detector prefixes (from IfosFromChannelMixin) channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin) t0: GPS start time (optional, from GPSOptionsOptionalMixin) end: GPS end time (optional, from GPSOptionsOptionalMixin) queue_timeout: Queue timeout (from QueueTimeoutOptionsMixin) state_channel_dict: State channel dict (from StateVectorOptionsMixin) state_vector_on_dict: Bitmask dict (from StateVectorOptionsMixin) state_segments_file: State segments file (from StateVectorOptionsMixin) state_sample_rate: State vector sample rate (from StateVectorOptionsMixin) verbose: Enable verbose output (from VerboseOptionsMixin)

Example

source = ArrakisComposedSource( ... name="kafka_data", ... ifos=["H1"], ... channel_dict={"H1": "GDS-CALIB_STRAIN"}, ... ) pipeline.connect(source.element, sink)

Source code in sgnligo/sources/datasource_v2/sources/arrakis.py
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
@register_composed_source
@dataclass(kw_only=True)
class ArrakisComposedSource(
    ComposedSourceBase,
    IfosFromChannelMixin,
    ChannelOptionsMixin,
    GPSOptionsOptionalMixin,
    QueueTimeoutOptionsMixin,
    StateVectorOptionsMixin,
    VerboseOptionsMixin,
):
    """Arrakis source for streaming data.

    Reads streaming gravitational wave data from topics.
    Optionally supports state vector gating.

    Fields inherited from mixins:
        ifos: List of detector prefixes (from IfosFromChannelMixin)
        channel_dict: Dict mapping IFO to channel name (from ChannelOptionsMixin)
        t0: GPS start time (optional, from GPSOptionsOptionalMixin)
        end: GPS end time (optional, from GPSOptionsOptionalMixin)
        queue_timeout: Queue timeout (from QueueTimeoutOptionsMixin)
        state_channel_dict: State channel dict (from StateVectorOptionsMixin)
        state_vector_on_dict: Bitmask dict (from StateVectorOptionsMixin)
        state_segments_file: State segments file (from StateVectorOptionsMixin)
        state_sample_rate: State vector sample rate (from StateVectorOptionsMixin)
        verbose: Enable verbose output (from VerboseOptionsMixin)

    Example:
        >>> source = ArrakisComposedSource(
        ...     name="kafka_data",
        ...     ifos=["H1"],
        ...     channel_dict={"H1": "GDS-CALIB_STRAIN"},
        ... )
        >>> pipeline.connect(source.element, sink)
    """

    # Class metadata
    source_type: ClassVar[str] = "arrakis"
    description: ClassVar[str] = "Read from Arrakis"

    def _validate(self) -> None:
        """Validate parameters."""
        ifos_set = set(self.ifos)

        # Validate channel_dict
        if set(self.channel_dict.keys()) != ifos_set:
            raise ValueError("channel_dict keys must match ifos")

        # Validate time range if both provided
        if self.t0 is not None and self.end is not None and self.t0 >= self.end:
            raise ValueError("t0 must be less than end")

        # Validate state vector options
        if self.state_channel_dict is not None:
            if set(self.state_channel_dict.keys()) != ifos_set:
                raise ValueError("state_channel_dict keys must match ifos")
            if self.state_vector_on_dict is None:
                raise ValueError(
                    "Must specify state_vector_on_dict when state_channel_dict is set"
                )

        if self.state_vector_on_dict is not None:
            if set(self.state_vector_on_dict.keys()) != ifos_set:
                raise ValueError("state_vector_on_dict keys must match ifos")
            if self.state_channel_dict is None:
                raise ValueError(
                    "Must specify state_channel_dict when state_vector_on_dict is set"
                )

    def _build(self) -> TSComposedSourceElement:
        """Build the Arrakis source."""
        # Check if state vector gating is enabled
        use_state_vector = (
            self.state_channel_dict is not None
            and self.state_vector_on_dict is not None
        )

        # Build channel names list for ArrakisSource
        channel_names = []
        for ifo in self.ifos:
            strain_channel = f"{ifo}:{self.channel_dict[ifo]}"
            channel_names.append(strain_channel)

            if use_state_vector:
                assert self.state_channel_dict is not None  # for type checker
                state_channel = f"{ifo}:{self.state_channel_dict[ifo]}"
                channel_names.append(state_channel)

        # Calculate duration if both times provided
        duration = None
        if self.t0 is not None and self.end is not None:
            duration = self.end - self.t0

        # Create the Arrakis source
        arrakis = ArrakisSource(
            name=f"{self.name}_arrakis",
            source_pad_names=channel_names,
            start_time=self.t0,
            duration=duration,
            in_queue_timeout=int(self.queue_timeout),
        )

        compose = TSCompose()

        if use_state_vector:
            # Add state vector gating for each IFO
            assert self.state_channel_dict is not None  # for type checker
            assert self.state_vector_on_dict is not None  # for type checker
            for ifo in self.ifos:
                strain_channel = f"{ifo}:{self.channel_dict[ifo]}"
                state_channel = f"{ifo}:{self.state_channel_dict[ifo]}"

                gate = add_state_vector_gating(
                    compose=compose,
                    strain_source=arrakis,
                    state_source=arrakis,
                    ifo=ifo,
                    bit_mask=self.state_vector_on_dict[ifo],
                    strain_pad=strain_channel,
                    state_pad=state_channel,
                    output_pad=ifo,
                )

                # Add latency tracking if configured
                self._add_latency_tracking(compose, ifo, gate, ifo)

                if self.verbose:
                    print(
                        f"Added state vector gating for {ifo} with mask "
                        f"{self.state_vector_on_dict[ifo]}"
                    )
        else:
            # No gating - just expose Arrakis source directly
            compose.insert(arrakis)

            # Add latency tracking for each IFO
            for ifo in self.ifos:
                strain_channel = f"{ifo}:{self.channel_dict[ifo]}"
                self._add_latency_tracking(compose, ifo, arrakis, strain_channel)

        return compose.as_source(
            name=self.name,
            also_expose_source_pads=self._also_expose_pads,
        )

Base Classes

sgnligo.sources.composed_base

Base class for composed source elements.

This module provides an abstract base class for creating composed source elements that combine multiple internal elements into a single source. Subclasses define their parameters as dataclass fields and implement _build() to wire up the internal elements.

Usage with pipelines

Composed sources wrap a TSComposedSourceElement internally. To use them with Pipeline.connect(), access the inner element via the .element property:

pipeline = Pipeline() pipeline.connect(source.element, sink)

Example

from dataclasses import dataclass from sgnligo.sources.composed_base import ComposedSourceBase

@dataclass ... class MySource(ComposedSourceBase): ... source_type = "my-source" ... description = "My custom source" ... ... ifos: list[str] ... sample_rate: int ... t0: float ... end: float ... ... def _build(self): ... compose = TSCompose() ... # ... wire up elements ... return compose.as_source(name=self.name)

source = MySource(name="test", ifos=["H1"], sample_rate=4096, t0=0, end=10) print(source.srcs) # Access source pads

ComposedSourceBase dataclass

Abstract base class for composed source elements.

Subclasses define their parameters as dataclass fields and implement _build() to create the internal composed element. Composition happens automatically in post_init.

The resulting object behaves like a TSComposedSourceElement - it has .srcs for source pads and can be connected to pipelines via pipeline.connect(source, downstream).

Class Attributes

source_type: String identifier for registry (e.g., "white", "frames"). Leave empty if the source should not be registered. description: Human-readable description for help text and documentation.

Example

from dataclasses import dataclass from typing import ClassVar, List from sgnts.compose import TSCompose, TSComposedSourceElement from sgnts.sources import FakeSeriesSource

@dataclass(kw_only=True) ... class WhiteSource(ComposedSourceBase): ... source_type: ClassVar[str] = "white" ... description: ClassVar[str] = "Gaussian white noise" ... ... ifos: List[str] ... sample_rate: int ... t0: float ... end: float ... ... def build(self) -> TSComposedSourceElement: ... compose = TSCompose() ... for ifo in self.ifos: ... fake = FakeSeriesSource( ... name=f"{self.name}{ifo}", ... source_pad_names=(f"{ifo}:STRAIN",), ... rate=self.sample_rate, ... t0=self.t0, ... end=self.end, ... signal_type="white", ... ) ... compose.insert(fake) ... return compose.as_source(name=self.name)

source = WhiteSource( ... name="noise", ... ifos=["H1", "L1"], ... sample_rate=4096, ... t0=1000, ... end=1010, ... ) print(list(source.srcs.keys())) ['H1:STRAIN', 'L1:STRAIN']

Source code in sgnligo/sources/composed_base.py
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
@dataclass(kw_only=True)
class ComposedSourceBase:
    """Abstract base class for composed source elements.

    Subclasses define their parameters as dataclass fields and implement
    _build() to create the internal composed element. Composition happens
    automatically in __post_init__.

    The resulting object behaves like a TSComposedSourceElement - it has
    .srcs for source pads and can be connected to pipelines via
    pipeline.connect(source, downstream).

    Class Attributes:
        source_type: String identifier for registry (e.g., "white", "frames").
            Leave empty if the source should not be registered.
        description: Human-readable description for help text and documentation.

    Example:
        >>> from dataclasses import dataclass
        >>> from typing import ClassVar, List
        >>> from sgnts.compose import TSCompose, TSComposedSourceElement
        >>> from sgnts.sources import FakeSeriesSource
        >>>
        >>> @dataclass(kw_only=True)
        ... class WhiteSource(ComposedSourceBase):
        ...     source_type: ClassVar[str] = "white"
        ...     description: ClassVar[str] = "Gaussian white noise"
        ...
        ...     ifos: List[str]
        ...     sample_rate: int
        ...     t0: float
        ...     end: float
        ...
        ...     def _build(self) -> TSComposedSourceElement:
        ...         compose = TSCompose()
        ...         for ifo in self.ifos:
        ...             fake = FakeSeriesSource(
        ...                 name=f"{self.name}_{ifo}",
        ...                 source_pad_names=(f"{ifo}:STRAIN",),
        ...                 rate=self.sample_rate,
        ...                 t0=self.t0,
        ...                 end=self.end,
        ...                 signal_type="white",
        ...             )
        ...             compose.insert(fake)
        ...         return compose.as_source(name=self.name)
        >>>
        >>> source = WhiteSource(
        ...     name="noise",
        ...     ifos=["H1", "L1"],
        ...     sample_rate=4096,
        ...     t0=1000,
        ...     end=1010,
        ... )
        >>> print(list(source.srcs.keys()))
        ['H1:STRAIN', 'L1:STRAIN']
    """

    # Required for all composed sources
    name: str

    # Optional latency tracking (interval in seconds, None = disabled)
    latency_interval: Optional[float] = None

    # Internal composed element (built in __post_init__)
    _composed: TSComposedSourceElement = field(init=False, repr=False)

    # Internal: pads to expose even when internally linked (for latency multilink)
    _also_expose_pads: list[str] = field(init=False, repr=False, default_factory=list)

    # Class-level metadata for registry and CLI
    # Subclasses should override these
    source_type: ClassVar[str] = ""
    description: ClassVar[str] = ""

    def __post_init__(self) -> None:
        """Validate parameters and build the composed element."""
        self._also_expose_pads = []  # Reset before build
        self._validate()
        self._composed = self._build()

    def _validate(self) -> None:
        """Override to add validation logic. Called before _build().

        Raise ValueError with descriptive message if validation fails.

        Example:
            def _validate(self) -> None:
                if self.t0 >= self.end:
                    raise ValueError("t0 must be less than end")
        """
        pass

    def _add_latency_tracking(
        self,
        compose: TSCompose,
        ifo: str,
        strain_source_element: Any,
        strain_pad_name: str,
    ) -> None:
        """Add latency tracking element for a single IFO.

        Call this in _build() after creating each strain source to add
        latency tracking. The strain source pad is connected directly to
        the Latency element. Since SGN supports multilink (one source pad
        to multiple sinks), the strain pad is also registered to be exposed
        externally via `also_expose_source_pads`.

        The latency output will appear as an additional source pad named
        "{ifo}_latency".

        Args:
            compose: The TSCompose object being built
            ifo: IFO name (e.g., "H1")
            strain_source_element: The source element producing strain data
            strain_pad_name: The pad name on the source element to tap

        Example:
            def _build(self) -> TSComposedSourceElement:
                compose = TSCompose()
                for ifo in self.ifos:
                    source = FakeSeriesSource(...)
                    compose.insert(source)
                    self._add_latency_tracking(compose, ifo, source, ifo)
                return compose.as_source(
                    name=self.name,
                    also_expose_source_pads=self._also_expose_pads,
                )
        """
        if self.latency_interval is None:
            return

        latency = Latency(
            name=f"{self.name}_{ifo}_latency",
            sink_pad_names=("data",),
            source_pad_names=(f"{ifo}_latency",),
            route=f"{ifo}_datasource_latency",
            interval=self.latency_interval,
        )

        # Connect strain source directly to latency element
        compose.connect(
            strain_source_element,
            latency,
            link_map={"data": strain_pad_name},
        )

        # Register the strain pad to be exposed externally (multilink pattern)
        # Format: "element_name:src:pad_name"
        pad_full_name = f"{strain_source_element.name}:src:{strain_pad_name}"
        self._also_expose_pads.append(pad_full_name)

    @abstractmethod
    def _build(self) -> TSComposedSourceElement:
        """Build and return the composed element.

        This method wires up the internal elements using TSCompose
        and returns the result of compose.as_source(name=self.name).

        Returns:
            TSComposedSourceElement with source pads for downstream connection
        """
        ...

    # --- CLI argument support ---

    @classmethod
    def add_cli_arguments(cls, parser: argparse.ArgumentParser) -> None:
        """Add CLI arguments for latency tracking."""
        parser.add_argument(
            "--source-latency-interval",
            type=float,
            metavar="SECONDS",
            default=None,
            help="Enable source latency tracking with specified interval in seconds",
        )

    @classmethod
    def get_cli_arg_names(cls) -> Set[str]:
        """Return set of CLI argument names defined by this class."""
        return {"source_latency_interval"}

    @classmethod
    def process_cli_args(cls, args: argparse.Namespace) -> Dict[str, Any]:
        """Convert CLI args to field values."""
        result: Dict[str, Any] = {}
        source_latency_interval = getattr(args, "source_latency_interval", None)
        if source_latency_interval is not None:
            result["latency_interval"] = source_latency_interval
        return result

    # --- Delegate to inner composed element ---

    @property
    def element(self) -> TSComposedSourceElement:
        """The underlying TSComposedSourceElement for pipeline integration.

        Use this when passing to Pipeline.connect() or other SGN operations
        that require a proper element type.

        Returns:
            The inner composed element

        Example:
            >>> pipeline = Pipeline()
            >>> pipeline.connect(source.element, sink)
        """
        return self._composed

    @property
    def srcs(self) -> Dict[str, Any]:
        """Source pads of the composed element.

        Returns:
            Dictionary mapping pad names to source pad objects
        """
        return self._composed.srcs

    def __getattr__(self, name: str) -> Any:
        """Delegate unknown attributes to the inner composed element.

        This allows composed sources to be used anywhere a TSComposedSourceElement
        is expected, supporting any additional methods or properties.
        """
        # Avoid infinite recursion for private attributes
        if name.startswith("_"):
            raise AttributeError(
                f"'{type(self).__name__}' object has no attribute '{name}'"
            )
        return getattr(self._composed, name)

element property

The underlying TSComposedSourceElement for pipeline integration.

Use this when passing to Pipeline.connect() or other SGN operations that require a proper element type.

Returns:

Type Description
TSComposedSourceElement

The inner composed element

Example

pipeline = Pipeline() pipeline.connect(source.element, sink)

srcs property

Source pads of the composed element.

Returns:

Type Description
Dict[str, Any]

Dictionary mapping pad names to source pad objects

__getattr__(name)

Delegate unknown attributes to the inner composed element.

This allows composed sources to be used anywhere a TSComposedSourceElement is expected, supporting any additional methods or properties.

Source code in sgnligo/sources/composed_base.py
268
269
270
271
272
273
274
275
276
277
278
279
def __getattr__(self, name: str) -> Any:
    """Delegate unknown attributes to the inner composed element.

    This allows composed sources to be used anywhere a TSComposedSourceElement
    is expected, supporting any additional methods or properties.
    """
    # Avoid infinite recursion for private attributes
    if name.startswith("_"):
        raise AttributeError(
            f"'{type(self).__name__}' object has no attribute '{name}'"
        )
    return getattr(self._composed, name)

__post_init__()

Validate parameters and build the composed element.

Source code in sgnligo/sources/composed_base.py
125
126
127
128
129
def __post_init__(self) -> None:
    """Validate parameters and build the composed element."""
    self._also_expose_pads = []  # Reset before build
    self._validate()
    self._composed = self._build()

add_cli_arguments(parser) classmethod

Add CLI arguments for latency tracking.

Source code in sgnligo/sources/composed_base.py
216
217
218
219
220
221
222
223
224
225
@classmethod
def add_cli_arguments(cls, parser: argparse.ArgumentParser) -> None:
    """Add CLI arguments for latency tracking."""
    parser.add_argument(
        "--source-latency-interval",
        type=float,
        metavar="SECONDS",
        default=None,
        help="Enable source latency tracking with specified interval in seconds",
    )

get_cli_arg_names() classmethod

Return set of CLI argument names defined by this class.

Source code in sgnligo/sources/composed_base.py
227
228
229
230
@classmethod
def get_cli_arg_names(cls) -> Set[str]:
    """Return set of CLI argument names defined by this class."""
    return {"source_latency_interval"}

process_cli_args(args) classmethod

Convert CLI args to field values.

Source code in sgnligo/sources/composed_base.py
232
233
234
235
236
237
238
239
@classmethod
def process_cli_args(cls, args: argparse.Namespace) -> Dict[str, Any]:
    """Convert CLI args to field values."""
    result: Dict[str, Any] = {}
    source_latency_interval = getattr(args, "source_latency_interval", None)
    if source_latency_interval is not None:
        result["latency_interval"] = source_latency_interval
    return result

Utilities

sgnligo.sources.datasource_v2.sources.utils

Utility functions for composed sources.

This module contains reusable building blocks that are shared across multiple composed source classes.

add_state_vector_gating(compose, strain_source, state_source, ifo, bit_mask, strain_pad, state_pad, output_pad)

Add BitMask + Gate to a compose for state vector gating.

This is the common pattern used by devshm, arrakis, and gwdata-noise sources. It applies a bitmask to the state vector channel, then uses a Gate to control the strain data based on the masked state vector.

The pattern is

strain_source[strain_pad] ─────────────────┐ ├─> Gate[output_pad] state_source[state_pad] -> BitMask[state] ─┘

Parameters:

Name Type Description Default
compose TSCompose

TSCompose to add elements to (modified in-place)

required
strain_source

Source element providing strain data

required
state_source

Source element providing state vector data

required
ifo str

Interferometer prefix (e.g., "H1")

required
bit_mask int

Bitmask to apply to state vector

required
strain_pad str

Name of the strain output pad on strain_source

required
state_pad str

Name of the state vector output pad on state_source

required
output_pad str

Name for the gated output pad

required

Returns:

Type Description
Gate

The Gate element for downstream use (e.g., latency tracking)

Source code in sgnligo/sources/datasource_v2/sources/utils.py
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
def add_state_vector_gating(
    compose: TSCompose,
    strain_source,
    state_source,
    ifo: str,
    bit_mask: int,
    strain_pad: str,
    state_pad: str,
    output_pad: str,
) -> Gate:
    """Add BitMask + Gate to a compose for state vector gating.

    This is the common pattern used by devshm, arrakis, and gwdata-noise sources.
    It applies a bitmask to the state vector channel, then uses a Gate to
    control the strain data based on the masked state vector.

    The pattern is:
        strain_source[strain_pad] ─────────────────┐
                                                   ├─> Gate[output_pad]
        state_source[state_pad] -> BitMask[state] ─┘

    Args:
        compose: TSCompose to add elements to (modified in-place)
        strain_source: Source element providing strain data
        state_source: Source element providing state vector data
        ifo: Interferometer prefix (e.g., "H1")
        bit_mask: Bitmask to apply to state vector
        strain_pad: Name of the strain output pad on strain_source
        state_pad: Name of the state vector output pad on state_source
        output_pad: Name for the gated output pad

    Returns:
        The Gate element for downstream use (e.g., latency tracking)
    """
    # Create BitMask to filter state vector
    mask = BitMask(
        name=f"{ifo}_Mask",
        sink_pad_names=("state",),
        source_pad_names=("state",),
        bit_mask=bit_mask,
    )

    # Create Gate to control strain based on masked state vector
    gate = Gate(
        name=f"{ifo}_Gate",
        sink_pad_names=("strain", "state_vector"),
        control="state_vector",
        source_pad_names=(output_pad,),
    )

    # Connect state_source -> BitMask
    compose.connect(
        state_source,
        mask,
        link_map={"state": state_pad},
    )

    # Connect BitMask -> Gate.state_vector
    compose.connect(
        mask,
        gate,
        link_map={"state_vector": "state"},
    )

    # Connect strain_source -> Gate.strain
    compose.connect(
        strain_source,
        gate,
        link_map={"strain": strain_pad},
    )

    return gate