
sgn.sources

Source elements for generating data streams.

New source classes need not subclass IterSource or DequeSource, but they should ultimately derive from SourceElement.

DequeSource dataclass

Bases: IterSource


Inheritance: UniqueID → ElementLike → SourceElement → IterSource → DequeSource

A source element that has one double-ended-queue (deque) per source pad.

Deprecated: DequeSource is deprecated and will be removed in a future release. For new code prefer IterSource; if dynamic appending is required, pass a deque (or any iterable) via iters and consume it with the iterator protocol. See the migration note below.

Ordering is LIFO, not FIFO. Values are produced via deque.pop() (right-most first), so iters={"H1": deque([1, 2, 3])} yields 3, 2, 1. To enqueue new data at runtime in producer order, use deques[name].appendleft(x), or prefer IterSource, which consumes in insertion order.
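The ordering rule above can be checked with collections.deque alone. The sketch below does not import sgn; the helper name drain_like_deque_source is hypothetical and only mimics the pop()-based consumption described for DequeSource.

```python
from collections import deque


def drain_like_deque_source(dq):
    """Pop from the right until empty, as DequeSource is described to do."""
    out = []
    while dq:
        out.append(dq.pop())
    return out


# Initial contents come out right-most first (LIFO).
initial_order = drain_like_deque_source(deque([1, 2, 3]))

# A runtime producer that uses appendleft() keeps the oldest item on the
# right, so pop() returns items in the order they were produced.
dq = deque()
for x in ("a", "b", "c"):
    dq.appendleft(x)
producer_order = drain_like_deque_source(dq)

print(initial_order)   # [3, 2, 1]
print(producer_order)  # ['a', 'b', 'c']
```

Using append() instead of appendleft() at runtime would return the newest item first, which is rarely what a streaming producer wants.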

Parameters:

  • iters (dict[str, Iterable[Any]] | None, default None): a mapping of source pads to deques, where the key is the pad name and the value is the deque.
  • eos_on_empty (dict[str, bool] | bool, default True): a mapping of source pads to boolean values, where the key is the pad name and the value is the boolean. If a bool is given, the value is applied to all pads. If True, EOS is signaled when the deque is empty.
Source code in src/sgn/sources.py
@dataclass
class DequeSource(IterSource):
    """A source element that has one double-ended-queue (deque) per source pad.

    .. deprecated::
        ``DequeSource`` is deprecated and will be removed in a future release.
        For new code prefer :class:`IterSource`; if dynamic appending is
        required, pass a ``deque`` (or any iterable) via ``iters`` and consume
        it with the iterator protocol.  See the migration note below.

    **Ordering is LIFO, not FIFO.**  Values are produced via ``deque.pop()``
    (right-most first), so ``iters={"H1": deque([1, 2, 3])}`` yields
    ``3, 2, 1``.  To enqueue new data at runtime in producer-order use
    ``deques[name].appendleft(x)`` (or, equivalently, prefer
    :class:`IterSource` which consumes in insertion order).

    Args:
        iters:
            dict[str, deque], a mapping of source pads to deques, where the
            key is the pad name and the value is the deque
        eos_on_empty:
            dict[str, bool] | bool, default True, a mapping of source
            pads to boolean values, where the key is the pad name and the value
            is the boolean. If a bool is given, the value is applied to all
            pads. If True, EOS is signaled when the deque is empty.
    """

    def __post_init__(self):
        warnings.warn(
            "DequeSource is deprecated and will be removed in a future release; "
            "use IterSource instead. Note that DequeSource consumes values in "
            "LIFO (deque.pop) order, which IterSource does not.",
            DeprecationWarning,
            stacklevel=2,
        )
        super().__post_init__()

    def _coerce_iterator(self, iterable):
        """Coerce the iterable into a deque.

        Overrides :meth:`IterSource._coerce_iterator` to store a ``deque``
        (rather than a generic iterator), so that callers can append data at
        runtime via the :attr:`deques` property.

        Args:
            iterable:
                Iterable, the iterable to coerce

        Returns:
            deque, the deque
        """
        return deque(iterable)

    def _get_value(self, deque):
        """Get the next value from the deque.

        Uses ``deque.pop()`` (LIFO) for historical reasons; see the class
        docstring.

        Args:
            deque:
                deque, the deque to get the value from

        Returns:
            Any, the next value from the deque
        """
        try:
            return deque.pop()
        except IndexError:
            return None

    @property
    def deques(self) -> dict[str, Iterable]:
        """Alias for ``iters`` with a more explicit name."""
        assert isinstance(self.iters, dict)
        return self.iters

deques property

Alias for iters with a more explicit name.

IterSource dataclass

Bases: SourceElement


Inheritance: UniqueID → ElementLike → SourceElement → IterSource

A source element that has one iterable per source pad.

On each call to new(), one value is pulled from the per-pad iterator and wrapped in a frame. When an iterator is exhausted, the next frame produced for that pad has data=None and is_gap=True; EOS is signaled on that same frame if and only if eos_on_empty is True for the pad (the default).
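The frame semantics above can be illustrated without sgn. In this minimal sketch, SketchFrame is a stand-in for sgn's Frame that carries only the three fields mentioned here, and next_frame is a hypothetical helper that follows the same pull-then-wrap logic.

```python
from dataclasses import dataclass
from typing import Any, Iterator


@dataclass
class SketchFrame:
    """Stand-in for sgn's Frame; only the three fields used here."""

    EOS: bool = False
    data: Any = None
    is_gap: bool = False


def next_frame(iterator: Iterator[Any], eos_on_empty: bool = True) -> SketchFrame:
    """Pull one value; an exhausted iterator produces a gap frame."""
    try:
        data = next(iterator)
    except StopIteration:
        data = None
    return SketchFrame(
        EOS=data is None and eos_on_empty,
        data=data,
        is_gap=data is None,
    )


it = iter([10, 20])
frames = [next_frame(it) for _ in range(3)]
for frame in frames:
    print(frame)

# With eos_on_empty=False the pad keeps emitting gap frames without EOS.
gap_only = next_frame(iter([]), eos_on_empty=False)
```

Because exhaustion is detected by data is None, an iterator that yields None as a real value is treated the same way as an empty one.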

Parameters:

  • iters (dict[str, Iterable[Any]] | None, default None): a mapping of source pads to iterables, where the key is the pad name and the value is the Iterable. These will be coerced to iterators, so they can be any iterable type.
  • eos_on_empty (dict[str, bool] | bool, default True): a mapping of source pads to boolean values, where the key is the pad name and the value is the boolean. If a bool is given, the value is applied to all pads. If True, EOS is signaled when the iterator is empty.
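
How a single bool for eos_on_empty is applied to every pad, and how the resulting mapping is checked against the pad names, can be sketched as follows. The function normalize_eos_on_empty is hypothetical; it condenses the setup and validation steps into one stand-alone helper and takes the pad names as a plain list.

```python
def normalize_eos_on_empty(eos_on_empty, pad_names):
    """Broadcast a bool to all pads, then check the keys against the pads."""
    if isinstance(eos_on_empty, bool):
        eos_on_empty = {name: eos_on_empty for name in pad_names}

    # One entry per pad, no more and no fewer.
    if len(eos_on_empty) != len(pad_names):
        raise ValueError("The number of eos on empty must match the number of pads")

    # Every key must name an existing pad.
    for name in eos_on_empty:
        if name not in pad_names:
            raise ValueError(f"unknown pad: {name}, options are: {pad_names}")

    return eos_on_empty


print(normalize_eos_on_empty(True, ["H1", "L1"]))
print(normalize_eos_on_empty({"H1": False, "L1": True}, ["H1", "L1"]))
```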
Source code in src/sgn/sources.py
@dataclass
class IterSource(SourceElement):
    """A source element that has one iterable per source pad.

    On each call to :meth:`new` one value is pulled from the per-pad
    iterator and wrapped in a frame.  When an iterator is exhausted the
    next frame produced for that pad has ``data=None`` and ``is_gap=True``;
    EOS is signaled on that same frame iff ``eos_on_empty`` is True for
    the pad (the default).

    Args:
        iters:
            dict[str, Iterable[Any]], a mapping of source pads to iterables,
            where the key is the pad name and the value is the Iterable. These
            will be coerced to iterators, so they can be any iterable type.
        eos_on_empty:
            dict[str, bool] | bool, default True, a mapping of source
            pads to boolean values, where the key is the pad name and the value
            is the boolean. If a bool is given, the value is applied to all
            pads. If True, EOS is signaled when the iterator is empty.
    """

    iters: dict[str, Iterable[Any]] | None = None
    eos_on_empty: dict[str, bool] | bool = True
    frame_factory: Callable = Frame

    def __post_init__(self):
        """Post init checks for the IterSource element."""
        super().__post_init__()
        # Set up per-pad iterators and EOS flags, then validate them
        self._setup_iters()
        self._setup_eos_on_empty()
        self._validate_iters()
        self._validate_eos_on_empty()

    def _setup_iters(self):
        # Setup the iter_map if not given
        if self.iters is None:
            self.iters = {
                name: self._coerce_iterator([]) for name in self.source_pad_names
            }
        else:
            self.iters = {
                name: self._coerce_iterator(iterable)
                for name, iterable in self.iters.items()
            }

    def _setup_eos_on_empty(self):
        # Broadcast a single bool to every source pad
        if isinstance(self.eos_on_empty, bool):
            self.eos_on_empty = {
                name: self.eos_on_empty for name in self.source_pad_names
            }

    def _validate_iters(self):
        # Check that iters has the correct number of entries
        if len(self.iters) != len(self.source_pads):
            raise ValueError("The number of iters must match the number of pads")

        # Check that iters has the correct pad names
        for name in self.iters:
            if name not in self.source_pad_names:
                raise ValueError(
                    f"{type(self).__name__} has an iter for a pad that does "
                    f"not exist, got: {name}, options are: {self.source_pad_names}"
                )

    def _validate_eos_on_empty(self):
        # Check that eos_on_empty has the correct number of entries
        if len(self.eos_on_empty) != len(self.source_pads):
            raise ValueError("The number of eos on empty must match the number of pads")

        # Check that eos_on_empty has the correct pad names
        for name in self.eos_on_empty:
            if name not in self.source_pad_names:
                raise ValueError(
                    f"{type(self).__name__} has an eos on empty for a pad that "
                    f"does not exist, got: {name}, options are: {self.source_pad_names}"
                )

    def _coerce_iterator(self, iterable):
        """Coerce the iterable to an iterator if it is not already one.

        Args:
            iterable:
                Iterable, the iterable to coerce

        Returns:
            Iterator, the iterator
        """
        # Check if already an iterator or generator
        if isinstance(iterable, (Iterator, Generator)):
            return iterable

        return iter(iterable)

    def _get_value(self, iterator):
        """Get the next value from the iterator.

        Args:
            iterator:
                Iterator, the iterator to get the value from

        Returns:
            Any, the next value from the iterator
        """
        try:
            return next(iterator)
        except StopIteration:
            return None

    def update(self, pad: SourcePad):
        """Update the iterator for the pad. This is a no-op for IterSource. For
        subclasses that need to update the iterator, this method should be overridden.
        Examples include reading from a file or network stream.

        Args:
            pad:
                SourcePad, the pad to update
        """
        pass

    def new(self, pad: SourcePad) -> Frame:
        """Produce the next Frame for "pad". One value is pulled from the pad's
        iterator and used as the Frame's data. When the iterator is exhausted, data
        is None and is_gap is True; EOS is also set if eos_on_empty is True for the
        pad.

        Args:
            pad:
                SourcePad, the pad for which to produce a new Frame

        Returns:
            Frame, the Frame with optional data payload
        """
        # Update the pad iterator
        self.update(pad=pad)

        # Get the pad iterator
        assert isinstance(self.iters, dict)
        assert isinstance(self.eos_on_empty, dict)
        pad_iter = self.iters[self.rsrcs[pad]]
        pad_eos_on_empty = self.eos_on_empty[self.rsrcs[pad]]

        # Get data from the iterator
        data = self._get_value(pad_iter)

        # Return the frame
        return self.frame_factory(
            EOS=data is None and pad_eos_on_empty,
            data=data,
            is_gap=data is None,
        )

__post_init__()

Post init checks for the IterSource element.

Source code in src/sgn/sources.py
def __post_init__(self):
    """Post init checks for the IterSource element."""
    super().__post_init__()
    # Set up per-pad iterators and EOS flags, then validate them
    self._setup_iters()
    self._setup_eos_on_empty()
    self._validate_iters()
    self._validate_eos_on_empty()

new(pad)

Produce the next Frame for "pad". One value is pulled from the pad's iterator and used as the Frame's data. When the iterator is exhausted, data is None and is_gap is True; EOS is also set if eos_on_empty is True for the pad.

Parameters:

  • pad (SourcePad, required): the pad for which to produce a new Frame

Returns:

  • Frame: the Frame with optional data payload

Source code in src/sgn/sources.py
def new(self, pad: SourcePad) -> Frame:
    """Produce the next Frame for "pad". One value is pulled from the pad's
    iterator and used as the Frame's data. When the iterator is exhausted, data
    is None and is_gap is True; EOS is also set if eos_on_empty is True for the
    pad.

    Args:
        pad:
            SourcePad, the pad for which to produce a new Frame

    Returns:
        Frame, the Frame with optional data payload
    """
    # Update the pad iterator
    self.update(pad=pad)

    # Get the pad iterator
    assert isinstance(self.iters, dict)
    assert isinstance(self.eos_on_empty, dict)
    pad_iter = self.iters[self.rsrcs[pad]]
    pad_eos_on_empty = self.eos_on_empty[self.rsrcs[pad]]

    # Get data from the iterator
    data = self._get_value(pad_iter)

    # Return the frame
    return self.frame_factory(
        EOS=data is None and pad_eos_on_empty,
        data=data,
        is_gap=data is None,
    )

update(pad)

Update the iterator for the pad. This is a no-op for IterSource. For subclasses that need to update the iterator, this method should be overridden. Examples include reading from a file or network stream.
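
One way a subclass might use the update hook is sketched below. This is not sgn code: MiniIterSource is a stand-in that only reproduces the "call update, then pull one value" order of IterSource.new, LineSource is a hypothetical subclass, and pads are plain strings rather than SourcePad objects.

```python
import io
import itertools


class MiniIterSource:
    """Stand-in that calls update() before pulling a value, like IterSource.new."""

    def __init__(self, iters):
        self.iters = {name: iter(values) for name, values in iters.items()}

    def update(self, pad):
        """No-op by default; subclasses override this to refresh the iterator."""

    def new(self, pad):
        self.update(pad)
        return next(self.iters[pad], None)  # None stands for "exhausted"


class LineSource(MiniIterSource):
    """Hypothetical subclass that feeds a pad from a text stream."""

    def __init__(self, stream, pad):
        super().__init__({pad: []})
        self.stream = stream

    def update(self, pad):
        # Read whatever is available now and queue it behind unread values.
        lines = self.stream.read().splitlines()
        if lines:
            self.iters[pad] = itertools.chain(self.iters[pad], lines)


src = LineSource(io.StringIO("a\nb\n"), pad="out")
values = [src.new("out") for _ in range(3)]
print(values)
```

Chaining the new lines onto the existing iterator, rather than replacing it, keeps values that arrived earlier but have not been consumed yet.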

Parameters:

  • pad (SourcePad, required): the pad to update
Source code in src/sgn/sources.py
def update(self, pad: SourcePad):
    """Update the iterator for the pad. This is a no-op for IterSource. For
    subclasses that need to update the iterator, this method should be overridden.
    Examples include reading from a file or network stream.

    Args:
        pad:
            SourcePad, the pad to update
    """
    pass

NullSource dataclass

Bases: SourceElement, SignalEOS


Inheritance: UniqueID → ElementLike → SourceElement → NullSource; SignalEOS → NullSource

A source that does precisely nothing.

It is useful for testing and debugging, and always produces empty frames.

frame_factory: Callable = Frame
wait: float | None = None
num_frames: int | None = None

If wait is not None, the source blocks for wait seconds before each new frame, which is useful for slowing down debugging pipelines. By default this source element handles SIGINT and SIGTERM (the signals handled by SignalEOS) and uses them to set EOS. To use this feature, the pipeline must be run within the SignalEOS context manager, e.g.,

with SignalEOS() as signal_eos:
    p.run()
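
The interaction between num_frames and EOS is easy to misread, so here is a minimal stand-alone sketch of the counting rule used by new(). The function null_source_eos_flags is hypothetical and the signaled argument stands in for SignalEOS.signaled_eos(); no sgn import is needed.

```python
def null_source_eos_flags(num_frames, calls, signaled=False):
    """Return the EOS flag of each frame for a given number of new() calls."""
    frame_count = 0
    flags = []
    for _ in range(calls):
        frame_count += 1
        # EOS is set by a signal, or once the count exceeds num_frames.
        flags.append(
            signaled or (num_frames is not None and frame_count > num_frames)
        )
    return flags


flags = null_source_eos_flags(num_frames=3, calls=4)
print(flags)  # [False, False, False, True]

# Without num_frames, only a signal can end the stream.
unbounded = null_source_eos_flags(num_frames=None, calls=3)
```

With num_frames=3 the first three frames carry no EOS, and the fourth frame is the one that sets it.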
Source code in src/sgn/sources.py
@dataclass
class NullSource(SourceElement, SignalEOS):
    """A source that does precisely nothing.

    It is useful for testing and debugging, and always produces empty frames.

        frame_factory: Callable = Frame
        wait: float | None = None
        num_frames: int | None = None

    If wait is not None, the source blocks for wait seconds before each new
    frame, which is useful for slowing down debugging pipelines.  By default
    this source element handles SIGINT and SIGTERM (the signals handled by
    SignalEOS) and uses them to set EOS.  To use this feature, the pipeline
    must be run within the SignalEOS context manager, e.g.,

        with SignalEOS() as signal_eos:
            p.run()
    """

    frame_factory: Callable = Frame
    wait: float | None = None
    num_frames: int | None = None

    def __post_init__(self):
        super().__post_init__()
        self.frame_count = 0

    def new(self, pad: SourcePad) -> Frame:
        """New Frames are created on "pad" with an instance specific count and a name
        derived from the pad name. EOS is set if we have surpassed the requested number
        of Frames.

        Args:
            pad:
                SourcePad, the pad for which to produce a new Frame

        Returns:
            Frame, the Frame with optional data payload
        """
        if self.wait is not None:
            sleep(self.wait)
        self.frame_count += 1
        return self.frame_factory(
            EOS=self.signaled_eos()
            or (self.num_frames is not None and self.frame_count > self.num_frames),
            data=None,
        )

new(pad)

New Frames are created on "pad" with an instance specific count and a name derived from the pad name. EOS is set if we have surpassed the requested number of Frames.

Parameters:

  • pad (SourcePad, required): the pad for which to produce a new Frame

Returns:

  • Frame: the Frame with optional data payload

Source code in src/sgn/sources.py
def new(self, pad: SourcePad) -> Frame:
    """New Frames are created on "pad" with an instance specific count and a name
    derived from the pad name. EOS is set if we have surpassed the requested number
    of Frames.

    Args:
        pad:
            SourcePad, the pad for which to produce a new Frame

    Returns:
        Frame, the Frame with optional data payload
    """
    if self.wait is not None:
        sleep(self.wait)
    self.frame_count += 1
    return self.frame_factory(
        EOS=self.signaled_eos()
        or (self.num_frames is not None and self.frame_count > self.num_frames),
        data=None,
    )

SignalEOS

This class provides global signal handling for an SGN pipeline. If you inherit it for a source element then it will capture SIGINT and provide a method to mark that eos should be flagged. See NullSource as an example.

Additionally, this must be used as a context manager for executing a pipeline; it disables the signal handler after the pipeline is done, e.g.,

with SignalEOS() as signal_eos:
    p.run()

Note: Python runs signal handlers only on the main thread of the main interpreter, and signal.signal can only be called from that thread. If a source's new runs from a worker thread, the handler does not run in that thread; it runs on the main thread.
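
The save, install and restore pattern behind this context manager can be sketched with the standard signal module alone. SketchSignalEOS below is a stand-in written for illustration, not the sgn class, and the script is assumed to run on the main thread so that signal.signal and signal.raise_signal are allowed.

```python
import signal


class SketchSignalEOS:
    """Stand-in that mirrors the save/install/restore pattern described above."""

    handled_signals = {signal.SIGINT, signal.SIGTERM}
    rcvd_signals = set()
    previous_handlers = {}

    @staticmethod
    def _handler(signum, frame):
        # Record the signal instead of letting it interrupt the program.
        SketchSignalEOS.rcvd_signals.add(signum)

    @classmethod
    def signaled_eos(cls):
        return bool(cls.rcvd_signals & cls.handled_signals)

    def __enter__(self):
        for sig in self.handled_signals:
            self.previous_handlers[sig] = signal.getsignal(sig)
            signal.signal(sig, self._handler)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        for sig in self.handled_signals:
            signal.signal(sig, self.previous_handlers[sig])
        SketchSignalEOS.rcvd_signals = set()


handler_before = signal.getsignal(signal.SIGINT)
with SketchSignalEOS():
    signal.raise_signal(signal.SIGINT)  # simulate Ctrl-C inside the context
    seen_inside = SketchSignalEOS.signaled_eos()
seen_after = SketchSignalEOS.signaled_eos()
handler_restored = signal.getsignal(signal.SIGINT) is handler_before
print(seen_inside, seen_after, handler_restored)
```

Inside the block the signal is recorded rather than raised as KeyboardInterrupt; on exit the previous handlers are reinstated and the record is cleared.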

Source code in src/sgn/sources.py
class SignalEOS:
    """
    This class provides global signal handling for an SGN pipeline.  If you
    inherit it for a source element then it will capture SIGINT and provide a
    method to mark that eos should be flagged.  See NullSource as an example.

    Additionally this must be used as a context manager for executing
    a pipeline and disabling the signal handler after the pipeline is done, e.g.,

        with SignalEOS() as signal_eos:
            p.run()

    Note: Python runs signal handlers only on the main thread of the main
    interpreter, and ``signal.signal`` can only be called from that thread.
    If a source's ``new`` runs from a worker thread, the handler does not run
    in that thread; it runs on the main thread.
    """

    handled_signals = {signal.SIGINT, signal.SIGTERM}
    rcvd_signals: set[int] = set([])
    previous_handlers: dict[int, Callable] = {}

    @staticmethod
    def _handler(signum, frame):
        SignalEOS.rcvd_signals.add(signum)

    @classmethod
    def signaled_eos(cls):
        """Indicate whether a signal has been received to indicate an EOS.

        Returns True if the intersection of received signals and handled
        signals is non-empty.  This can be used by developers to decide if EOS
        should be set.
        """
        return bool(cls.rcvd_signals & cls.handled_signals)

    def raise_signal(self, sig):
        """Raise a signal that has already been raised previously.

        Intended to be used if the application needs to re-raise one of the
        signals with the previous signal handler.  NOTE - this will only raise
        the signal if it had been previously raised and only within a given
        context.
        """
        if sig in SignalEOS.rcvd_signals:
            signal.raise_signal(sig)

    def __enter__(self):
        """Store the previous signal handlers and setup new ones for the
        handled signals"""
        for sig in SignalEOS.handled_signals:
            SignalEOS.previous_handlers[sig] = signal.getsignal(sig)
            signal.signal(sig, SignalEOS._handler)
        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """Restore the original signal handlers"""
        for sig in SignalEOS.handled_signals:
            signal.signal(sig, SignalEOS.previous_handlers[sig])
        SignalEOS.rcvd_signals = set()

__enter__()

Store the previous signal handlers and setup new ones for the handled signals

Source code in src/sgn/sources.py
def __enter__(self):
    """Store the previous signal handlers and setup new ones for the
    handled signals"""
    for sig in SignalEOS.handled_signals:
        SignalEOS.previous_handlers[sig] = signal.getsignal(sig)
        signal.signal(sig, SignalEOS._handler)
    return self

__exit__(exc_type, exc_val, exc_tb)

Restore the original signal handlers

Source code in src/sgn/sources.py
def __exit__(self, exc_type, exc_val, exc_tb):
    """Restore the original signal handlers"""
    for sig in SignalEOS.handled_signals:
        signal.signal(sig, SignalEOS.previous_handlers[sig])
    SignalEOS.rcvd_signals = set()

raise_signal(sig)

Raise a signal that has already been raised previously.

Intended to be used if the application needs to re-raise one of the signals with the previous signal handler. NOTE - this will only raise the signal if it had been previously raised and only within a given context.

Source code in src/sgn/sources.py
def raise_signal(self, sig):
    """Raise a signal that has already been raised previously.

    Intended to be used if the application needs to re-raise one of the
    signals with the previous signal handler.  NOTE - this will only raise
    the signal if it had been previously raised and only within a given
    context.
    """
    if sig in SignalEOS.rcvd_signals:
        signal.raise_signal(sig)

signaled_eos() classmethod

Indicate whether a signal has been received to indicate an EOS.

Returns True if the intersection of received signals and handled signals is non-empty. This can be used by developers to decide if EOS should be set.

Source code in src/sgn/sources.py
@classmethod
def signaled_eos(cls):
    """Indicate whether a signal has been received to indicate an EOS.

    Returns True if the intersection of received signals and handled
    signals is non-empty.  This can be used by developers to decide if EOS
    should be set.
    """
    return bool(cls.rcvd_signals & cls.handled_signals)

StatsSource dataclass

Bases: SourceElement


Inheritance: UniqueID → ElementLike → SourceElement → StatsSource

A source element that produces system statistics.

This source collects system statistics using psutil and produces frames containing system performance data for the current SGN pipeline and system.

Frame data semantics:

  • When stats are collected, data is a dict with timestamp, and (optionally) process / system keys.
  • When interval is set and the window has not yet elapsed, the frame carries an empty dict (data == {}) and is marked is_gap=True so downstream consumers can distinguish "no sample this tick" from a real measurement.

Signal-driven EOS: when eos_on_signal=True (the default), this source consults SignalEOS to decide whether to flag EOS, but it does not install signal handlers on its own. The pipeline must be run inside the SignalEOS context manager (with SignalEOS(): ...), or EOS-on-signal will silently never fire.
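
The interval gating behind the frame data semantics above can be sketched as follows. CollectionWindow and make_payload are hypothetical helpers, not part of sgn; the injectable clock argument and the use of time.time() for the timestamp value are assumptions made so the example is self-contained and testable.

```python
import time


class CollectionWindow:
    """Hypothetical helper mirroring the interval gating described above."""

    def __init__(self, interval=None, clock=time.monotonic):
        self.interval = interval
        self._clock = clock
        self._last = float("-inf")  # ensures the first call always collects

    def should_collect(self):
        """Return True and advance the window if the interval has elapsed."""
        now = self._clock()
        if self.interval is None or (now - self._last) >= self.interval:
            self._last = now
            return True
        return False


def make_payload(window):
    """Return (data, is_gap) following the frame semantics listed above."""
    if window.should_collect():
        return {"timestamp": time.time()}, False  # assumed timestamp source
    return {}, True


# A fake clock makes the behaviour deterministic: ticks at 0.0, 0.5 and 1.0 s.
ticks = iter([0.0, 0.5, 1.0])
window = CollectionWindow(interval=1.0, clock=lambda: next(ticks))
decisions = [window.should_collect() for _ in range(3)]
print(decisions)  # [True, False, True]
```

A consumer can then skip frames whose is_gap flag is set instead of testing for an empty dict.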

Parameters:

  • interval (float | None, default None): time in seconds between stats collection. If None, stats are collected every time new() is called.
  • include_process_stats (bool, default True): whether to include statistics about the current process.
  • include_system_stats (bool, default True): whether to include system-wide statistics.
  • frame_factory (Callable, default Frame): the factory function to create frames.
  • eos_on_signal (bool, default True): whether to end the stream on receiving a signal (SIGINT/SIGTERM).
  • wait (float | None, default None): time in seconds to wait between frames. If None, frames are produced as fast as possible.
Source code in src/sgn/sources.py
@dataclass
class StatsSource(SourceElement):
    """A source element that produces system statistics.

    This source collects system statistics using psutil and produces frames
    containing system performance data for the current SGN pipeline and system.

    Frame data semantics:

    * When stats are collected, ``data`` is a dict with ``timestamp``, and
      (optionally) ``process`` / ``system`` keys.
    * When ``interval`` is set and the window has not yet elapsed, the frame
      carries an empty dict (``data == {}``) and is marked ``is_gap=True``
      so downstream consumers can distinguish "no sample this tick" from a
      real measurement.

    Signal-driven EOS: when ``eos_on_signal=True`` (the default) this source
    consults :class:`SignalEOS` to decide whether to flag EOS, but it does
    *not* install signal handlers on its own.  The pipeline must be run
    inside the :class:`SignalEOS` context manager (``with SignalEOS(): ...``)
    or EOS-on-signal will silently never fire.

    Args:
        interval:
            float | None, time in seconds between stats collection.
            If None, stats are collected every time new() is called.
        include_process_stats:
            bool, whether to include statistics about the current process.
        include_system_stats:
            bool, whether to include system-wide statistics.
        frame_factory:
            Callable, the factory function to create frames.
        eos_on_signal:
            bool, whether to end the stream on receiving a signal (SIGINT/SIGTERM).
        wait:
            float | None, time in seconds to wait between frames.
            If None, frames are produced as fast as possible.
    """

    interval: float | None = None
    include_process_stats: bool = True
    include_system_stats: bool = True
    frame_factory: Callable = Frame
    eos_on_signal: bool = True
    wait: float | None = None

    def __post_init__(self):
        """Post initialization setup for StatsSource."""
        super().__post_init__()
        # Sentinel that ensures the first call to
        # _maybe_advance_collection_window always collects, regardless of
        # interval.  -inf works because any monotonic delta exceeds interval.
        self._last_collection_time = float("-inf")
        self._eos = False

        # Set up process tracking
        self._current_pid = os.getpid()
        self._current_process = None

        if PSUTIL_AVAILABLE:
            self._current_process = psutil.Process(self._current_pid)
        else:
            warnings.warn(
                "psutil is not installed. StatsSource will provide minimal "
                "functionality. Install with: pip install psutil",
                stacklevel=2,
            )

        # Set up signal handling if requested
        self._signal_handler = None
        if self.eos_on_signal:
            self._signal_handler = SignalEOS

    def _collect_process_stats(self) -> dict[str, Any]:
        """Collect statistics for the current process.

        Returns:
            Dict containing process statistics.
        """
        if not PSUTIL_AVAILABLE or self._current_process is None:
            return {
                "pid": self._current_pid,
                "error": "psutil not available",
                "limited_info": True,
            }

        proc = self._current_process

        # Basic process info
        proc_info = {
            "pid": proc.pid,
            "name": proc.name(),
            "status": proc.status(),
            "created": proc.create_time(),
            "running_time": time.time() - proc.create_time(),
        }

        # CPU stats
        try:
            proc_info["cpu_percent"] = proc.cpu_percent(interval=None)
            proc_info["cpu_times"] = dict(proc.cpu_times()._asdict())
            proc_info["num_threads"] = proc.num_threads()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

        # Memory stats
        try:
            mem_info = proc.memory_info()
            proc_info["memory"] = {
                "rss": mem_info.rss,  # Resident Set Size
                "vms": mem_info.vms,  # Virtual Memory Size
                "shared": getattr(mem_info, "shared", 0),
                "text": getattr(mem_info, "text", 0),
                "data": getattr(mem_info, "data", 0),
            }
            proc_info["memory_percent"] = proc.memory_percent()
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            pass

        # IO stats
        try:
            io_counters = proc.io_counters()
            proc_info["io"] = {
                "read_count": io_counters.read_count,
                "write_count": io_counters.write_count,
                "read_bytes": io_counters.read_bytes,
                "write_bytes": io_counters.write_bytes,
            }
        except (psutil.NoSuchProcess, psutil.AccessDenied, AttributeError):
            pass

        return proc_info

    def _collect_system_stats(self) -> dict[str, Any]:
        """Collect system-wide statistics.

        Returns:
            Dict containing system statistics.
        """
        if not PSUTIL_AVAILABLE:
            return {
                "error": "psutil not available",
                "limited_info": True,
                "system": sys.platform,
                "python_version": sys.version,
            }

        system_stats = {}

        # CPU stats
        system_stats["cpu"] = {
            "percent": psutil.cpu_percent(interval=None),
            "count": {
                "physical": psutil.cpu_count(logical=False),
                "logical": psutil.cpu_count(logical=True),
            },
            "stats": dict(psutil.cpu_stats()._asdict()),
        }

        try:
            system_stats["cpu"]["times"] = dict(psutil.cpu_times()._asdict())
            cpu_freq = psutil.cpu_freq()  # may be None on some platforms
            system_stats["cpu"]["freq"] = (
                dict(cpu_freq._asdict()) if cpu_freq else {}
            )
        except (AttributeError, OSError):
            pass

        # Memory stats
        system_stats["memory"] = dict(psutil.virtual_memory()._asdict())
        system_stats["swap"] = dict(psutil.swap_memory()._asdict())

        # Disk stats
        try:
            # Query once and reuse, matching the network section below
            disk_io = psutil.disk_io_counters()
            system_stats["disk"] = {
                "usage": dict(psutil.disk_usage("/")._asdict()),
                "io_counters": dict(disk_io._asdict()) if disk_io else {},
            }
        except (AttributeError, OSError):
            pass

        # Network stats
        try:
            net_io = psutil.net_io_counters()
            system_stats["network"] = dict(net_io._asdict()) if net_io else {}
        except (AttributeError, OSError):
            pass

        return system_stats

    def _maybe_advance_collection_window(self) -> bool:
        """Check if the collection window has elapsed and, if so, advance it.

        The call has a side effect: whenever it returns True it also advances
        ``_last_collection_time``, so two calls in a row can give different
        answers.  Uses :func:`time.monotonic` so NTP/clock adjustments cannot
        stall or double-fire collection.

        Returns:
            bool, True if a sample should be taken now: ``interval`` is None
            or at least ``interval`` seconds have passed since the last
            collection.
        """
        current_time = time.monotonic()
        if (
            self.interval is None
            or (current_time - self._last_collection_time) >= self.interval
        ):
            self._last_collection_time = current_time
            return True
        return False

    def should_collect_stats(self) -> bool:
        """Back-compat alias for :meth:`_maybe_advance_collection_window`.

        The name reads as a pure predicate but the call mutates
        ``_last_collection_time``; the underscore-prefixed method is the
        preferred spelling in new code.
        """
        return self._maybe_advance_collection_window()

    def check_eos(self) -> bool:
        """Check if end-of-stream has been signaled.

        Returns:
            bool, True if EOS should be set.
        """
        if self._eos:
            return True

        if (
            self.eos_on_signal
            and self._signal_handler
            and self._signal_handler.signaled_eos()
        ):
            return True

        return False

    def new(self, pad: SourcePad) -> Frame:
        """Create a new Frame containing system statistics.

        This method is called by the pipeline to produce a new frame with
        current system statistics.

        Args:
            pad: SourcePad, the pad for which to produce a new Frame

        Returns:
            Frame, the Frame containing system statistics
        """
        # Respect the wait parameter if set, adding a delay between frames
        if self.wait is not None:
            time.sleep(self.wait)

        stats: dict[str, Any] = {}

        # Check if we should collect stats based on the interval
        collected = self._maybe_advance_collection_window()
        if collected:
            # Collect process stats if requested
            if self.include_process_stats:
                stats["process"] = self._collect_process_stats()

            # Collect system stats if requested
            if self.include_system_stats:
                stats["system"] = self._collect_system_stats()

            # Add timestamp (wall-clock, intentionally distinct from the
            # monotonic clock used for interval bookkeeping)
            stats["timestamp"] = float(time.time())

        # Check for EOS condition
        eos = self.check_eos()

        # Create and return the frame. When the interval has not elapsed,
        # the frame is a gap (empty dict, is_gap=True) so consumers can
        # distinguish "not yet time to sample" from a real measurement.
        return self.frame_factory(
            EOS=eos,
            data=stats,
            is_gap=not collected,
            metadata={"stats_type": "system_metrics"},
        )
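
The memory section of the process-stats collector above reads the platform-specific fields with `getattr(mem_info, name, 0)`, so the resulting dict has the same keys on every OS. A minimal sketch of that pattern follows; the two namedtuples are hypothetical stand-ins for what `psutil` might return on different platforms, not its real types, and `summarize_memory` is an illustrative name.

```python
from collections import namedtuple
from typing import Any

# Hypothetical stand-ins for per-platform memory_info() results.
# Real field sets vary by OS; these two shapes are illustrative only.
LinuxLikeMem = namedtuple("LinuxLikeMem", ["rss", "vms", "shared", "text", "data"])
MinimalMem = namedtuple("MinimalMem", ["rss", "vms"])


def summarize_memory(mem_info: Any) -> dict[str, int]:
    """Build the same five-key dict regardless of which fields exist."""
    return {
        "rss": mem_info.rss,
        "vms": mem_info.vms,
        # Fields absent on this platform fall back to 0 instead of raising.
        "shared": getattr(mem_info, "shared", 0),
        "text": getattr(mem_info, "text", 0),
        "data": getattr(mem_info, "data", 0),
    }


full = summarize_memory(LinuxLikeMem(rss=10, vms=20, shared=3, text=4, data=5))
partial = summarize_memory(MinimalMem(rss=10, vms=20))
```

One consequence of this design is that a reported `0` can mean either "genuinely zero" or "not available on this platform"; consumers that care about the difference would need a separate signal.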

__post_init__()

Post initialization setup for StatsSource.

Source code in src/sgn/sources.py
def __post_init__(self):
    """Post initialization setup for StatsSource."""
    super().__post_init__()
    # Sentinel that ensures the first call to
    # _maybe_advance_collection_window always collects, regardless of
    # interval.  -inf works because any monotonic delta exceeds interval.
    self._last_collection_time = float("-inf")
    self._eos = False

    # Set up process tracking
    self._current_pid = os.getpid()
    self._current_process = None

    if PSUTIL_AVAILABLE:
        self._current_process = psutil.Process(self._current_pid)
    else:
        warnings.warn(
            "psutil is not installed. StatsSource will provide minimal "
            "functionality. Install with: pip install psutil",
            stacklevel=2,
        )

    # Set up signal handling if requested
    self._signal_handler = None
    if self.eos_on_signal:
        self._signal_handler = SignalEOS
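
The `float("-inf")` sentinel set above can be checked in isolation. The sketch below reproduces only the interval comparison with plain local variables (the names are illustrative, not attributes of the class) and shows that the first check passes even with a very large interval, while the second does not.

```python
import time

interval = 3600.0  # seconds; deliberately large
last_collection_time = float("-inf")  # same sentinel as __post_init__

# First check: now - (-inf) is +inf, which is >= any finite interval.
now = time.monotonic()
first_due = (now - last_collection_time) >= interval
last_collection_time = now

# Second check: only a tiny real delta has elapsed, so it is not due.
second_due = (time.monotonic() - last_collection_time) >= interval
```

Initializing to `0.0` instead would not give the same guarantee, because the reference point of `time.monotonic()` is undefined and the first delta could be smaller than the interval.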

check_eos()

Check if end-of-stream has been signaled.

Returns:

| Type | Description |
| --- | --- |
| bool | bool, True if EOS should be set. |

Source code in src/sgn/sources.py
def check_eos(self) -> bool:
    """Check if end-of-stream has been signaled.

    Returns:
        bool, True if EOS should be set.
    """
    if self._eos:
        return True

    if (
        self.eos_on_signal
        and self._signal_handler
        and self._signal_handler.signaled_eos()
    ):
        return True

    return False
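
The order of the checks in `check_eos` can be exercised without a real signal. This is a sketch under stated assumptions: `FakeSignalHandler` and `EosState` are hypothetical stand-ins that mimic only the attributes read by the method above, and they say nothing about how the library's own signal handler is implemented.

```python
import signal


class FakeSignalHandler:
    """Hypothetical handler that just records which signals were 'received'."""

    def __init__(self) -> None:
        self.received: set[int] = set()

    def signaled_eos(self) -> bool:
        return bool(self.received)


class EosState:
    """Stand-in holding only the attributes that check_eos reads."""

    def __init__(self, eos_on_signal: bool, handler=None) -> None:
        self.eos_on_signal = eos_on_signal
        self._eos = False
        self._signal_handler = handler

    def check_eos(self) -> bool:
        # An explicit flag wins; otherwise defer to the handler, if enabled.
        if self._eos:
            return True
        return bool(
            self.eos_on_signal
            and self._signal_handler
            and self._signal_handler.signaled_eos()
        )


handler = FakeSignalHandler()
listening = EosState(eos_on_signal=True, handler=handler)
ignoring = EosState(eos_on_signal=False, handler=handler)

before = listening.check_eos()        # nothing recorded yet
handler.received.add(signal.SIGINT)   # simulate a received signal
after = listening.check_eos()
ignored = ignoring.check_eos()        # eos_on_signal=False short-circuits
```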

new(pad)

Create a new Frame containing system statistics.

This method is called by the pipeline to produce a new frame with current system statistics.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pad | SourcePad | SourcePad, the pad for which to produce a new Frame | required |

Returns:

| Type | Description |
| --- | --- |
| Frame | Frame, the Frame containing system statistics |

Source code in src/sgn/sources.py
def new(self, pad: SourcePad) -> Frame:
    """Create a new Frame containing system statistics.

    This method is called by the pipeline to produce a new frame with
    current system statistics.

    Args:
        pad: SourcePad, the pad for which to produce a new Frame

    Returns:
        Frame, the Frame containing system statistics
    """
    # Respect the wait parameter if set, adding a delay between frames
    if self.wait is not None:
        time.sleep(self.wait)

    stats: dict[str, Any] = {}

    # Check if we should collect stats based on the interval
    collected = self._maybe_advance_collection_window()
    if collected:
        # Collect process stats if requested
        if self.include_process_stats:
            stats["process"] = self._collect_process_stats()

        # Collect system stats if requested
        if self.include_system_stats:
            stats["system"] = self._collect_system_stats()

        # Add timestamp (wall-clock, intentionally distinct from the
        # monotonic clock used for interval bookkeeping)
        stats["timestamp"] = float(time.time())

    # Check for EOS condition
    eos = self.check_eos()

    # Create and return the frame. When the interval has not elapsed,
    # the frame is a gap (empty dict, is_gap=True) so consumers can
    # distinguish "not yet time to sample" from a real measurement.
    return self.frame_factory(
        EOS=eos,
        data=stats,
        is_gap=not collected,
        metadata={"stats_type": "system_metrics"},
    )
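
Because `new` returns a gap frame (empty dict, `is_gap=True`) when the interval has not elapsed, a downstream consumer has to filter gaps to see only real measurements. The following is a rough sketch of such a consumer; `FakeFrame` is a hypothetical stand-in that carries only the four fields passed to `frame_factory` above, and `real_samples` is an illustrative helper, not part of the library.

```python
from dataclasses import dataclass, field
from typing import Any, Iterable, Iterator


@dataclass
class FakeFrame:
    """Hypothetical stand-in exposing only the fields new() sets."""

    EOS: bool = False
    is_gap: bool = False
    data: Any = None
    metadata: dict = field(default_factory=dict)


def real_samples(frames: Iterable[FakeFrame]) -> Iterator[Any]:
    """Yield the data of non-gap frames, stopping after the EOS frame."""
    for frame in frames:
        if not frame.is_gap:
            yield frame.data
        if frame.EOS:
            break


frames = [
    FakeFrame(data={"timestamp": 1.0}),
    FakeFrame(is_gap=True, data={}),              # interval not yet elapsed
    FakeFrame(EOS=True, data={"timestamp": 2.0}),
    FakeFrame(data={"timestamp": 3.0}),           # after EOS, never consumed
]
samples = list(real_samples(frames))
```

Checking `is_gap` rather than testing for an empty dict keeps the consumer correct even if a real sample happens to carry no keys.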

should_collect_stats()

Back-compat alias for _maybe_advance_collection_window().

The name reads as a pure predicate but the call mutates _last_collection_time; the underscore-prefixed method is the preferred spelling in new code.

Source code in src/sgn/sources.py
def should_collect_stats(self) -> bool:
    """Back-compat alias for :meth:`_maybe_advance_collection_window`.

    The name reads as a pure predicate but the call mutates
    ``_last_collection_time``; the underscore-prefixed method is the
    preferred spelling in new code.
    """
    return self._maybe_advance_collection_window()
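
To illustrate why the predicate-style name is misleading, the sketch below separates a pure check from the mutating call and drives both with a fake clock. `IntervalGate`, `is_due`, and `advance_if_due` are hypothetical names introduced here for illustration; the class above offers only the mutating form.

```python
import time
from typing import Callable, Optional


class IntervalGate:
    """Hypothetical helper separating the check from the state change."""

    def __init__(
        self,
        interval: Optional[float],
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self.interval = interval
        self._clock = clock
        self._last = float("-inf")  # first check always passes

    def _due_at(self, now: float) -> bool:
        return self.interval is None or (now - self._last) >= self.interval

    def is_due(self) -> bool:
        """Pure predicate: safe to call any number of times."""
        return self._due_at(self._clock())

    def advance_if_due(self) -> bool:
        """Mutating form: returning True also starts a new window."""
        now = self._clock()
        if self._due_at(now):
            self._last = now
            return True
        return False


fake_now = [100.0]  # mutable cell standing in for a monotonic clock
gate = IntervalGate(interval=5.0, clock=lambda: fake_now[0])

peeks = (gate.is_due(), gate.is_due())                  # no state change
calls = (gate.advance_if_due(), gate.advance_if_due())  # second sees new window
fake_now[0] += 5.0                                      # one full interval later
later = gate.is_due()
```

The injectable clock is a testing convenience in this sketch; it makes the two-calls-in-a-row behavior reproducible without sleeping.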