Skip to content

sgn.compose

Composable elements for SGN.

This module provides functionality for composing multiple elements into a single element that behaves as one atomic unit. This enables:

- Source(s) + optional Transform(s) → ComposedSourceElement
- Transform(s) → ComposedTransformElement
- Optional Transform(s) + Sink(s) → ComposedSinkElement

Define a dataclass that inherits from one of the three composed element base classes, declare parameters as dataclass fields, and override _build() to wire internal elements using self.insert() and self.connect(). The resulting object IS an element -- pass it directly to pipeline.connect().

>>> from dataclasses import dataclass
>>> from sgn.compose import ComposedTransformElement
>>>
>>> @dataclass(repr=False, kw_only=True)
... class ScaleAndOffset(ComposedTransformElement):
...     factor: int = 2
...     offset: int = 10
...
...     def _validate(self):
...         if self.factor <= 0:
...             raise ValueError("factor must be positive")
...
...     def _build(self):
...         scale = Multiply(name=f"{self.name}_scale",
...                          factor=self.factor)
...         add = Add(name=f"{self.name}_add", value=self.offset)
...         self.connect(scale, add)
...
>>> transform = ScaleAndOffset(name="my_transform", factor=3)
>>> pipeline.connect(upstream, transform)

Compose

Fluent builder for creating one-off composed elements.

Deprecated: The Compose builder is deprecated and will be removed in a future release. Subclass ComposedSourceElement, ComposedTransformElement, or ComposedSinkElement and override _build() instead -- see their docstrings.

Example (linear chain):

>>> composed_source = (
...     Compose()
...     .connect(source, transform1)
...     .connect(transform1, transform2)
...     .as_source(name="my_source")
... )

Example (non-linear graph):

>>> composed_source = (
...     Compose()
...     .connect(source1, merge_transform)
...     .connect(source2, merge_transform)
...     .as_source(name="merged_source")
... )

Source code in src/sgn/compose.py
class Compose:
    """Chainable builder that assembles a one-off composed element.

    .. deprecated::
        ``Compose`` is deprecated and will be removed in a future release.
        Subclass ``ComposedSourceElement``, ``ComposedTransformElement``,
        or ``ComposedSinkElement`` and override ``_build()`` instead --
        see their docstrings.

    ``insert``, ``connect`` and ``link`` each return the builder so calls
    can be chained; one of the ``as_*`` methods then wraps the accumulated
    graph in a composed element.

    Example (linear chain):
        >>> composed_source = (
        ...     Compose()
        ...     .connect(source, transform1)
        ...     .connect(transform1, transform2)
        ...     .as_source(name="my_source")
        ... )

    Example (non-linear graph):
        >>> composed_source = (
        ...     Compose()
        ...     .connect(source1, merge_transform)
        ...     .connect(source2, merge_transform)
        ...     .as_source(name="merged_source")
        ... )
    """

    def __init__(self, first_element: Element | None = None) -> None:
        # Start from an empty graph; seed it only when an element was given.
        self._graph = Graph()
        if first_element is None:
            return
        self._graph.insert(first_element)

    def insert(self, *elements: Element, **kwargs) -> "Compose":
        """Add one or more elements to the composition and return the builder."""
        graph = self._graph
        graph.insert(*elements, **kwargs)
        return self

    def connect(self, source: Element, sink: Element, **kwargs) -> "Compose":
        """Connect ``source`` to ``sink`` (auto-inserting as needed) and return the builder."""
        graph = self._graph
        graph.connect(source, sink, **kwargs)
        return self

    def link(self, link_map: dict[str | SinkPad, str | SourcePad]) -> "Compose":
        """Link pads given by full pad name or pad reference and return the builder."""
        graph = self._graph
        graph.link(link_map)
        return self

    def as_source(
        self,
        name: str = "",
        also_expose_source_pads: list[str] | None = None,
        update_pad_names: dict[str, dict[str, str]] | None = None,
    ) -> ComposedSourceElement:
        """Wrap the accumulated graph in a ``ComposedSourceElement``.

        Emits the deprecation message as a ``FutureWarning``. Missing or
        empty optional arguments are forwarded as an empty list / dict.
        """
        # stacklevel=2 points the warning at the caller of as_source().
        warnings.warn(_DEPRECATION_MSG, FutureWarning, stacklevel=2)
        exposed = also_expose_source_pads if also_expose_source_pads else []
        renames = update_pad_names if update_pad_names else {}
        return ComposedSourceElement(
            name=name,
            _prebuilt_graph=self._graph,
            also_expose_source_pads=exposed,
            update_pad_names=renames,
        )

    def as_transform(
        self,
        name: str = "",
        also_expose_source_pads: list[str] | None = None,
        update_pad_names: dict[str, dict[str, str]] | None = None,
    ) -> ComposedTransformElement:
        """Wrap the accumulated graph in a ``ComposedTransformElement``.

        Emits the deprecation message as a ``FutureWarning``. Missing or
        empty optional arguments are forwarded as an empty list / dict.
        """
        # stacklevel=2 points the warning at the caller of as_transform().
        warnings.warn(_DEPRECATION_MSG, FutureWarning, stacklevel=2)
        exposed = also_expose_source_pads if also_expose_source_pads else []
        renames = update_pad_names if update_pad_names else {}
        return ComposedTransformElement(
            name=name,
            _prebuilt_graph=self._graph,
            also_expose_source_pads=exposed,
            update_pad_names=renames,
        )

    def as_sink(
        self,
        name: str = "",
        update_pad_names: dict[str, dict[str, str]] | None = None,
    ) -> ComposedSinkElement:
        """Wrap the accumulated graph in a ``ComposedSinkElement``.

        Emits the deprecation message as a ``FutureWarning``. A missing or
        empty ``update_pad_names`` is forwarded as an empty dict.
        """
        # stacklevel=2 points the warning at the caller of as_sink().
        warnings.warn(_DEPRECATION_MSG, FutureWarning, stacklevel=2)
        renames = update_pad_names if update_pad_names else {}
        return ComposedSinkElement(
            name=name,
            _prebuilt_graph=self._graph,
            update_pad_names=renames,
        )

as_sink(name='', update_pad_names=None)

Finalize the composition as a ComposedSinkElement.

Source code in src/sgn/compose.py
def as_sink(
    self,
    name: str = "",
    update_pad_names: dict[str, dict[str, str]] | None = None,
) -> ComposedSinkElement:
    """Wrap the accumulated graph in a ``ComposedSinkElement``.

    Emits the deprecation message as a ``FutureWarning`` and passes the
    builder's graph on as the element's pre-built graph. A missing or
    empty ``update_pad_names`` is forwarded as an empty dict.
    """
    # stacklevel=2 points the warning at the caller of as_sink().
    warnings.warn(_DEPRECATION_MSG, FutureWarning, stacklevel=2)
    renames = update_pad_names if update_pad_names else {}
    return ComposedSinkElement(
        name=name,
        _prebuilt_graph=self._graph,
        update_pad_names=renames,
    )

as_source(name='', also_expose_source_pads=None, update_pad_names=None)

Finalize the composition as a ComposedSourceElement.

Source code in src/sgn/compose.py
def as_source(
    self,
    name: str = "",
    also_expose_source_pads: list[str] | None = None,
    update_pad_names: dict[str, dict[str, str]] | None = None,
) -> ComposedSourceElement:
    """Wrap the accumulated graph in a ``ComposedSourceElement``.

    Emits the deprecation message as a ``FutureWarning`` and passes the
    builder's graph on as the element's pre-built graph. Missing or empty
    optional arguments are forwarded as an empty list / dict.
    """
    # stacklevel=2 points the warning at the caller of as_source().
    warnings.warn(_DEPRECATION_MSG, FutureWarning, stacklevel=2)
    exposed = also_expose_source_pads if also_expose_source_pads else []
    renames = update_pad_names if update_pad_names else {}
    return ComposedSourceElement(
        name=name,
        _prebuilt_graph=self._graph,
        also_expose_source_pads=exposed,
        update_pad_names=renames,
    )

as_transform(name='', also_expose_source_pads=None, update_pad_names=None)

Finalize the composition as a ComposedTransformElement.

Source code in src/sgn/compose.py
def as_transform(
    self,
    name: str = "",
    also_expose_source_pads: list[str] | None = None,
    update_pad_names: dict[str, dict[str, str]] | None = None,
) -> ComposedTransformElement:
    """Wrap the accumulated graph in a ``ComposedTransformElement``.

    Emits the deprecation message as a ``FutureWarning`` and passes the
    builder's graph on as the element's pre-built graph. Missing or empty
    optional arguments are forwarded as an empty list / dict.
    """
    # stacklevel=2 points the warning at the caller of as_transform().
    warnings.warn(_DEPRECATION_MSG, FutureWarning, stacklevel=2)
    exposed = also_expose_source_pads if also_expose_source_pads else []
    renames = update_pad_names if update_pad_names else {}
    return ComposedTransformElement(
        name=name,
        _prebuilt_graph=self._graph,
        also_expose_source_pads=exposed,
        update_pad_names=renames,
    )

connect(source, sink, **kwargs)

Connect two elements, auto-inserting as needed.

Source code in src/sgn/compose.py
def connect(self, source: Element, sink: Element, **kwargs) -> "Compose":
    """Connect ``source`` to ``sink``, auto-inserting as needed.

    Any keyword arguments are forwarded to the underlying graph's
    ``connect``. Returns the builder so calls can be chained.
    """
    graph = self._graph
    graph.connect(source, sink, **kwargs)
    return self

insert(*elements, **kwargs)

Insert one or more elements into the composition.

Source code in src/sgn/compose.py
def insert(self, *elements: Element, **kwargs) -> "Compose":
    """Add one or more elements to the composition.

    Positional and keyword arguments are forwarded to the underlying
    graph's ``insert``. Returns the builder so calls can be chained.
    """
    graph = self._graph
    graph.insert(*elements, **kwargs)
    return self

link(link_map)

Link pads directly by full pad name or pad reference.

Source code in src/sgn/compose.py
def link(self, link_map: dict[str | SinkPad, str | SourcePad]) -> "Compose":
    """Link pads given by full pad name or pad reference.

    ``link_map`` is handed unchanged to the underlying graph's ``link``.
    Returns the builder so calls can be chained.
    """
    graph = self._graph
    graph.link(link_map)
    return self

ComposedElementMixin

Shared composition machinery for the three composed element types.

Responsibilities:

- Hold the single internal graph (self._internal_graph) that represents the composition.
- Expose builder methods (insert, connect, link) that mutate that graph during _build().
- Provide the _validate and _build template hooks that subclasses override.
- Identify the boundary pads (unlinked inputs and outputs) that become the composed element's externally-visible pads.

Composed elements don't execute internally; the Pipeline merges the internal graph into its own and runs the merged graph directly.

Source code in src/sgn/compose.py
class ComposedElementMixin:
    """Shared composition machinery for the three composed element types.

    Responsibilities:
    - Hold the single internal graph (``self._internal_graph``) that represents
      the composition.
    - Expose builder methods (``insert``, ``connect``, ``link``) that mutate
      that graph during ``_build()``.
    - Provide the ``_validate`` and ``_build`` template hooks that subclasses
      override.
    - Identify the boundary pads (unlinked inputs and outputs) that become the
      composed element's externally-visible pads.

    Composed elements don't execute internally; the Pipeline merges the
    internal graph into its own and runs the merged graph directly.
    """

    # --- Annotations populated by the concrete dataclass ---

    _internal_graph: Graph
    _boundary_source_pads: dict[str, SourcePad]
    _boundary_sink_pads: dict[str, list[SinkPad]]
    _virtual_sources: dict[str, SourcePad]
    _in_build: bool

    # --- Template hooks (override in subclasses) ---

    def _validate(self) -> None:  # noqa: B027
        """Optional hook for parameter validation, called before ``_build()``.

        Override in subclasses to check constructor arguments. Raise
        ``ValueError`` to reject invalid parameters. Default is a no-op.
        """

    def _build(self) -> None:  # noqa: B027
        """Optional hook to populate the internal graph.

        Override in subclasses to wire internal elements using
        ``self.insert()``, ``self.connect()``, and ``self.link()``. The
        default is a no-op; the deprecated ``Compose`` builder relies on
        this to inject a pre-built graph without a subclass override.

        Example:
            >>> def _build(self):
            ...     amp = Amplify(name=f"{self.name}_amp", gain=self.gain)
            ...     clip = Clip(name=f"{self.name}_clip", ceiling=self.ceiling)
            ...     self.connect(amp, clip)
        """

    # --- Builder methods (for use during _build()) ---

    def _require_build_phase(self, method_name: str) -> None:
        """Raise ``AssertionError`` unless ``_build()`` is currently running.

        An explicit ``raise`` is used rather than an ``assert`` statement so
        the guard still fires under ``python -O`` (which strips asserts).
        The exception type and message match the former assert.

        Args:
            method_name: Name of the builder method being guarded; used in
                the error message.

        Raises:
            AssertionError: If ``self._in_build`` is falsy.
        """
        if not self._in_build:
            raise AssertionError(
                f"{method_name}() can only be called during _build()"
            )

    def insert(self, *elements: Element, **kwargs) -> "ComposedElementMixin":
        """Insert one or more elements into the composition.

        Only valid during ``_build()``.

        Raises:
            AssertionError: If called outside ``_build()``.
        """
        self._require_build_phase("insert")
        self._internal_graph.insert(*elements, **kwargs)
        return self

    def connect(
        self, source: Element, sink: Element, **kwargs
    ) -> "ComposedElementMixin":
        """Connect two elements in the composition, auto-inserting as needed.

        Supports the same implicit linking strategies as ``Pipeline.connect()``
        (exact pad-name match, 1-to-N, explicit ``link_map``). Only valid
        during ``_build()``.

        Raises:
            AssertionError: If called outside ``_build()``.
        """
        self._require_build_phase("connect")
        self._internal_graph.connect(source, sink, **kwargs)
        return self

    def link(
        self, link_map: dict[str | SinkPad, str | SourcePad]
    ) -> "ComposedElementMixin":
        """Link pads directly by full pad name or pad reference.

        Lower-level than ``connect()`` -- requires elements to already be
        inserted. Prefer ``connect()`` in most cases. Only valid during
        ``_build()``.

        Raises:
            AssertionError: If called outside ``_build()``.
        """
        self._require_build_phase("link")
        self._internal_graph.link(link_map)
        return self

    # --- Derived views ---

    @property
    def internal_elements(self) -> list[Element]:
        """Elements contained in this composition, in insertion order."""
        return self._internal_graph.elements

    # --- Internal lifecycle helpers (shared by concrete classes) ---

    def _populate_internal_graph(self, prebuilt: Graph | None) -> None:
        """Either adopt a pre-built graph or run the subclass hooks.

        When ``prebuilt`` is given it is adopted as-is and neither
        ``_validate()`` nor ``_build()`` runs. Otherwise a fresh ``Graph``
        is created, ``_validate()`` is called, and ``_build()`` runs with
        ``_in_build`` set so the builder methods are permitted.
        """
        if prebuilt is not None:
            self._internal_graph = prebuilt
            return
        self._internal_graph = Graph()
        self._validate()
        self._in_build = True
        try:
            self._build()
        finally:
            # Always close the build window, even if _build() raised.
            self._in_build = False

    def _identify_boundary_pads(self) -> None:
        """Find pads that should be exposed externally.

        Each boundary entry produces exactly one externally-visible pad on the
        composed element, named by the short pad name (``pad_name``) of the
        internal pad(s) it binds to. The full pad name of that external pad
        (``composed_name:snk:short_name`` or ``:src:short_name``) is unique;
        the short name may collide with short names on other internal
        elements, and the two sides handle that asymmetrically:

        * **Boundary source pads** (``dict[str, SourcePad]``): source pads on
          internal elements that are NOT consumed by any internal sink. Each
          short name maps to a single internal source pad. A short-name
          collision here is ambiguous (which internal source emits?) and
          warns; use ``update_pad_names`` to disambiguate by renaming.

        * **Boundary sink pads** (``dict[str, list[SinkPad]]``): sink pads on
          internal elements that are NOT fed by any internal source. Each
          short name maps to a *list* of internal sink pads. When multiple
          internal sinks share a short name, the single external sink pad
          fans the injected frame out to all of them — this is a feature,
          not a collision.
        """
        # A source pad is "internally consumed" iff some internal sink is
        # linked to it (i.e. that sink's ``.other`` points at it).
        internally_consumed_sources: set[SourcePad] = set()
        for element in self.internal_elements:
            if isinstance(element, (TransformElement, SinkElement)):
                for pad in element.snks.values():
                    if pad.other is not None:
                        internally_consumed_sources.add(pad.other)

        self._boundary_source_pads = {}
        self._boundary_sink_pads = {}

        for element in self.internal_elements:
            if isinstance(element, (SourceElement, TransformElement)):
                for pad_name, pad in element.srcs.items():
                    if pad in internally_consumed_sources:
                        continue
                    if pad_name in self._boundary_source_pads:
                        warnings.warn(
                            f"Boundary source pad name '{pad_name}' collision: "
                            f"{self._boundary_source_pads[pad_name]} overwritten "
                            f"by {pad}. Use 'update_pad_names' to disambiguate.",
                            stacklevel=2,
                        )
                    # On collision the later pad replaces the earlier one.
                    self._boundary_source_pads[pad_name] = pad

            if isinstance(element, (TransformElement, SinkElement)):
                for pad_name, pad in element.snks.items():
                    if pad.other is None:
                        self._boundary_sink_pads.setdefault(pad_name, []).append(pad)

    def _apply_exposures(self, exposures: list[str]) -> None:
        """Add internally-linked source pads to the boundary outputs.

        Each entry is looked up in the internal graph by full pad name. A
        pad whose short name is already a boundary source is left alone.

        Raises:
            TypeError: If a looked-up object is not a ``SourcePad``.
        """
        for pad_full_name in exposures:
            pad = self._internal_graph[pad_full_name]
            if not isinstance(pad, SourcePad):
                raise TypeError(
                    f"expose_source_pad expects a source pad full-name, "
                    f"got {type(pad).__name__} at '{pad_full_name}'"
                )
            if pad.pad_name not in self._boundary_source_pads:
                self._boundary_source_pads[pad.pad_name] = pad

    def _apply_source_renames(self, renames: dict[str, dict[str, str]]) -> None:
        """Rename or split a boundary source pad per ``update_pad_names``.

        The old entry is popped and replaced with one new entry per item in
        ``new_pads``. This lets a single boundary source name be split into
        several externally-visible source pads (e.g. to disambiguate an
        ambiguous short-name collision) -- the external pads are distinct and
        each emits from a different internal source pad.

        Raises:
            KeyError: If an old name is not a current boundary source pad.
            TypeError: If a listed full name does not resolve to a
                ``SourcePad``.
        """
        for old_name, new_pads in renames.items():
            if old_name not in self._boundary_source_pads:
                raise KeyError(f"Pad name {old_name} not found in boundary pads")
            self._boundary_source_pads.pop(old_name)
            for new_name, full_name in new_pads.items():
                pad = self._internal_graph[full_name]
                if not isinstance(pad, SourcePad):
                    raise TypeError(
                        f"expected SourcePad at '{full_name}', "
                        f"got {type(pad).__name__}"
                    )
                self._boundary_source_pads[new_name] = pad

    def _apply_sink_renames(self, renames: dict[str, dict[str, str]]) -> None:
        """Rename, split, or combine boundary sink pads per ``update_pad_names``.

        Each entry pops the old boundary name and appends the listed internal
        sink pads under their new short names. Because boundary sinks map to
        *lists* of internal sinks, both directions are supported:

        * **Split**: one old name, multiple new names -- the external surface
          gains several sink pads, each backed by one internal sink.
        * **Combine**: multiple old names all renamed to the same new name --
          the single external sink pad fans frames out to every listed
          internal sink (they end up appended into the same list).

        Raises:
            KeyError: If an old name is not a current boundary sink pad.
            TypeError: If a listed full name does not resolve to a
                ``SinkPad``.
        """
        for old_name, new_pads in renames.items():
            if old_name not in self._boundary_sink_pads:
                raise KeyError(f"Pad name {old_name} not found in boundary pads")
            self._boundary_sink_pads.pop(old_name)
            for new_name, full_name in new_pads.items():
                pad = self._internal_graph[full_name]
                if not isinstance(pad, SinkPad):
                    raise TypeError(
                        f"expected SinkPad at '{full_name}', "
                        f"got {type(pad).__name__}"
                    )
                self._boundary_sink_pads.setdefault(new_name, []).append(pad)

internal_elements property

Elements contained in this composition, in insertion order.

connect(source, sink, **kwargs)

Connect two elements in the composition, auto-inserting as needed.

Supports the same implicit linking strategies as Pipeline.connect() (exact pad-name match, 1-to-N, explicit link_map). Only valid during _build().

Source code in src/sgn/compose.py
def connect(
    self, source: Element, sink: Element, **kwargs
) -> "ComposedElementMixin":
    """Connect two elements in the composition, auto-inserting as needed.

    Supports the same implicit linking strategies as ``Pipeline.connect()``
    (exact pad-name match, 1-to-N, explicit ``link_map``). Only valid
    during ``_build()``.

    Raises:
        AssertionError: If called outside ``_build()``.
    """
    # Explicit raise instead of ``assert`` so the guard survives ``python -O``;
    # exception type and message are unchanged.
    if not self._in_build:
        raise AssertionError("connect() can only be called during _build()")
    self._internal_graph.connect(source, sink, **kwargs)
    return self

insert(*elements, **kwargs)

Insert one or more elements into the composition.

Only valid during _build().

Source code in src/sgn/compose.py
def insert(self, *elements: Element, **kwargs) -> "ComposedElementMixin":
    """Insert one or more elements into the composition.

    Only valid during ``_build()``.

    Raises:
        AssertionError: If called outside ``_build()``.
    """
    # Explicit raise instead of ``assert`` so the guard survives ``python -O``;
    # exception type and message are unchanged.
    if not self._in_build:
        raise AssertionError("insert() can only be called during _build()")
    self._internal_graph.insert(*elements, **kwargs)
    return self

link(link_map)

Link pads directly by full pad name or pad reference.

Lower-level than connect() -- requires elements to already be inserted. Prefer connect() in most cases. Only valid during _build().

Source code in src/sgn/compose.py
def link(
    self, link_map: dict[str | SinkPad, str | SourcePad]
) -> "ComposedElementMixin":
    """Link pads directly by full pad name or pad reference.

    Lower-level than ``connect()`` -- requires elements to already be
    inserted. Prefer ``connect()`` in most cases. Only valid during
    ``_build()``.

    Raises:
        AssertionError: If called outside ``_build()``.
    """
    # Explicit raise instead of ``assert`` so the guard survives ``python -O``;
    # exception type and message are unchanged.
    if not self._in_build:
        raise AssertionError("link() can only be called during _build()")
    self._internal_graph.link(link_map)
    return self

ComposedSinkElement dataclass

Bases: ComposedElementMixin, SinkElement


              flowchart TD
              sgn.compose.ComposedSinkElement[ComposedSinkElement]
              sgn.compose.ComposedElementMixin[ComposedElementMixin]
              sgn.base.SinkElement[SinkElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.compose.ComposedElementMixin --> sgn.compose.ComposedSinkElement
                
                sgn.base.SinkElement --> sgn.compose.ComposedSinkElement
                                sgn.base.ElementLike --> sgn.base.SinkElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                




              click sgn.compose.ComposedSinkElement href "" "sgn.compose.ComposedSinkElement"
              click sgn.compose.ComposedElementMixin href "" "sgn.compose.ComposedElementMixin"
              click sgn.base.SinkElement href "" "sgn.base.SinkElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A composed element that behaves like a SinkElement.

Created from: One or more SinkElements, optionally with TransformElements.

Exposes: Sink pads from the boundary (unlinked sink pads become inputs).

Example:

>>> @dataclass(kw_only=True)
... class AccumulateAndWrite(ComposedSinkElement):
...     output_path: str = "/tmp/out"
...
...     def _build(self):
...         acc = Accumulate(name=f"{self.name}_acc")
...         writer = Writer(name=f"{self.name}_wr",
...                         path=self.output_path)
...         self.connect(acc, writer)
...
>>> sink = AccumulateAndWrite(name="writer",
...                          output_path="/data/out.hdf5")
>>> pipeline.connect(upstream, sink)
Source code in src/sgn/compose.py
@dataclass(kw_only=True)
class ComposedSinkElement(ComposedElementMixin, SinkElement):
    """A composed element that behaves like a SinkElement.

    Created from: One or more SinkElements, optionally with TransformElements.
    Exposes: Sink pads from the boundary (unlinked sink pads become inputs).

    Construction raises ``ValueError`` when the composition is empty, and
    ``TypeError`` when it contains no ``SinkElement`` or contains any
    ``SourceElement``.

    Example:

        >>> @dataclass(kw_only=True)
        ... class AccumulateAndWrite(ComposedSinkElement):
        ...     output_path: str = "/tmp/out"
        ...
        ...     def _build(self):
        ...         acc = Accumulate(name=f"{self.name}_acc")
        ...         writer = Writer(name=f"{self.name}_wr",
        ...                         path=self.output_path)
        ...         self.connect(acc, writer)
        ...
        >>> sink = AccumulateAndWrite(name="writer",
        ...                          output_path="/data/out.hdf5")
        >>> pipeline.connect(upstream, sink)
    """

    # Boundary sink renames, applied via _apply_sink_renames() in
    # __post_init__: {old short name: {new short name: internal pad full name}}.
    update_pad_names: dict[str, dict[str, str]] = field(default_factory=dict)
    # Not an init argument; filled in __post_init__ from the boundary sink names.
    sink_pad_names: list[str] = field(init=False, default_factory=list)

    # Optional ready-made graph; when given, __post_init__ adopts it instead of
    # running the _validate()/_build() hooks.
    _prebuilt_graph: Graph | None = field(default=None, repr=False)

    # --- State populated during __post_init__ (none are init arguments) ---
    _internal_graph: Graph = field(init=False)
    _boundary_source_pads: dict[str, SourcePad] = field(
        init=False, default_factory=dict
    )
    _boundary_sink_pads: dict[str, list[SinkPad]] = field(
        init=False, default_factory=dict
    )
    # One SourcePad per external sink pad, keyed by the sink's short name;
    # pull() stores the incoming frame on it.
    _virtual_sources: dict[str, SourcePad] = field(init=False, default_factory=dict)
    # The SinkElement members of the composition; drive at_eos.
    _internal_sinks: list[SinkElement] = field(init=False, default_factory=list)
    # True only while _build() is running (gates insert/connect/link).
    _in_build: bool = field(init=False, default=False)

    def __post_init__(self) -> None:
        """Build the internal graph, derive external sink pads, and wire the graph.

        Raises:
            ValueError: If the composition has no elements.
            TypeError: If it has no ``SinkElement`` or contains a
                ``SourceElement``.
        """
        # NOTE(review): UniqueID.__post_init__ is called directly rather than
        # via super(); presumably to skip SinkElement's own pad setup, which is
        # done by hand below -- confirm against sgn.base.
        UniqueID.__post_init__(self)

        self._populate_internal_graph(self._prebuilt_graph)

        # --- Validate the composition's contents ---
        if not self.internal_elements:
            raise ValueError("ComposedSinkElement requires at least one element")

        self._internal_sinks = [
            e for e in self.internal_elements if isinstance(e, SinkElement)
        ]
        if not self._internal_sinks:
            raise TypeError("ComposedSinkElement requires at least one SinkElement")
        for elem in self.internal_elements:
            if isinstance(elem, SourceElement):
                raise TypeError(
                    f"ComposedSinkElement cannot contain SourceElement, "
                    f"got {elem.name}"
                )

        # --- Work out which internal sink pads are exposed, then rename ---
        self._identify_boundary_pads()
        self._apply_sink_renames(self.update_pad_names)

        # --- Create the external pads, one SinkPad per boundary sink name ---
        self.sink_pad_names = list(self._boundary_sink_pads.keys())
        self.graph = {}
        self.internal_pad = InternalPad(name="inl", element=self, call=self.internal)
        self.sink_pads = [
            SinkPad(name=pad_name, element=self, call=self.pull)
            for pad_name in self.sink_pad_names
        ]

        # One virtual source per external sink pad. Its call just returns the
        # pad's own ``output``, which pull() assigns.
        for snk_name in self.sink_pad_names:
            self._virtual_sources[snk_name] = SourcePad(
                name=f"_vs_{snk_name}",
                element=self,
                call=lambda pad: pad.output,
            )

        # Forward and reverse lookups between short names and external sink pads.
        self.snks = {n: p for n, p in zip(self.sink_pad_names, self.sink_pads)}
        self.rsnks = {p: n for n, p in zip(self.sink_pad_names, self.sink_pads)}
        self._at_eos = {p: False for p in self.sink_pads}

        if not self.sink_pads:  # pragma: no cover
            raise ValueError("ComposedSinkElement must have at least one sink pad")

        self.sink_pad_names_full = [p.name for p in self.sink_pads]

        # Merge the internal graph, then wire in virtual sources and the internal pad
        self.graph.update(self._internal_graph.graph)
        for snk_name, virtual_src in self._virtual_sources.items():
            # Every internal sink sharing this boundary name is linked to the
            # same virtual source (fan-out).
            for internal_snk in self._boundary_sink_pads[snk_name]:
                self.graph.update(internal_snk.link(virtual_src))
            self.graph[virtual_src] = {self.snks[snk_name]}
        # The composed element's internal pad maps to every internal sink's
        # internal pad.
        self.graph[self.internal_pad] = {
            sink.internal_pad for sink in self._internal_sinks
        }

    @property
    def pad_list(self) -> list[Pad]:
        """All pads: external sinks, the internal pad, virtual sources, then
        the pads of every internal element."""
        pads: list[Pad] = [
            *self.sink_pads,
            self.internal_pad,
            *self._virtual_sources.values(),
        ]
        for element in self.internal_elements:
            pads.extend(element.pad_list)
        return pads

    @property
    def at_eos(self) -> bool:
        """Return True when all internal sinks are at EOS."""
        if self._internal_sinks:
            return all(sink.at_eos for sink in self._internal_sinks)
        # Fallback on this element's own per-pad flags; not expected to run
        # because __post_init__ rejects compositions without a SinkElement.
        return any(self._at_eos.values())  # pragma: no cover

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Inject frame into virtual source for internal boundary sink."""
        # The virtual source is looked up by the external pad's short name.
        self._virtual_sources[pad.pad_name].output = frame
        if frame.EOS:
            self.mark_eos(pad)

    def internal(self) -> None:
        """No-op - Pipeline handles all execution via merged graph."""

at_eos property

Return True when all internal sinks are at EOS.

internal()

No-op - Pipeline handles all execution via merged graph.

Source code in src/sgn/compose.py
def internal(self) -> None:
    """Do nothing; the Pipeline handles all execution via the merged graph."""
    return None

pull(pad, frame)

Inject frame into virtual source for internal boundary sink.

Source code in src/sgn/compose.py
def pull(self, pad: SinkPad, frame: Frame) -> None:
    """Hand an incoming frame to the virtual source matching ``pad``.

    The frame is stored as the ``output`` of the virtual source keyed by
    the pad's short name. When the frame carries EOS, the pad is also
    marked via ``mark_eos``.
    """
    virtual_src = self._virtual_sources[pad.pad_name]
    virtual_src.output = frame
    if not frame.EOS:
        return
    self.mark_eos(pad)

ComposedSourceElement dataclass

Bases: ComposedElementMixin, SourceElement


              flowchart TD
              sgn.compose.ComposedSourceElement[ComposedSourceElement]
              sgn.compose.ComposedElementMixin[ComposedElementMixin]
              sgn.base.SourceElement[SourceElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.compose.ComposedElementMixin --> sgn.compose.ComposedSourceElement
                
                sgn.base.SourceElement --> sgn.compose.ComposedSourceElement
                                sgn.base.ElementLike --> sgn.base.SourceElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                




              click sgn.compose.ComposedSourceElement href "" "sgn.compose.ComposedSourceElement"
              click sgn.compose.ComposedElementMixin href "" "sgn.compose.ComposedElementMixin"
              click sgn.base.SourceElement href "" "sgn.base.SourceElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A composed element that behaves like a SourceElement.

Created from: One or more SourceElements, optionally with TransformElements.

Exposes: Source pads from the boundary (unlinked source pads become outputs).

Example:

>>> @dataclass(repr=False, kw_only=True)
... class CalibratedSource(ComposedSourceElement):
...     calibration_factor: float = 1.0
...
...     def _build(self):
...         raw = DataProducer(name=f"{self.name}_raw", ...)
...         cal = Multiply(name=f"{self.name}_cal",
...                        factor=self.calibration_factor)
...         self.connect(raw, cal)
...
>>> source = CalibratedSource(name="cal_src",
...                           calibration_factor=2.5)
>>> pipeline.connect(source, downstream)

For multilink patterns, call self.expose_source_pad() in _build() to expose a source pad even when it is internally connected.

Source code in src/sgn/compose.py
@dataclass(repr=False, kw_only=True)
class ComposedSourceElement(ComposedElementMixin, SourceElement):
    """A composed element that behaves like a SourceElement.

    Created from: One or more SourceElements, optionally with TransformElements.
    Exposes: Source pads from the boundary (unlinked source pads become outputs).

    Example:

        >>> @dataclass(repr=False, kw_only=True)
        ... class CalibratedSource(ComposedSourceElement):
        ...     calibration_factor: float = 1.0
        ...
        ...     def _build(self):
        ...         raw = DataProducer(name=f"{self.name}_raw", ...)
        ...         cal = Multiply(name=f"{self.name}_cal",
        ...                        factor=self.calibration_factor)
        ...         self.connect(raw, cal)
        ...
        >>> source = CalibratedSource(name="cal_src",
        ...                           calibration_factor=2.5)
        >>> pipeline.connect(source, downstream)

    For multilink patterns, call ``self.expose_source_pad()`` in
    ``_build()`` to expose a source pad even when it is internally
    connected.

    Raises:
        ValueError: If the composition contains no internal elements, or
            ends up with no source pads.
        TypeError: If none of the internal elements is a ``SourceElement``,
            or if any of them is a ``SinkElement``.
    """

    # Full names ("element_name:src:pad_name") of internal source pads to
    # expose in addition to the default boundary pads.
    also_expose_source_pads: list[str] = field(default_factory=list)
    # Rename table handed to ``_apply_source_renames()``; presumably keyed by
    # the existing boundary pad name -- confirm against the mixin.
    update_pad_names: dict[str, dict[str, str]] = field(default_factory=dict)
    # Names of the externally visible source pads; filled in __post_init__.
    source_pad_names: list[str] = field(init=False, default_factory=list)

    # Optional graph forwarded to ``_populate_internal_graph()``.
    _prebuilt_graph: Graph | None = field(default=None, repr=False)

    # Internal bookkeeping. NOTE(review): these appear to be populated by
    # ComposedElementMixin helpers that are not defined in this class.
    _internal_graph: Graph = field(init=False)
    _boundary_source_pads: dict[str, SourcePad] = field(
        init=False, default_factory=dict
    )
    _boundary_sink_pads: dict[str, list[SinkPad]] = field(
        init=False, default_factory=dict
    )
    _virtual_sources: dict[str, SourcePad] = field(init=False, default_factory=dict)
    # Guard flag checked by expose_source_pad().
    _in_build: bool = field(init=False, default=False)

    def expose_source_pad(self, pad_full_name: str) -> None:
        """Mark an internal source pad for external exposure.

        Most useful inside ``ComposedTransformElement._build()``, where it
        enables multilink patterns -- the same internal source pad feeds an
        internal sink *and* is also surfaced externally. On
        ``ComposedSourceElement`` the source pads of the wrapped sources are
        already exposed by default unless they're consumed internally; reach
        for ``also_expose_source_pads=`` (the constructor kwarg) instead.

        Only valid during ``_build()``.

        Args:
            pad_full_name: Full pad name in format ``"element_name:src:pad_name"``.

        Raises:
            AssertionError: If called outside ``_build()``.
        """
        if not self._in_build:
            # Raised explicitly rather than via ``assert`` so the guard is not
            # stripped under ``python -O``; the exception type and message are
            # kept identical for existing callers.
            raise AssertionError(
                "expose_source_pad() can only be called during _build()"
            )
        self.also_expose_source_pads.append(pad_full_name)

    def __post_init__(self):
        """Build the internal graph, validate it and wire the external pads."""
        UniqueID.__post_init__(self)

        # Independent copy so expose_source_pad() doesn't mutate caller input.
        self.also_expose_source_pads = list(self.also_expose_source_pads)

        self._populate_internal_graph(self._prebuilt_graph)

        # A source composition needs at least one source and may not hold sinks.
        if not self.internal_elements:
            raise ValueError("ComposedSourceElement requires at least one element")
        if not any(isinstance(e, SourceElement) for e in self.internal_elements):
            raise TypeError("ComposedSourceElement requires at least one SourceElement")
        for elem in self.internal_elements:
            if isinstance(elem, SinkElement):
                raise TypeError(
                    f"ComposedSourceElement cannot contain SinkElement, "
                    f"got {elem.name}"
                )

        self._identify_boundary_pads()
        self._apply_exposures(self.also_expose_source_pads)
        self._apply_source_renames(self.update_pad_names)

        # Set up our own externally-visible pads delegating to boundary pads
        self.source_pad_names = list(self._boundary_source_pads.keys())
        self.graph = {}
        self.internal_pad = InternalPad(name="inl", element=self, call=self.internal)
        self.source_pads = [
            SourcePad(name=pad_name, element=self, call=self.new)
            for pad_name in self.source_pad_names
        ]
        # Forward (name -> pad) and reverse (pad -> name) lookup tables.
        self.srcs = dict(zip(self.source_pad_names, self.source_pads))
        self.rsrcs = dict(zip(self.source_pads, self.source_pad_names))

        if not self.source_pads:  # pragma: no cover
            raise ValueError("ComposedSourceElement must have at least one source pad")

        # Merge internal graph; wire source_pads <- internal_pad <- boundary srcs
        self.graph.update(self._internal_graph.graph)
        self.graph[self.internal_pad] = set(self._boundary_source_pads.values())
        self.graph.update({s: {self.internal_pad} for s in self.source_pads})

    @property
    def pad_list(self) -> list[Pad]:
        """Return this element's own pads followed by every internal element's pads."""
        pads: list[Pad] = [*self.source_pads, self.internal_pad]
        for element in self.internal_elements:
            pads.extend(element.pad_list)
        return pads

    def internal(self) -> None:
        """No-op - Pipeline handles all execution via merged graph."""

    def new(self, pad: SourcePad) -> Frame:
        """Return output from internal boundary pad (already executed by Pipeline).

        Args:
            pad: One of this element's external source pads; its ``pad_name``
                selects the matching internal boundary pad.

        Returns:
            The ``output`` currently held by that internal boundary pad.
        """
        internal_pad = self._boundary_source_pads[pad.pad_name]
        output = internal_pad.output
        assert output is not None, f"Internal pad {internal_pad.name} has no output"
        return output

expose_source_pad(pad_full_name)

Mark an internal source pad for external exposure.

Most useful inside ComposedTransformElement._build(), where it enables multilink patterns -- the same internal source pad feeds an internal sink and is also surfaced externally. On ComposedSourceElement the source pads of the wrapped sources are already exposed by default unless they're consumed internally; reach for also_expose_source_pads= (the constructor kwarg) instead.

Only valid during _build().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pad_full_name` | `str` | Full pad name in format `"element_name:src:pad_name"`. | required |
Source code in src/sgn/compose.py
def expose_source_pad(self, pad_full_name: str) -> None:
    """Mark an internal source pad for external exposure.

    Most useful inside ``ComposedTransformElement._build()``, where it
    enables multilink patterns -- the same internal source pad feeds an
    internal sink *and* is also surfaced externally. On
    ``ComposedSourceElement`` the source pads of the wrapped sources are
    already exposed by default unless they're consumed internally; reach
    for ``also_expose_source_pads=`` (the constructor kwarg) instead.

    Only valid during ``_build()``.

    Args:
        pad_full_name: Full pad name in format ``"element_name:src:pad_name"``.

    Raises:
        AssertionError: If called outside ``_build()``.
    """
    if not self._in_build:
        # Raised explicitly rather than via ``assert`` so the guard is not
        # stripped under ``python -O``; the exception type and message are
        # kept identical for existing callers.
        raise AssertionError(
            "expose_source_pad() can only be called during _build()"
        )
    self.also_expose_source_pads.append(pad_full_name)

internal()

No-op - Pipeline handles all execution via merged graph.

Source code in src/sgn/compose.py
def internal(self) -> None:
    """Do nothing on purpose.

    Execution is handled by the Pipeline via the merged graph, so the
    composed element has no work of its own to perform here.
    """

new(pad)

Return output from internal boundary pad (already executed by Pipeline).

Source code in src/sgn/compose.py
def new(self, pad: SourcePad) -> Frame:
    """Hand back the frame held by the matching internal boundary pad.

    The Pipeline has already executed the internal pad, so this only
    forwards its stored output.

    Args:
        pad: External source pad; its ``pad_name`` selects the internal
            boundary pad to read from.

    Returns:
        The ``output`` of the selected internal boundary pad.
    """
    boundary_pad = self._boundary_source_pads[pad.pad_name]
    frame = boundary_pad.output
    assert frame is not None, f"Internal pad {boundary_pad.name} has no output"
    return frame

ComposedTransformElement dataclass

Bases: ComposedElementMixin, TransformElement


              flowchart TD
              sgn.compose.ComposedTransformElement[ComposedTransformElement]
              sgn.compose.ComposedElementMixin[ComposedElementMixin]
              sgn.base.TransformElement[TransformElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]

                              sgn.compose.ComposedElementMixin --> sgn.compose.ComposedTransformElement
                
                sgn.base.TransformElement --> sgn.compose.ComposedTransformElement
                                sgn.base.ElementLike --> sgn.base.TransformElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                




              click sgn.compose.ComposedTransformElement href "" "sgn.compose.ComposedTransformElement"
              click sgn.compose.ComposedElementMixin href "" "sgn.compose.ComposedElementMixin"
              click sgn.base.TransformElement href "" "sgn.base.TransformElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
            

A composed element that behaves like a TransformElement.

Created from: One or more TransformElements. Exposes: Sink pads from boundary (inputs), Source pads from boundary (outputs).

Example:

>>> @dataclass(repr=False, kw_only=True)
... class ScaleAndOffset(ComposedTransformElement):
...     factor: int = 2
...     offset: int = 10
...
...     def _validate(self):
...         if self.factor <= 0:
...             raise ValueError("factor must be positive")
...
...     def _build(self):
...         scale = Multiply(name=f"{self.name}_scale",
...                          factor=self.factor)
...         add = Add(name=f"{self.name}_add",
...                   value=self.offset)
...         self.connect(scale, add)
...
>>> t = ScaleAndOffset(name="so", factor=3, offset=100)
>>> pipeline.connect(upstream, t)
>>> pipeline.connect(t, downstream)

For multilink patterns, call self.expose_source_pad() in _build() to expose a source pad even when it is internally connected:

>>> def _build(self):
...     proc = Processor(name=f"{self.name}_proc", ...)
...     monitor = Monitor(name=f"{self.name}_mon", ...)
...     self.connect(proc, monitor,
...                  link_map={"stats": "stats"})
...     # Expose stats externally too (multilink)
...     self.expose_source_pad(
...         f"{self.name}_proc:src:stats")
Source code in src/sgn/compose.py
@dataclass(repr=False, kw_only=True)
class ComposedTransformElement(ComposedElementMixin, TransformElement):
    """A composed element that behaves like a TransformElement.

    Created from: One or more TransformElements.
    Exposes: Sink pads from boundary (inputs), Source pads from boundary
    (outputs).

    Example:

        >>> @dataclass(repr=False, kw_only=True)
        ... class ScaleAndOffset(ComposedTransformElement):
        ...     factor: int = 2
        ...     offset: int = 10
        ...
        ...     def _validate(self):
        ...         if self.factor <= 0:
        ...             raise ValueError("factor must be positive")
        ...
        ...     def _build(self):
        ...         scale = Multiply(name=f"{self.name}_scale",
        ...                          factor=self.factor)
        ...         add = Add(name=f"{self.name}_add",
        ...                   value=self.offset)
        ...         self.connect(scale, add)
        ...
        >>> t = ScaleAndOffset(name="so", factor=3, offset=100)
        >>> pipeline.connect(upstream, t)
        >>> pipeline.connect(t, downstream)

    For multilink patterns, call ``self.expose_source_pad()`` in
    ``_build()`` to expose a source pad even when it is internally
    connected:

        >>> def _build(self):
        ...     proc = Processor(name=f"{self.name}_proc", ...)
        ...     monitor = Monitor(name=f"{self.name}_mon", ...)
        ...     self.connect(proc, monitor,
        ...                  link_map={"stats": "stats"})
        ...     # Expose stats externally too (multilink)
        ...     self.expose_source_pad(
        ...         f"{self.name}_proc:src:stats")

    Raises:
        ValueError: If the composition contains no internal elements, or
            ends up without both sink and source pads.
        TypeError: If any internal element is a ``SourceElement``.
    """

    # Full names ("element_name:src:pad_name") of internal source pads to
    # expose even when they are linked internally.
    also_expose_source_pads: list[str] = field(default_factory=list)
    # Boundary pad renames; see _apply_transform_renames() for the rules.
    update_pad_names: dict[str, dict[str, str]] = field(default_factory=dict)
    # Names of the externally visible pads; filled in __post_init__.
    source_pad_names: list[str] = field(init=False, default_factory=list)
    sink_pad_names: list[str] = field(init=False, default_factory=list)

    # Optional graph forwarded to ``_populate_internal_graph()``.
    _prebuilt_graph: Graph | None = field(default=None, repr=False)

    # Internal bookkeeping. NOTE(review): the graph and boundary pad maps
    # appear to be populated by ComposedElementMixin helpers that are not
    # defined in this class.
    _internal_graph: Graph = field(init=False)
    _boundary_source_pads: dict[str, SourcePad] = field(
        init=False, default_factory=dict
    )
    _boundary_sink_pads: dict[str, list[SinkPad]] = field(
        init=False, default_factory=dict
    )
    # One virtual source per external sink pad; pull() stores frames on them.
    _virtual_sources: dict[str, SourcePad] = field(init=False, default_factory=dict)
    # Guard flag checked by expose_source_pad().
    _in_build: bool = field(init=False, default=False)

    def expose_source_pad(self, pad_full_name: str) -> None:
        """Mark a source pad for external exposure even when internally linked.

        Use during ``_build()`` for multilink patterns where an internal
        source pad feeds both an internal sink and external consumers.

        Only valid during ``_build()``.

        Args:
            pad_full_name: Full pad name in format ``"element_name:src:pad_name"``.

        Raises:
            AssertionError: If called outside ``_build()``.
        """
        if not self._in_build:
            # Raised explicitly rather than via ``assert`` so the guard is not
            # stripped under ``python -O``; the exception type and message are
            # kept identical for existing callers.
            raise AssertionError(
                "expose_source_pad() can only be called during _build()"
            )
        self.also_expose_source_pads.append(pad_full_name)

    def __post_init__(self):
        """Build the internal graph, validate it and wire the external pads."""
        UniqueID.__post_init__(self)

        # Independent copy so expose_source_pad() doesn't mutate caller input.
        self.also_expose_source_pads = list(self.also_expose_source_pads)

        self._populate_internal_graph(self._prebuilt_graph)

        # A transform composition must be non-empty and may not hold sources.
        if not self.internal_elements:
            raise ValueError("ComposedTransformElement requires at least one element")
        for elem in self.internal_elements:
            if isinstance(elem, SourceElement):
                raise TypeError(
                    f"Transform composition cannot contain SourceElements, "
                    f"got {type(elem).__name__}"
                )

        self._identify_boundary_pads()
        self._apply_exposures(self.also_expose_source_pads)
        self._apply_transform_renames(self.update_pad_names)

        self.sink_pad_names = list(self._boundary_sink_pads.keys())
        self.source_pad_names = list(self._boundary_source_pads.keys())

        self.graph = {}
        self.internal_pad = InternalPad(name="inl", element=self, call=self.internal)
        self.sink_pads = [
            SinkPad(name=pad_name, element=self, call=self.pull)
            for pad_name in self.sink_pad_names
        ]

        # Virtual sources feed frames injected by pull() into internal boundary sinks
        for snk_name in self.sink_pad_names:
            self._virtual_sources[snk_name] = SourcePad(
                name=f"_vs_{snk_name}",
                element=self,
                # The virtual source simply re-emits whatever pull() stored on it.
                call=lambda pad: pad.output,
            )

        self.source_pads = [
            SourcePad(name=pad_name, element=self, call=self.new)
            for pad_name in self.source_pad_names
        ]
        # Forward (name -> pad) and reverse (pad -> name) lookup tables.
        self.srcs = dict(zip(self.source_pad_names, self.source_pads))
        self.snks = dict(zip(self.sink_pad_names, self.sink_pads))
        self.rsrcs = dict(zip(self.source_pads, self.source_pad_names))
        self.rsnks = dict(zip(self.sink_pads, self.sink_pad_names))

        if not self.source_pads or not self.sink_pads:  # pragma: no cover
            raise ValueError(
                "ComposedTransformElement must have both sink and source pads"
            )

        # Merge the internal graph, then wire in our virtual sources and composed pads
        self.graph.update(self._internal_graph.graph)
        for snk_name, virtual_src in self._virtual_sources.items():
            # Each internal boundary sink is linked to the virtual source, and
            # the virtual source is wired to the matching external sink pad.
            for internal_snk in self._boundary_sink_pads[snk_name]:
                self.graph.update(internal_snk.link(virtual_src))
            self.graph[virtual_src] = {self.snks[snk_name]}
        self.graph[self.internal_pad] = set(self._boundary_source_pads.values())
        self.graph.update({s: {self.internal_pad} for s in self.source_pads})

    def _apply_transform_renames(self, renames: dict[str, dict[str, str]]) -> None:
        """Apply ``update_pad_names`` to either boundary sinks or sources.

        Each entry's key must match an existing boundary pad name on exactly
        one side; the value dict's entries replace it. If the same name
        appears on both a boundary sink and a boundary source (which side is
        meant?) raises ``ValueError``; if it matches neither side, raises
        ``KeyError``.
        """
        for old_name, new_pads in renames.items():
            in_sinks = old_name in self._boundary_sink_pads
            in_sources = old_name in self._boundary_source_pads
            if in_sinks and in_sources:
                raise ValueError(
                    f"Pad name '{old_name}' is ambiguous: matches both a "
                    f"boundary sink and a boundary source. Rename one side "
                    f"first to disambiguate."
                )
            if in_sinks:
                self._apply_sink_renames({old_name: new_pads})
            elif in_sources:
                self._apply_source_renames({old_name: new_pads})
            else:
                raise KeyError(f"Pad name {old_name} not found in boundary pads")

    @property
    def pad_list(self) -> list[Pad]:
        """Return own pads (incl. virtual sources), then every internal element's pads."""
        pads: list[Pad] = [
            *self.source_pads,
            *self.sink_pads,
            self.internal_pad,
            *self._virtual_sources.values(),
        ]
        for element in self.internal_elements:
            pads.extend(element.pad_list)
        return pads

    def pull(self, pad: SinkPad, frame: Frame) -> None:
        """Inject frame into virtual source for internal boundary sink.

        Args:
            pad: External sink pad that received the frame; its ``pad_name``
                selects the virtual source.
            frame: Frame to store as that virtual source's ``output``.
        """
        self._virtual_sources[pad.pad_name].output = frame

    def internal(self) -> None:
        """No-op - Pipeline handles all execution via merged graph."""

    def new(self, pad: SourcePad) -> Frame:
        """Return output from internal boundary pad (already executed by Pipeline).

        Args:
            pad: One of this element's external source pads; its ``pad_name``
                selects the matching internal boundary pad.

        Returns:
            The ``output`` currently held by that internal boundary pad.
        """
        internal_pad = self._boundary_source_pads[pad.pad_name]
        output = internal_pad.output
        assert output is not None, f"Internal pad {internal_pad.name} has no output"
        return output

expose_source_pad(pad_full_name)

Mark a source pad for external exposure even when internally linked.

Use during _build() for multilink patterns where an internal source pad feeds both an internal sink and external consumers.

Only valid during _build().

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pad_full_name` | `str` | Full pad name in format `"element_name:src:pad_name"`. | required |
Source code in src/sgn/compose.py
def expose_source_pad(self, pad_full_name: str) -> None:
    """Mark a source pad for external exposure even when internally linked.

    Use during ``_build()`` for multilink patterns where an internal
    source pad feeds both an internal sink and external consumers.

    Only valid during ``_build()``.

    Args:
        pad_full_name: Full pad name in format ``"element_name:src:pad_name"``.

    Raises:
        AssertionError: If called outside ``_build()``.
    """
    if not self._in_build:
        # Raised explicitly rather than via ``assert`` so the guard is not
        # stripped under ``python -O``; the exception type and message are
        # kept identical for existing callers.
        raise AssertionError(
            "expose_source_pad() can only be called during _build()"
        )
    self.also_expose_source_pads.append(pad_full_name)

internal()

No-op - Pipeline handles all execution via merged graph.

Source code in src/sgn/compose.py
def internal(self) -> None:
    """Do nothing on purpose.

    Execution is handled by the Pipeline via the merged graph, so the
    composed element has no work of its own to perform here.
    """

new(pad)

Return output from internal boundary pad (already executed by Pipeline).

Source code in src/sgn/compose.py
def new(self, pad: SourcePad) -> Frame:
    """Hand back the frame held by the matching internal boundary pad.

    The Pipeline has already executed the internal pad, so this only
    forwards its stored output.

    Args:
        pad: External source pad; its ``pad_name`` selects the internal
            boundary pad to read from.

    Returns:
        The ``output`` of the selected internal boundary pad.
    """
    boundary_pad = self._boundary_source_pads[pad.pad_name]
    frame = boundary_pad.output
    assert frame is not None, f"Internal pad {boundary_pad.name} has no output"
    return frame

pull(pad, frame)

Inject frame into virtual source for internal boundary sink.

Source code in src/sgn/compose.py
def pull(self, pad: SinkPad, frame: Frame) -> None:
    """Store an incoming frame on the virtual source paired with ``pad``.

    Args:
        pad: External sink pad that received the frame; its ``pad_name``
            selects the virtual source.
        frame: Frame to set as that virtual source's ``output``.
    """
    virtual_source = self._virtual_sources[pad.pad_name]
    virtual_source.output = frame