Skip to content

sgn.apps

Pipeline class and related utilities to establish and execute a graph of element tasks.

Graph

Base class for managing element graphs and pad registries.

This class provides the core functionality for building directed acyclic graphs of elements and pads. It handles element insertion, pad registration, and implicit/explicit linking between pads.

Both Pipeline and composed elements use this class for graph management.

Source code in src/sgn/apps.py
class Graph:
    """Base class for managing element graphs and pad registries.

    This class provides the core functionality for building directed acyclic graphs
    of elements and pads. It handles element insertion, pad registration, and
    implicit/explicit linking between pads.

    Both Pipeline and composed elements use this class for graph management.

    Attributes:
        graph:
            dict[Pad, set[Pad]], the pad-level graph; each pad maps to the set of
            pads it depends on.
        elements:
            list[Element], the inserted elements, in insertion order.
    """

    def __init__(self) -> None:
        """Initialize an empty graph with registry and element tracking."""
        # Name -> element/pad lookup shared by __getitem__, __contains__ and link().
        self._registry: dict[str, Pad | Element] = {}
        self.graph: dict[Pad, set[Pad]] = {}
        self.elements: list[Element] = []

    def __getitem__(self, name: str) -> Pad | Element:
        """Return a graph element or pad by name.

        Raises:
            KeyError: If no element or pad with that name is registered.
        """
        return self._registry[name]

    def __contains__(self, name: str) -> bool:
        """Return whether an element or pad with the given name is registered."""
        return name in self._registry

    def _insert_element(self, element: Element) -> None:
        """Insert a single element into the graph.

        This is the core insertion logic without sink tracking.
        Subclasses can override to add additional behavior.

        Every name (the element's and each of its pads') is validated before
        anything is registered. A name collision therefore leaves the graph
        unchanged instead of half-populated.

        Args:
            element: The element to insert

        Raises:
            AssertionError: If ``element`` is not element-like, or if its name or
                any of its pad names is already in use in this graph.
        """
        assert isinstance(
            element, ElementLike
        ), f"Element {element} is not an instance of a sgn.Element"
        assert (
            element.name not in self._registry
        ), f"Element name '{element.name}' is already in use in this graph"

        # Snapshot the pads so validation and registration see the same sequence.
        pads = list(element.pad_list)
        # ``seen`` also rejects a pad that reuses the element's own name or another
        # pad's name. The old register-as-you-go loop caught those implicitly.
        seen = {element.name}
        for pad in pads:
            assert (
                pad.name not in self._registry and pad.name not in seen
            ), f"Pad name '{pad.name}' is already in use in this graph"
            seen.add(pad.name)

        # All names are free: commit everything.
        self._registry[element.name] = element
        for pad in pads:
            self._registry[pad.name] = pad
        self.graph.update(element.graph)
        self.elements.append(element)

    def insert(
        self,
        *elements: Element,
        link_map: dict[str | SinkPad, str | SourcePad] | None = None,
    ) -> Self:
        """Insert element(s) into the graph.

        Args:
            *elements:
                Iterable[Element], the ordered elements to insert into the graph
            link_map:
                dict[str | SinkPad, str | SourcePad] | None,
                a mapping of sink pad to source pad names to link

        Returns:
            Self, the graph with the elements inserted
        """
        for element in elements:
            self._insert_element(element)
        if link_map is not None:
            self.link(link_map)
        return self

    def link(self, link_map: dict[str | SinkPad, str | SourcePad]) -> Self:
        """Link pads in a graph.

        Args:
            link_map:
                dict[str | SinkPad, str | SourcePad], a mapping of sink pads to
                the source pads that feed them. Keys and values may be pad
                objects or pad names; names are resolved through this graph's
                registry. Data flows from value -> key.

        Returns:
            Self, the graph with the new links added

        Raises:
            KeyError: If a pad name is not registered in this graph.
            AssertionError: If a key does not resolve to a SinkPad or a value
                does not resolve to a SourcePad.
        """
        for sink_pad_name, source_pad_name in link_map.items():
            if isinstance(sink_pad_name, str):
                sink_pad = self._registry[sink_pad_name]
            else:
                sink_pad = sink_pad_name
            if isinstance(source_pad_name, str):
                source_pad = self._registry[source_pad_name]
            else:
                source_pad = source_pad_name

            assert isinstance(sink_pad, SinkPad), f"not a sink pad: {sink_pad}"
            assert isinstance(source_pad, SourcePad), f"not a source pad: {source_pad}"

            graph = sink_pad.link(source_pad)
            self.graph.update(graph)

        return self

    def connect(
        self,
        source: Element | ElementGroup | PadSelection,
        sink: Element | ElementGroup | PadSelection,
        link_map: dict[str, str] | None = None,
    ) -> Self:
        """Connect elements, ElementGroups, or PadSelections using implicit linking.

        This method supports multiple linking patterns:
        1. Element-to-element linking with implicit pad matching:
           graph.connect(source_element, sink_element)
        2. Element-to-element linking with explicit mapping:
           graph.connect(source_element, sink_element, link_map={"sink": "source"})
        3. ElementGroup linking (supports elements and pad selections):
           graph.connect(group(s1, s2), sink_element)
           graph.connect(group(source, select(element, "pad1")), sink)
        4. Direct PadSelection linking:
           graph.connect(select(source, "pad1"), sink_element)

        Implicit linking strategies (when no link_map provided):
        (1) Exact match: Connect when source and sink pad names are identical
        (2) Partial match: Connect all matching pad names (ignores non-matching pads)
        (3) 1-to-N: Single source pad, connect to all sink pads
        (4) Reject N-to-1: multiple disjoint-named source pads with a single sink
            pad raises ``ValueError`` rather than silently picking one source.

        Args:
            source:
                Element, ElementGroup, or PadSelection, the source for linking
            sink:
                Element, ElementGroup, or PadSelection, the sink for linking
            link_map:
                dict[str, str], optional, explicit mapping of sink pad names to
                source pad names.

        Returns:
            Self: The graph with the new links added.

        Raises:
            ValueError: If ``source`` is a SinkElement, ``sink`` is a
                SourceElement, or the implicit linking strategy is ambiguous.
            KeyError: If ``link_map`` names a pad that the source or sink does
                not expose.
            TypeError: If ``source`` has no ``srcs`` or ``sink`` has no ``snks``,
                i.e. it is not an Element, ElementGroup, or PadSelection.
        """
        if isinstance(source, SinkElement):
            msg = f"Source '{source.name}' is a SinkElement and has no source pads"
            raise ValueError(msg)
        if isinstance(sink, SourceElement):
            msg = f"Sink '{sink.name}' is a SourceElement and has no sink pads"
            raise ValueError(msg)

        # Report objects lacking the srcs/snks interface as the documented
        # TypeError instead of letting a bare AttributeError escape.
        try:
            source_pads = source.srcs
        except AttributeError as err:
            msg = (
                f"cannot use {type(source).__name__} as a connect() source; "
                "expected an Element, ElementGroup, or PadSelection"
            )
            raise TypeError(msg) from err
        try:
            sink_pads = sink.snks
        except AttributeError as err:
            msg = (
                f"cannot use {type(sink).__name__} as a connect() sink; "
                "expected an Element, ElementGroup, or PadSelection"
            )
            raise TypeError(msg) from err

        # Ensure all elements are inserted in graph
        def ensure_elements_inserted(
            obj: Element | ElementGroup | PadSelection,
        ) -> None:
            if isinstance(obj, (SourceElement, TransformElement, SinkElement)):
                if obj.name not in self._registry:
                    self.insert(obj)
            elif isinstance(obj, ElementGroup):
                for element in obj.elements:
                    if element.name not in self._registry:
                        self.insert(element)
            elif isinstance(obj, PadSelection):
                if obj.element.name not in self._registry:
                    self.insert(obj.element)

        ensure_elements_inserted(source)
        ensure_elements_inserted(sink)

        return self._connect_pads(source_pads, sink_pads, link_map)

    def _connect_pads(
        self,
        source_pads: dict[str, SourcePad],
        sink_pads: dict[str, SinkPad],
        link_map: dict[str, str] | None = None,
    ) -> Self:
        """Connect source and sink pads using implicit linking strategies.

        Args:
            source_pads: Dictionary mapping pad names to source pads
            sink_pads: Dictionary mapping pad names to sink pads
            link_map: Optional explicit mapping of sink pad names to source pad names

        Returns:
            Self with the new links added

        Raises:
            KeyError: If ``link_map`` names a pad missing from the given pads.
            ValueError: If no unambiguous implicit strategy applies.
        """
        resolved_link_map: dict[str | SinkPad, str | SourcePad]
        source_pad_names = set(source_pads.keys())
        sink_pad_names = set(sink_pads.keys())
        matching_names = source_pad_names & sink_pad_names

        # Determine linking strategy
        if link_map:
            # Explicit mapping provided
            resolved_link_map = {}
            for sink_pad_name, source_pad_name in link_map.items():
                if sink_pad_name not in sink_pads:
                    msg = f"sink pad '{sink_pad_name}' not found"
                    raise KeyError(msg)
                if source_pad_name not in source_pads:
                    msg = f"source pad '{source_pad_name}' not found"
                    raise KeyError(msg)

                sink_pad = sink_pads[sink_pad_name]
                source_pad = source_pads[source_pad_name]
                resolved_link_map[sink_pad] = source_pad

            return self.link(resolved_link_map)

        elif source_pad_names == sink_pad_names:
            # One-to-one linking strategy: same pad names
            resolved_link_map = {
                sink_pads[name]: source_pads[name] for name in source_pad_names
            }
            return self.link(resolved_link_map)

        elif matching_names:
            # Partial matching strategy: connect all matching pad names
            resolved_link_map = {
                sink_pads[name]: source_pads[name] for name in matching_names
            }
            return self.link(resolved_link_map)

        elif len(sink_pad_names) == 1 and len(source_pad_names) > 1:
            # N sources to a single sink pad is not supported: a sink pad can
            # only be linked to one source pad.
            sink_pad = next(iter(sink_pads.values()))
            raise ValueError(
                f"cannot connect {len(source_pad_names)} source pads to a "
                f"single sink pad ({sink_pad.name}); a sink pad can only be "
                "linked to one source pad. provide an explicit link_map to "
                "select which source to use."
            )

        elif len(source_pad_names) == 1:
            # One-to-N linking strategy
            source_pad = next(iter(source_pads.values()))
            resolved_link_map = {
                sink_pad: source_pad for sink_pad in sink_pads.values()
            }
            return self.link(resolved_link_map)

        else:
            msg = (
                "unable to determine unambiguous linking strategy from source "
                "and sink pads. an explicit link_map is required."
            )
            raise ValueError(msg)

__contains__(name)

Return whether the graph contains an element or pad with the given name.

Source code in src/sgn/apps.py
def __contains__(self, name: str) -> bool:
    """Return True when an element or pad called ``name`` is registered."""
    registry = self._registry
    return name in registry

__getitem__(name)

Return a graph element or pad by name.

Source code in src/sgn/apps.py
def __getitem__(self, name: str) -> Pad | Element:
    """Look up a registered element or pad by its name.

    Raises:
        KeyError: If nothing named ``name`` has been registered.
    """
    registry = self._registry
    return registry[name]

__init__()

Initialize an empty graph with registry and element tracking.

Source code in src/sgn/apps.py
def __init__(self) -> None:
    """Create an empty graph: no elements, no pad edges, no registered names."""
    # Elements in insertion order.
    self.elements: list[Element] = []
    # Pad-level adjacency: each pad maps to the set of pads linked into it.
    self.graph: dict[Pad, set[Pad]] = {}
    # Name -> element/pad lookup table.
    self._registry: dict[str, Pad | Element] = {}

connect(source, sink, link_map=None)

Connect elements, ElementGroups, or PadSelections using implicit linking.

This method supports multiple linking patterns:

1. Element-to-element linking with implicit pad matching: `graph.connect(source_element, sink_element)`
2. Element-to-element linking with explicit mapping: `graph.connect(source_element, sink_element, link_map={"sink": "source"})`
3. ElementGroup linking (supports elements and pad selections): `graph.connect(group(s1, s2), sink_element)` or `graph.connect(group(source, select(element, "pad1")), sink)`
4. Direct PadSelection linking: `graph.connect(select(source, "pad1"), sink_element)`

Implicit linking strategies (when no link_map is provided):

1. Exact match: connect pads one-to-one when the source and sink pad names are identical.
2. Partial match: connect all pads whose names match, ignoring non-matching pads.
3. 1-to-N: with a single source pad, connect it to every sink pad.
4. Reject N-to-1: multiple disjoint-named source pads with a single sink pad raise ValueError rather than silently picking one source.

Parameters:

Name Type Description Default
source Element | ElementGroup | PadSelection

Element, ElementGroup, or PadSelection, the source for linking

required
sink Element | ElementGroup | PadSelection

Element, ElementGroup, or PadSelection, the sink for linking

required
link_map dict[str, str] | None

dict[str, str], optional, explicit mapping of sink pad names to source pad names.

None

Returns:

Name Type Description
Self Self

The graph with the new links added.

Raises:

Type Description
ValueError

If implicit linking strategy is ambiguous.

TypeError

If arguments are of unexpected types.

Source code in src/sgn/apps.py
def connect(
    self,
    source: Element | ElementGroup | PadSelection,
    sink: Element | ElementGroup | PadSelection,
    link_map: dict[str, str] | None = None,
) -> Self:
    """Connect elements, ElementGroups, or PadSelections using implicit linking.

    This method supports multiple linking patterns:
    1. Element-to-element linking with implicit pad matching:
       graph.connect(source_element, sink_element)
    2. Element-to-element linking with explicit mapping:
       graph.connect(source_element, sink_element, link_map={"sink": "source"})
    3. ElementGroup linking (supports elements and pad selections):
       graph.connect(group(s1, s2), sink_element)
       graph.connect(group(source, select(element, "pad1")), sink)
    4. Direct PadSelection linking:
       graph.connect(select(source, "pad1"), sink_element)

    Implicit linking strategies (when no link_map provided):
    (1) Exact match: Connect when source and sink pad names are identical
    (2) Partial match: Connect all matching pad names (ignores non-matching pads)
    (3) 1-to-N: Single source pad, connect to all sink pads
    (4) Reject N-to-1: multiple disjoint-named source pads with a single sink
        pad raises ``ValueError`` rather than silently picking one source.

    Args:
        source:
            Element, ElementGroup, or PadSelection, the source for linking
        sink:
            Element, ElementGroup, or PadSelection, the sink for linking
        link_map:
            dict[str, str], optional, explicit mapping of sink pad names to
            source pad names.

    Returns:
        Self: The graph with the new links added.

    Raises:
        ValueError: If ``source`` is a SinkElement, ``sink`` is a
            SourceElement, or the implicit linking strategy is ambiguous.
        KeyError: If ``link_map`` names a pad that the source or sink does
            not expose.
        TypeError: If ``source`` has no ``srcs`` or ``sink`` has no ``snks``,
            i.e. it is not an Element, ElementGroup, or PadSelection.
    """
    if isinstance(source, SinkElement):
        msg = f"Source '{source.name}' is a SinkElement and has no source pads"
        raise ValueError(msg)
    if isinstance(sink, SourceElement):
        msg = f"Sink '{sink.name}' is a SourceElement and has no sink pads"
        raise ValueError(msg)

    # Report objects lacking the srcs/snks interface as the documented
    # TypeError instead of letting a bare AttributeError escape.
    try:
        source_pads = source.srcs
    except AttributeError as err:
        msg = (
            f"cannot use {type(source).__name__} as a connect() source; "
            "expected an Element, ElementGroup, or PadSelection"
        )
        raise TypeError(msg) from err
    try:
        sink_pads = sink.snks
    except AttributeError as err:
        msg = (
            f"cannot use {type(sink).__name__} as a connect() sink; "
            "expected an Element, ElementGroup, or PadSelection"
        )
        raise TypeError(msg) from err

    # Ensure all elements are inserted in graph
    def ensure_elements_inserted(
        obj: Element | ElementGroup | PadSelection,
    ) -> None:
        if isinstance(obj, (SourceElement, TransformElement, SinkElement)):
            if obj.name not in self._registry:
                self.insert(obj)
        elif isinstance(obj, ElementGroup):
            for element in obj.elements:
                if element.name not in self._registry:
                    self.insert(element)
        elif isinstance(obj, PadSelection):
            if obj.element.name not in self._registry:
                self.insert(obj.element)

    ensure_elements_inserted(source)
    ensure_elements_inserted(sink)

    return self._connect_pads(source_pads, sink_pads, link_map)

insert(*elements, link_map=None)

Insert element(s) into the graph.

Parameters:

Name Type Description Default
*elements Element

Iterable[Element], the ordered elements to insert into the graph

()
link_map dict[str | SinkPad, str | SourcePad] | None

dict[str | SinkPad, str | SourcePad] | None, a mapping of sink pad to source pad names to link

None

Returns:

Type Description
Self

Self, the graph with the elements inserted

Source code in src/sgn/apps.py
def insert(
    self,
    *elements: Element,
    link_map: dict[str | SinkPad, str | SourcePad] | None = None,
) -> Self:
    """Add one or more elements to the graph, optionally linking pads afterwards.

    Args:
        *elements:
            Iterable[Element], the ordered elements to insert into the graph
        link_map:
            dict[str | SinkPad, str | SourcePad] | None,
            a mapping of sink pad to source pad names to link; passed to
            ``link`` after every element is inserted (even when empty)

    Returns:
        Self, the graph with the elements inserted
    """
    for elem in elements:
        self._insert_element(elem)
    if link_map is None:
        return self
    self.link(link_map)
    return self

link(link_map)

Link pads in a graph.

Parameters:

Name Type Description Default
link_map dict[str | SinkPad, str | SourcePad]

dict[str | SinkPad, str | SourcePad], a mapping of sink pads to the source pads that feed them. Keys and values may be pad objects or pad names; names are resolved through the graph's registry. Data flows from value -> key.

required
Source code in src/sgn/apps.py
def link(self, link_map: dict[str | SinkPad, str | SourcePad]) -> Self:
    """Link pads in a graph.

    Args:
        link_map:
            dict[str | SinkPad, str | SourcePad], a mapping of sink pads to the
            source pads that feed them. Keys and values may be pad objects or
            pad names; names are resolved through this graph's registry.
            Data flows from value -> key.

    Returns:
        Self, the graph with the new links added

    Raises:
        KeyError: If a pad name is not registered in this graph.
        AssertionError: If a key does not resolve to a SinkPad or a value
            does not resolve to a SourcePad.
    """
    registry = self._registry
    for sink_key, source_key in link_map.items():
        # Accept either a pad object or the registered name of one.
        sink_pad = registry[sink_key] if isinstance(sink_key, str) else sink_key
        source_pad = (
            registry[source_key] if isinstance(source_key, str) else source_key
        )

        assert isinstance(sink_pad, SinkPad), f"not a sink pad: {sink_pad}"
        assert isinstance(source_pad, SourcePad), f"not a source pad: {source_pad}"

        self.graph.update(sink_pad.link(source_pad))

    return self

Pipeline

Bases: Graph


              flowchart TD
              sgn.apps.Pipeline[Pipeline]
              sgn.apps.Graph[Graph]

                              sgn.apps.Graph --> sgn.apps.Pipeline
                


              click sgn.apps.Pipeline href "" "sgn.apps.Pipeline"
              click sgn.apps.Graph href "" "sgn.apps.Graph"
            

A Pipeline is essentially a directed acyclic graph of tasks that process frames.

These tasks are grouped using Pads and Elements. The Pipeline class is responsible for registering methods to produce source, transform and sink elements and to assemble those elements in a directed acyclic graph. It also establishes an event loop to execute the graph asynchronously.

Source code in src/sgn/apps.py
class Pipeline(Graph):
    """A Pipeline is essentially a directed acyclic graph of tasks that process frames.

    These tasks are grouped using Pads and Elements. The Pipeline class is responsible
    for registering methods to produce source, transform and sink elements and to
    assemble those elements in a directed acyclic graph. It also establishes an event
    loop to execute the graph asynchronously.
    """

    def __init__(self) -> None:
        """Class to establish and execute a graph of elements that will process frames.

        Registers methods to produce source, transform and sink elements and to assemble
        those elements in a directed acyclic graph. Also establishes an event loop.
        """
        super().__init__()
        self.loop: asyncio.AbstractEventLoop | None = None
        self.__loop_counter = 0
        self.sinks: dict[str, SinkElement] = {}

    def _insert_element(self, element: Element) -> None:
        """Insert element and track sink elements."""
        super()._insert_element(element)
        if isinstance(element, SinkElement):
            self.sinks[element.name] = element

    def nodes(self, pads: bool = True, intra: bool = False) -> tuple[str, ...]:
        """Get the nodes in the pipeline.

        Args:
            pads:
                bool, whether to include pads in the graph. If True, the graph will only
                consist of pads. If False, the graph will consist only of elements.
            intra:
                bool, default False, whether or not to include intra-element edges,
                e.g. from an element's sink pads to its source pads. In this case,
                whether to include Internal Pads in the graph.

        Returns:
            tuple[str, ...], the sorted names of the selected pads or elements
        """
        if pads:
            node_types: tuple[type, ...] = (SinkPad, SourcePad)
            if intra:
                node_types += (InternalPad,)
        else:
            node_types = (TransformElement, SinkElement, SourceElement)
        # The isinstance() type tuple is built once, not per registry entry.
        return tuple(
            sorted(
                node.name
                for node in self._registry.values()
                if isinstance(node, node_types)
            )
        )

    def edges(
        self, pads: bool = True, intra: bool = False
    ) -> tuple[tuple[str, str], ...]:
        """Get the edges in the pipeline.

        Args:
            pads:
                bool, whether to include pads in the graph. If True, the graph will only
                consist of pads. If False, the graph will consist only of elements.
            intra:
                bool, default False, whether or not to include intra-element edges, e.g.
                from an element's sink pads to its source pads

        Returns:
            tuple[tuple[str, str], ...], the sorted, de-duplicated
            (source, target) name pairs, using pad names when ``pads`` is True
            and element names otherwise
        """
        edges = set()
        for target, sources in self.graph.items():
            for source in sources:
                # Edges originating at a sink or internal pad are intra-element.
                if not intra and isinstance(source, (SinkPad, InternalPad)):
                    continue

                if pads:
                    edges.add((source.name, target.name))
                else:
                    source_element = source.element
                    target_element = target.element
                    edges.add((source_element.name, target_element.name))
        return tuple(sorted(edges))

    def to_graph(self, label: str | None = None, expand_composed: bool = True):
        """graphviz.DiGraph representation of pipeline

        Args:
            label:
                str, label for the graph
            expand_composed:
                bool, whether to expand composed elements into cluster subgraphs

        Returns:
            graphviz.Digraph: the graph object
        """
        return visualize(self, label=label, expand_composed=expand_composed)

    def to_dot(self, label: str | None = None, expand_composed: bool = True) -> str:
        """Convert the pipeline to a graph using graphviz.

        Args:
            label:
                str, label for the graph
            expand_composed:
                bool, whether to expand composed elements into cluster subgraphs

        Returns:
            str, the graph representation of the pipeline
        """
        return visualize(self, label=label, expand_composed=expand_composed).source

    def visualize(
        self,
        path: str,
        label: str | None = None,
        expand_composed: bool = True,
    ) -> None:
        """Convert the pipeline to a graph using graphviz, then render into a visual
        file.

        Args:
            path:
                str, the relative or full path to the file to write the graph to
            label:
                str, label for the graph
            expand_composed:
                bool, whether to expand composed elements into cluster subgraphs
        """
        visualize(self, label=label, path=Path(path), expand_composed=expand_composed)

    @async_sgn_mem_profile(logger)
    async def __execute_graph_loop(self) -> None:
        """Run every pad once, batch by batch, in topological order."""

        async def _partial(node):
            try:
                t0 = time.perf_counter()
                result = await node()
                testpoint.record_exec(node, time.perf_counter() - t0)
                return result
            except Exception as e:
                # Annotate the exception with pad context if supported (3.11+)
                # and re-raise the original — reconstructing the exception
                # loses attributes and breaks for non-(str,)-constructor types.
                if hasattr(e, "add_note"):
                    e.add_note(f"(from pad '{node.name}')")
                raise

        self.__loop_counter += 1
        logger.debug("Executing graph loop %s:", self.__loop_counter)
        ts = graphlib.TopologicalSorter(self.graph)
        ts.prepare()
        loop = asyncio.get_running_loop()
        while ts.is_active():
            # concurrently execute the next batch of ready nodes
            nodes = ts.get_ready()
            tasks = [loop.create_task(_partial(node)) for node in nodes]
            try:
                await asyncio.gather(*tasks)
            except BaseException:
                # Cancel sibling tasks so they don't keep mutating state
                # after one of their batch-mates failed. cancel() on a
                # finished task is a safe no-op. Note: in threaded mode a
                # callback already running on a worker thread cannot be
                # interrupted — cancel() only raises CancelledError at the
                # awaiting task, so that callback runs to completion (and
                # _executor_for's shutdown(wait=True) blocks until it does).
                for task in tasks:
                    task.cancel()
                await asyncio.gather(*tasks, return_exceptions=True)
                raise
            ts.done(*nodes)

    async def _execute_graphs(self, executor: Executor | None = None) -> None:
        """Async graph execution function.

        Repeats graph iterations until every sink element reports ``at_eos``.
        If ``executor`` is not None, pads on elements with ``thread_safe = True``
        will dispatch their callbacks onto it for the duration of the run.
        """
        with pipeline_executor(executor):
            while not all(sink.at_eos for sink in self.sinks.values()):
                await self.__execute_graph_loop()
                testpoint.update()

    @contextlib.contextmanager
    def _executor_for(
        self,
        threaded: int | Executor | None,
    ) -> Iterator[Executor | None]:
        """Yield the executor for this run; close it iff Pipeline created it.

        ``threaded`` selects the dispatch mode:

        * ``None`` (default) yields ``None`` — pad callbacks run on the
          event-loop thread.
        * A positive ``int`` creates a ``ThreadPoolExecutor`` of that size
          for the duration of the context and shuts it down on exit.
        * An ``Executor`` instance is yielded as-is and never shut down;
          the caller owns its lifecycle. It must be a *thread-based* executor
          (e.g. ``ThreadPoolExecutor``): pad dispatch relies on shared-memory
          mutation of element state, so a ``ProcessPoolExecutor`` would either
          fail to pickle the bound callbacks/frames or run them against copied
          state whose mutations never propagate back. This requirement is the
          caller's responsibility — it is not validated here, so passing a
          process pool fails later with a cryptic pickling error rather than a
          clear rejection.

        Raises:
            TypeError: If ``threaded`` is a bool or another non-int, non-Executor.
            ValueError: If ``threaded`` is an int <= 0.
        """
        if threaded is None:
            yield None
            return

        if isinstance(threaded, Executor):
            yield threaded
            return

        # bool is an int subclass; reject it explicitly so True != 1 worker.
        if isinstance(threaded, bool) or not isinstance(threaded, int):
            raise TypeError(
                f"threaded must be a positive int, an Executor, or None; "
                f"got {type(threaded).__name__}"
            )
        if threaded <= 0:
            raise ValueError(f"threaded worker count must be positive, got {threaded}")

        pool = ThreadPoolExecutor(max_workers=threaded, thread_name_prefix="sgn-pad")
        try:
            yield pool
        finally:
            pool.shutdown(wait=True)

    def check(self) -> None:
        """Check that pipeline elements are connected.

        Raises:
            RuntimeError: If the pipeline has no sink elements, or if any
                element has an unlinked source or sink pad.
        """
        if not self.sinks:
            msg = "Pipeline contains no sink elements."
            raise RuntimeError(msg)
        for element in self.elements:
            for source_pad in element.source_pads:
                if not source_pad.is_linked:
                    msg = f"Source pad not linked: {source_pad}"
                    raise RuntimeError(msg)
            for sink_pad in element.sink_pads:
                if not sink_pad.is_linked:
                    msg = f"Sink pad not linked: {sink_pad}"
                    raise RuntimeError(msg)

    def _startup(self) -> None:
        """Call on_startup() on every element in topological (graph) order."""
        # Build an element-level dependency graph from the pad-level graph
        elements_by_name = {e.name: e for e in self.elements}
        element_deps: dict[str, set[str]] = {e.name: set() for e in self.elements}
        for pad, deps in self.graph.items():
            for dep in deps:
                if (
                    pad.element.name in elements_by_name
                    and dep.element.name in elements_by_name
                    and pad.element.name != dep.element.name
                ):
                    element_deps[pad.element.name].add(dep.element.name)

        ts = graphlib.TopologicalSorter(element_deps)
        for name in ts.static_order():
            elements_by_name[name].on_startup()

    def run(
        self,
        auto_parallelize: bool = True,
        threaded: int | Executor | None = None,
    ) -> None:
        """Run the pipeline until End Of Stream (EOS)

        Args:
            auto_parallelize:
                If True (default), automatically detects if parallelization is
                needed and handles it transparently. If False, runs the pipeline
                normally without parallelization detection.
            threaded:
                Controls thread-pool dispatch of pad callbacks. ``None``
                (default) runs everything on the event-loop thread. A positive
                ``int`` creates a ``ThreadPoolExecutor`` of that size for the
                duration of the run and shuts it down on exit. An ``Executor``
                instance is used as-is; the caller owns its lifecycle and
                Pipeline will not shut it down. It must be thread-based (pad
                dispatch mutates shared element state in place; a
                ``ProcessPoolExecutor`` is unsupported). In all cases, only
                elements that opt in via ``thread_safe = True`` dispatch onto
                the pool;
                everything else still runs on the event-loop thread.
        """
        configure_sgn_logging()
        if auto_parallelize:
            # Import here to avoid circular imports
            from sgn.subprocess import Parallelize

            # Use automatic parallelization detection
            if Parallelize.needs_parallelization(self):
                # Forward threaded: subprocess parallelization and thread-pool
                # dispatch are complementary, so thread_safe elements still
                # dispatch onto the pool in the main process while parallel
                # elements run their work in workers.
                with Parallelize(self) as parallelize:
                    parallelize.run(threaded=threaded)
                return

        # Run normally without parallelization
        self.check()
        self._startup()
        start_time = time.perf_counter()
        with self._executor_for(threaded) as resolved:
            try:
                asyncio.get_running_loop()
                running_loop_exists = True
            except RuntimeError:
                running_loop_exists = False

            if not running_loop_exists:
                self.loop = asyncio.new_event_loop()
                try:
                    self.loop.run_until_complete(self._execute_graphs(resolved))
                finally:
                    self.loop.close()
            else:
                # An event loop is already running on this thread (e.g. in a
                # Jupyter notebook), so run the pipeline on a separate thread
                # with its own loop.
                import threading

                # Capture any exception from the forked thread so it can be
                # re-raised on the calling thread; thread.join() otherwise
                # swallows it, silently completing failed runs.
                fork_exc: list[BaseException] = []

                def _run_in_fork(pipeline, exec_):
                    pipeline.loop = asyncio.new_event_loop()
                    asyncio.set_event_loop(pipeline.loop)
                    try:
                        pipeline.loop.run_until_complete(
                            pipeline._execute_graphs(exec_)
                        )
                    except BaseException as e:  # noqa: B036,BLE001
                        fork_exc.append(e)
                    finally:
                        pipeline.loop.close()

                thread = threading.Thread(target=_run_in_fork, args=(self, resolved))
                thread.start()
                thread.join()
                if fork_exc:
                    raise fork_exc[0]
        logger.info(
            "Pipeline().run() executed in %s seconds",
            (time.perf_counter() - start_time),
        )

__init__()

Class to establish and execute a graph of elements that will process frames.

Registers methods to produce source, transform and sink elements and to assemble those elements in a directed acyclic graph. Also establishes an event loop.

Source code in src/sgn/apps.py
def __init__(self) -> None:
    """Create an empty pipeline.

    Sets up the inherited graph state (element/pad registry and edges) plus
    the pipeline-specific bookkeeping used to register source, transform and
    sink elements, assemble them into a directed acyclic graph, and later
    drive them from an asyncio event loop.
    """
    super().__init__()
    # Sink elements keyed by name; check() refuses to run without any.
    self.sinks: dict[str, SinkElement] = {}
    # Private (name-mangled) counter; presumably tracks loop iterations --
    # its consumers are outside this view, TODO confirm.
    self.__loop_counter = 0
    # Event loop driving the graph; stays None until run() creates one.
    self.loop: asyncio.AbstractEventLoop | None = None

check()

Check that pipeline elements are connected.

Raises a RuntimeError if the pipeline has no sink elements or if unconnected pads are encountered.

Source code in src/sgn/apps.py
def check(self) -> None:
    """Validate that the pipeline is fully wired.

    The pipeline must contain at least one sink element, and every source
    pad and sink pad of every element must be linked.

    Raises:
        RuntimeError: if the pipeline has no sink elements, or on the first
            unlinked pad found. Elements are visited in insertion order, and
            each element's source pads are checked before its sink pads.
    """
    if not self.sinks:
        raise RuntimeError("Pipeline contains no sink elements.")

    for elem in self.elements:
        dangling_src = next(
            (pad for pad in elem.source_pads if not pad.is_linked), None
        )
        if dangling_src is not None:
            raise RuntimeError(f"Source pad not linked: {dangling_src}")

        dangling_sink = next(
            (pad for pad in elem.sink_pads if not pad.is_linked), None
        )
        if dangling_sink is not None:
            raise RuntimeError(f"Sink pad not linked: {dangling_sink}")

edges(pads=True, intra=False)

Get the edges in the pipeline.

Parameters:

Name Type Description Default
pads bool

bool, whether to include pads in the graph. If True, the graph will only consist of pads. If False, the graph will consist only of elements.

True
intra bool

bool, default False, whether or not to include intra-element edges, e.g. from an element's sink pads to its source pads

False

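Returns:

Type Description
tuple[tuple[str, str], ...]

the sorted (source, target) name pairs for each edge in the pipeline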

Source code in src/sgn/apps.py
def edges(
    self, pads: bool = True, intra: bool = False
) -> tuple[tuple[str, str], ...]:
    """Return the directed edges of the pipeline graph as name pairs.

    Args:
        pads:
            bool, whether to include pads in the graph. If True, each edge
            connects two pad names. If False, each edge connects the names
            of the elements that own the two pads.
        intra:
            bool, default False, whether or not to include intra-element
            edges, e.g. from an element's sink pads to its source pads. When
            False, every edge whose source is a SinkPad or InternalPad is
            dropped.

    Returns:
        tuple[tuple[str, str], ...], the unique ``(source, target)`` name
        pairs, sorted.
    """

    def _label(pad):
        # Pad-level graphs name the pad itself; element-level graphs name
        # the element that owns the pad.
        return pad.name if pads else pad.element.name

    # self.graph maps each target pad to the set of pads feeding it.
    # ``intra or ...`` short-circuits, so the pad-type test only runs when
    # intra-element edges must be filtered out.
    unique_pairs = {
        (_label(upstream), _label(downstream))
        for downstream, upstreams in self.graph.items()
        for upstream in upstreams
        if intra or not isinstance(upstream, (SinkPad, InternalPad))
    }
    return tuple(sorted(unique_pairs))

nodes(pads=True, intra=False)

Get the nodes in the pipeline.

Parameters:

Name Type Description Default
pads bool

bool, whether to include pads in the graph. If True, the graph will only consist of pads. If False, the graph will consist only of elements.

True
intra bool

bool, default False, whether or not to include intra-element edges, e.g. from an element's sink pads to its source pads. In this case, whether to include Internal Pads in the graph.

False

Returns:

Type Description
tuple[str, ...]

tuple[str, ...], the sorted names of the nodes in the pipeline

Source code in src/sgn/apps.py
def nodes(self, pads: bool = True, intra: bool = False) -> tuple[str, ...]:
    """Return the sorted names of the pipeline's nodes.

    Args:
        pads:
            bool, whether to include pads in the graph. If True, the nodes
            are the registered SinkPad and SourcePad objects. If False, the
            nodes are the registered TransformElement, SinkElement and
            SourceElement objects.
        intra:
            bool, default False, whether or not to include intra-element
            nodes, i.e. InternalPad objects. Only consulted when ``pads`` is
            True.

    Returns:
        tuple[str, ...], the node names in sorted order
    """
    # Choose which registry entries count as nodes before scanning, so the
    # filtering below is a single pass for either mode.
    if not pads:
        wanted = (TransformElement, SinkElement, SourceElement)
    elif intra:
        wanted = (SinkPad, SourcePad, InternalPad)
    else:
        wanted = (SinkPad, SourcePad)

    return tuple(
        sorted(
            entry.name
            for entry in self._registry.values()
            if isinstance(entry, wanted)
        )
    )

run(auto_parallelize=True, threaded=None)

Run the pipeline until End Of Stream (EOS)

Parameters:

Name Type Description Default
auto_parallelize bool

If True (default), automatically detects if parallelization is needed and handles it transparently. If False, runs the pipeline normally without parallelization detection.

True
threaded int | Executor | None

Controls thread-pool dispatch of pad callbacks. None (default) runs everything on the event-loop thread. A positive int creates a ThreadPoolExecutor of that size for the duration of the run and shuts it down on exit. An Executor instance is used as-is; the caller owns its lifecycle and Pipeline will not shut it down. It must be thread-based (pad dispatch mutates shared element state in place; a ProcessPoolExecutor is unsupported). In all cases, only elements that opt in via thread_safe = True dispatch onto the pool; everything else still runs on the event-loop thread.

None
Source code in src/sgn/apps.py
def run(
    self,
    auto_parallelize: bool = True,
    threaded: int | Executor | None = None,
) -> None:
    """Run the pipeline until End Of Stream (EOS).

    Args:
        auto_parallelize:
            If True (default), automatically detects if parallelization is
            needed and handles it transparently. If False, runs the pipeline
            normally without parallelization detection.
        threaded:
            Controls thread-pool dispatch of pad callbacks. ``None``
            (default) runs everything on the event-loop thread. A positive
            ``int`` creates a ``ThreadPoolExecutor`` of that size for the
            duration of the run and shuts it down on exit. An ``Executor``
            instance is used as-is; the caller owns its lifecycle and
            Pipeline will not shut it down. It must be thread-based (pad
            dispatch mutates shared element state in place; a
            ``ProcessPoolExecutor`` is unsupported). In all cases, only
            elements that opt in via ``thread_safe = True`` dispatch onto
            the pool; everything else still runs on the event-loop thread.

    Raises:
        RuntimeError: from ``check()`` if the pipeline has no sinks or has
            unlinked pads.
        BaseException: any exception raised while executing the graphs is
            propagated to the caller, including when execution happens on a
            worker thread because an event loop is already running here.
    """
    configure_sgn_logging()
    if auto_parallelize:
        # Import here to avoid circular imports
        from sgn.subprocess import Parallelize

        # Use automatic parallelization detection
        if Parallelize.needs_parallelization(self):
            # Forward threaded: subprocess parallelization and thread-pool
            # dispatch are complementary, so thread_safe elements still
            # dispatch onto the pool in the main process while parallel
            # elements run their work in workers.
            with Parallelize(self) as parallelize:
                parallelize.run(threaded=threaded)
            return

    # Run normally without parallelization
    self.check()
    self._startup()
    start = time.perf_counter()
    with self._executor_for(threaded) as resolved:
        try:
            asyncio.get_running_loop()
            running_loop_exists = True
        except RuntimeError:
            running_loop_exists = False

        if not running_loop_exists:
            self.loop = asyncio.new_event_loop()
            try:
                self.loop.run_until_complete(self._execute_graphs(resolved))
            finally:
                self.loop.close()
        else:
            # An event loop is already running on this thread (e.g. inside a
            # Jupyter notebook) and asyncio will not run a second loop on the
            # same thread, so run the pipeline on a fresh loop in a worker
            # thread and block until it finishes.
            import threading

            # Capture any exception from the worker thread -- including a
            # failure while creating or installing its event loop -- so it
            # can be re-raised on the calling thread; thread.join() would
            # otherwise swallow it, silently completing failed runs.
            worker_exc: list[BaseException] = []

            def _run_in_thread(pipeline, exec_):
                loop = None
                try:
                    loop = asyncio.new_event_loop()
                    pipeline.loop = loop
                    asyncio.set_event_loop(loop)
                    loop.run_until_complete(pipeline._execute_graphs(exec_))
                except BaseException as e:  # noqa: B036,BLE001
                    worker_exc.append(e)
                finally:
                    # Only close a loop that was actually created.
                    if loop is not None:
                        loop.close()

            thread = threading.Thread(target=_run_in_thread, args=(self, resolved))
            thread.start()
            thread.join()
            if worker_exc:
                raise worker_exc[0]
    logger.info(
        "Pipeline().run() executed in %s seconds",
        (time.perf_counter() - start),
    )

to_dot(label=None, expand_composed=True)

Convert the pipeline to a graph using graphviz.

Parameters:

Name Type Description Default
label str | None

str, label for the graph

None
expand_composed bool

bool, whether to expand composed elements into cluster subgraphs

True

Returns:

Type Description
str

str, the graph representation of the pipeline

Source code in src/sgn/apps.py
def to_dot(self, label: str | None = None, expand_composed: bool = True) -> str:
    """Render the pipeline as Graphviz DOT source text.

    Args:
        label:
            str, optional title attached to the graph
        expand_composed:
            bool, when True composed elements are drawn as cluster subgraphs
            showing their inner elements

    Returns:
        str, the DOT source of the pipeline graph
    """
    digraph = visualize(self, label=label, expand_composed=expand_composed)
    return digraph.source

to_graph(label=None, expand_composed=True)

graphviz.Digraph representation of the pipeline

Parameters:

Name Type Description Default
label str | None

str, label for the graph

None
expand_composed bool

bool, whether to expand composed elements into cluster subgraphs

True

Returns:

Type Description
graphviz.Digraph

the graph object

Source code in src/sgn/apps.py
def to_graph(self, label: str | None = None, expand_composed: bool = True):
    """Build a graphviz.Digraph representation of the pipeline.

    Args:
        label:
            str, optional title attached to the graph
        expand_composed:
            bool, when True composed elements are drawn as cluster subgraphs
            showing their inner elements

    Returns:
        graphviz.Digraph: the graph object, not yet rendered to any file
    """
    graph = visualize(self, label=label, expand_composed=expand_composed)
    return graph

visualize(path, label=None, expand_composed=True)

Convert the pipeline to a graph using graphviz, then render into a visual file.

Parameters:

Name Type Description Default
path str

str, the relative or full path to the file to write the graph to

required
label str | None

str, label for the graph

None
expand_composed bool

bool, whether to expand composed elements into cluster subgraphs

True
Source code in src/sgn/apps.py
def visualize(
    self,
    path: str,
    label: str | None = None,
    expand_composed: bool = True,
) -> None:
    """Draw the pipeline with graphviz and write the result to a file.

    Args:
        path:
            str, relative or absolute destination of the rendered graph
        label:
            str, optional title attached to the graph
        expand_composed:
            bool, when True composed elements are drawn as cluster subgraphs
            showing their inner elements
    """
    # The module-level visualize() helper expects a Path, so normalize the
    # caller's string once before delegating.
    destination = Path(path)
    visualize(
        self,
        label=label,
        path=destination,
        expand_composed=expand_composed,
    )