
sgn.subprocess

Parallelize

Bases: SignalEOS



A context manager for running SGN pipelines with elements that implement separate processes or threads.

This class manages the lifecycle of workers (processes or threads) in an SGN pipeline, handling worker creation, execution, and cleanup. It also supports shared memory objects that will be automatically cleaned up on exit through the to_shm() method (only applicable for process mode).

Key features include:

  • Automatic management of worker lifecycle (creation, starting, joining, cleanup)
  • Shared memory management for efficient data sharing (process mode only)
  • Signal handling coordination between main process/thread and workers
  • Resilience against KeyboardInterrupt (Ctrl+C): workers catch and ignore these signals, allowing the main process to coordinate a clean shutdown
  • Orderly shutdown to ensure all resources are properly released
  • Support for both multiprocessing and threading concurrency models
  • Automatic detection and invocation when pipeline.run() is called
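The "joining, cleanup" part of the worker lifecycle can be pictured with a small stand-alone sketch. It is not the SGN implementation: `join_or_kill` and its string return values are hypothetical, chosen only to show the idea of joining with a timeout and killing a worker only if it exposes a `kill()` method (processes do, threads do not).

```python
import threading


def join_or_kill(worker, timeout):
    """Join a worker; if it outlives `timeout` and can be killed, kill it.

    Hypothetical helper for illustration. The returned strings are not part
    of any SGN API.
    """
    if worker is None or not worker.is_alive():
        return "not-running"
    worker.join(timeout)
    if not worker.is_alive():
        return "joined"
    # multiprocessing.Process has kill(); threading.Thread does not.
    if hasattr(worker, "kill"):
        worker.kill()
        return "killed"
    return "still-running"


stop = threading.Event()
t = threading.Thread(target=stop.wait, daemon=True)
t.start()

# The thread is blocked on the event and cannot be killed.
first = join_or_kill(t, timeout=0.05)

# Once the event is set the thread returns and can be joined.
stop.set()
second = join_or_kill(t, timeout=1.0)
```

A thread that ignores its stop signal can only be waited on, which is one reason a cooperative stop event matters in thread mode.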

IMPORTANT: When using process mode, code using Parallelize MUST be wrapped within an if __name__ == "__main__": block. This is required because SGN uses Python's multiprocessing module with the 'spawn' start method, which requires that the main module be importable.

Supported usage — sequential pipelines only. The design supports a single active Parallelize instance at a time, and multiple pipelines must be run one after another (with p1: ... finishes before with p2: ... begins). ParallelizeBase subclasses register themselves on a class-level "pending" list at construction time; the next Parallelize(...) instance claims that pending list and owns it for the rest of its lifecycle. To keep each pipeline's elements isolated on its own Parallelize, construct every element (and every to_shm() segment) belonging to a pipeline before constructing its Parallelize, and only then move on to constructing the next pipeline.
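The construction-order rule is easier to see in a toy model of the pending list. The sketch below uses hypothetical `Coordinator` and `Element` classes as stand-ins for Parallelize and a ParallelizeBase subclass; it only mimics the "register on construction, claim on the next coordinator" behaviour described above.

```python
class Coordinator:
    """Toy stand-in for Parallelize: claims whatever is pending at construction."""

    pending: list = []  # class-level registry shared by all instances

    def __init__(self):
        # Take ownership of the current pending list, then reset the shared one.
        self.claimed = Coordinator.pending
        Coordinator.pending = []


class Element:
    """Toy stand-in for a ParallelizeBase subclass."""

    def __init__(self, name):
        self.name = name
        Coordinator.pending.append(self)  # register at construction time


# Pipeline 1: build every element first, then its coordinator.
a, b = Element("a"), Element("b")
p1 = Coordinator()

# Pipeline 2: only now start constructing its elements.
c = Element("c")
p2 = Coordinator()

p1_names = [e.name for e in p1.claimed]
p2_names = [e.name for e in p2.claimed]
```

Had `c` been constructed before `p1`, it would have been claimed by `p1` instead, which is the mix-up the ordering rule prevents.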

Unsupported usage:

  • Concurrent multi-pipeline use (two threads each driving a Parallelize) is not supported. The class-level pending list is shared mutable state, so two threads that interleave construction of elements and Parallelize instances will claim each other's elements. No locking is performed.
  • Nested context managers (with p1: with p2: ...) are not supported. Although Parallelize's own per-instance state (the claimed instance_list and shm_list) survives nesting cleanly, Parallelize inherits from sgn.sources.SignalEOS, whose __enter__/__exit__ use a single class-level previous_handlers dict rather than a stack. Nesting causes the inner __enter__ to overwrite the outer's saved signal handlers, so the original handlers are lost when the outer __exit__ runs. rcvd_signals is also cleared on each __exit__, so a signal received during the outer context can be dropped by the inner exit.
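The nesting problem can be reproduced without touching real signal handlers. The following sketch is an assumption-laden toy, not SignalEOS itself: `SingleSlotGuard` is a hypothetical class that stores the "previous handler" in one class-level dict, the way the text describes previous_handlers.

```python
class SingleSlotGuard:
    """Toy context manager that saves the previous handler in ONE shared slot."""

    current = "original"  # stands in for the process-wide signal handler
    previous = {}         # class-level, shared by every instance (no stack)

    def __init__(self, name):
        self.name = name

    def __enter__(self):
        # Overwrites whatever an enclosing context saved earlier.
        SingleSlotGuard.previous["SIGINT"] = SingleSlotGuard.current
        SingleSlotGuard.current = self.name
        return self

    def __exit__(self, *exc):
        SingleSlotGuard.current = SingleSlotGuard.previous["SIGINT"]
        return False


with SingleSlotGuard("outer"):
    with SingleSlotGuard("inner"):
        pass

# After both contexts exit, the handler installed by "outer" is still active.
restored = SingleSlotGuard.current
```

With a stack of saved handlers instead of a single slot, the outer exit would restore "original"; with one slot it restores what the inner context saved.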

Shared memory cleanup: segments registered via to_shm() are normally unlinked when Parallelize.__exit__ runs. As a safety net, an atexit hook calls cleanup_all_shm() on interpreter shutdown to release any segments still tracked, and you can call cleanup_all_shm() directly from an error-handling path or interactively to recover from a leak.
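Because the same segment may be reached by more than one cleanup path (context exit, an explicit call, the atexit hook), unlinking has to tolerate a segment that is already gone. A minimal sketch of that idea using only the standard library follows; `unlink_by_name` is a hypothetical helper, and the segment name is generated just for this example.

```python
import contextlib
import uuid
from multiprocessing import shared_memory


def unlink_by_name(name):
    """Unlink a segment by name; an already-removed segment is not an error."""
    with contextlib.suppress(FileNotFoundError):
        seg = shared_memory.SharedMemory(name=name)  # attach to existing segment
        seg.close()   # drop this process's mapping
        seg.unlink()  # request removal of the underlying segment


name = f"demo_{uuid.uuid4().hex[:8]}"
owner = shared_memory.SharedMemory(name=name, create=True, size=16)
owner.close()

unlink_by_name(name)
unlink_by_name(name)  # second call finds nothing and does nothing

try:
    shared_memory.SharedMemory(name=name)
    still_exists = True
except FileNotFoundError:
    still_exists = False
```

This sketch assumes a POSIX platform, where unlinking removes the name immediately.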

Example with automatic parallelization (RECOMMENDED):

def main():
    pipeline = Pipeline()
    # Add ParallelizeTransformElement, ParallelizeSinkElement, etc.
    pipeline.run()  # Automatically detects and enables parallelization

if __name__ == "__main__":
    main()

Example with manual context manager (LEGACY):

def main():
    pipeline = Pipeline()
    with Parallelize(pipeline) as parallelize:
        parallelize.run()

if __name__ == "__main__":
    main()

Source code in src/sgn/subprocess.py
class Parallelize(SignalEOS):
    """
    A context manager for running SGN pipelines with elements that implement
    separate processes or threads.

    This class manages the lifecycle of workers (processes or threads) in an SGN
    pipeline, handling worker creation, execution, and cleanup. It also supports
    shared memory objects that will be automatically cleaned up on exit through the
    to_shm() method (only applicable for process mode).

    Key features include:
    - Automatic management of worker lifecycle (creation, starting, joining, cleanup)
    - Shared memory management for efficient data sharing (process mode only)
    - Signal handling coordination between main process/thread and workers
    - Resilience against KeyboardInterrupt (Ctrl+C) - workers catch and ignore these
      signals, allowing the main process to coordinate a clean shutdown
    - Orderly shutdown to ensure all resources are properly released
    - Support for both multiprocessing and threading concurrency models
    - Automatic detection and invocation when pipeline.run() is called

    IMPORTANT: When using process mode, code using Parallelize MUST be
    wrapped within an if __name__ == "__main__": block. This is required because SGN
    uses Python's multiprocessing module with the 'spawn' start method, which requires
    that the main module be importable.

    Supported usage — sequential pipelines only. The design supports a single
    active ``Parallelize`` instance at a time, and multiple pipelines must be
    run one after another (``with p1: ...`` finishes before ``with p2: ...``
    begins). ``ParallelizeBase`` subclasses register themselves on a
    class-level "pending" list at construction time; the next
    ``Parallelize(...)`` instance claims that pending list and owns it for the
    rest of its lifecycle. To keep each pipeline's elements isolated on its
    own ``Parallelize``, construct every element (and every ``to_shm()``
    segment) belonging to a pipeline *before* constructing its
    ``Parallelize``, and only then move on to constructing the next pipeline.

    Unsupported usage:

    - **Concurrent multi-pipeline (two threads each driving a ``Parallelize``)**
      is not supported. The class-level pending list is shared mutable state,
      so two threads constructing elements and ``Parallelize`` instances
      interleaved will claim each other's elements. No locking is performed.
    - **Nested context managers (``with p1: with p2: ...``)** is not
      supported. Although ``Parallelize``'s own per-instance state (the
      claimed ``instance_list`` and ``shm_list``) survives nesting cleanly,
      ``Parallelize`` inherits from :class:`~sgn.sources.SignalEOS`, whose
      ``__enter__``/``__exit__`` use a single class-level ``previous_handlers``
      dict rather than a stack. Nesting causes the inner ``__enter__`` to
      overwrite the outer's saved signal handlers, so the original handlers
      are leaked away on outer ``__exit__``. ``rcvd_signals`` is also wiped
      on each ``__exit__``, so a signal received during the outer context
      can be dropped by the inner exit.

    Shared memory cleanup: segments registered via ``to_shm()`` are normally
    unlinked when ``Parallelize.__exit__`` runs. As a safety net, an :mod:`atexit`
    hook calls :meth:`cleanup_all_shm` on interpreter shutdown to release any
    segments still tracked, and you can call :meth:`cleanup_all_shm` directly
    from an error-handling path or interactively to recover from a leak.

    Example with automatic parallelization (RECOMMENDED):
        def main():
            pipeline = Pipeline()
            # Add ParallelizeTransformElement, ParallelizeSinkElement, etc.
            pipeline.run()  # Automatically detects and enables parallelization

        if __name__ == "__main__":
            main()

    Example with manual context manager (LEGACY):
        def main():
            pipeline = Pipeline()
            with Parallelize(pipeline) as parallelize:
                parallelize.run()

        if __name__ == "__main__":
            main()
    """

    # Class-level "pending" registries. Elements register here at construction
    # time; the next Parallelize(...) claims them onto its own instance.
    shm_list: list = []
    instance_list: list = []
    # WeakSet of live Parallelize instances. Used by cleanup_all_shm() (and the
    # atexit hook) to reach shm segments that have already been claimed onto an
    # instance but whose context manager has not run __exit__ yet.
    _live_instances: weakref.WeakSet = weakref.WeakSet()
    enabled: bool = False
    # The hard timeout before a worker gets terminated.
    # Workers should cleanup after themselves within this time and exit cleanly.
    # This is a "global" property applied to all subprocesses / subthreads
    join_timeout: float = 10.0  # Increased for CI environments
    # Default flag for whether to use threading (False means use multiprocessing)
    use_threading_default: bool = False
    # Instance variable for thread mode
    use_threading: bool = False

    def __init__(self, pipeline=None, use_threading: bool | None = None):
        """
        Initialize the Parallelize context manager.

        Args:
            pipeline (Pipeline | None): The pipeline to run.
            use_threading (bool | None): Whether to use threading instead of
                multiprocessing. If not specified, uses the use_threading_default.
        """
        self.pipeline = pipeline
        # Use the specified mode, or fall back to the class default
        self.use_threading = (
            use_threading
            if use_threading is not None
            else Parallelize.use_threading_default
        )
        # Claim pending element and shared-memory registrations onto this
        # instance, so that two Parallelize instances do not share state.
        self.instance_list = Parallelize.instance_list
        self.shm_list = Parallelize.shm_list
        Parallelize.instance_list = []
        Parallelize.shm_list = []
        # Track ourselves so cleanup_all_shm() can reach claimed segments
        # even if __exit__ never runs (e.g. user forgot the with-block).
        Parallelize._live_instances.add(self)

    def __enter__(self):
        try:
            multiprocessing.set_start_method("spawn")
        except RuntimeError:
            pass
        super().__enter__()
        for e in self.instance_list:
            e.worker.start()
        Parallelize.enabled = True
        return self

    def __exit__(self, exc_type, exc_value, exc_traceback):
        super().__exit__(exc_type, exc_value, exc_traceback)
        # rejoin all the workers
        self._join_workers(self.instance_list)
        self.instance_list = []

        # Clean up shared memory. We attempt unconditionally rather than
        # gating on self.use_threading: per-element _use_threading_override
        # may have caused a process-mode element to allocate shared memory
        # even when the outer Parallelize uses threading.
        for d in self.shm_list:
            with contextlib.suppress(FileNotFoundError):
                multiprocessing.shared_memory.SharedMemory(name=d["name"]).unlink()
        self.shm_list = []

        Parallelize.enabled = False

    @staticmethod
    def _join_workers(instances):
        """Cancel queue join threads, then join (and if necessary kill) workers."""
        for p in instances:
            if p.in_queue is not None:
                p.in_queue.cancel_join_thread()
            if p.out_queue is not None:
                p.out_queue.cancel_join_thread()

            if (
                p.worker is not None
                and hasattr(p.worker, "is_alive")
                and p.worker.is_alive()
            ):
                p.worker.join(Parallelize.join_timeout)
                # Only processes can be killed, threads will naturally terminate
                if hasattr(p.worker, "kill") and p.worker.is_alive():
                    p.worker.kill()

    @staticmethod
    def to_shm(name, bytez, **kwargs):
        """
        Create a shared memory object that can be accessed by subprocesses.

        Note: This is only applicable in process mode. In thread mode, shared memory
        is not necessary since threads share the same address space.

        This method creates a shared memory segment that will be automatically
        cleaned up when the Parallelize context manager exits. The shared memory can be
        used to efficiently share large data between processes without serialization
        overhead.

        Args:
            name (str): Unique identifier for the shared memory block.
            bytez (bytes | bytearray): Data to store in shared memory.
            **kwargs (Any): Additional metadata to store with the shared memory
                reference.

        Returns:
            dict[str, Any]: A dictionary containing the shared memory object
                and metadata with keys:
                - "name": The name of the shared memory block
                - "shm": The SharedMemory object
                - Any additional key-value pairs from kwargs

        Raises:
            FileExistsError: If shared memory with the given name already exists

        Example:
            shared_data = bytearray("Hello world", "utf-8")
            shm_ref = Parallelize.to_shm("example_data", shared_data)
        """
        try:
            shm = multiprocessing.shared_memory.SharedMemory(
                name=name, create=True, size=len(bytez)
            )
        except FileExistsError:
            logger.exception(
                "Shared memory %r already exists. Unlink it manually with "
                "multiprocessing.shared_memory.SharedMemory(name=%r).unlink(), "
                "or call Parallelize.cleanup_all_shm() to drop every segment "
                "currently tracked by Parallelize.",
                name,
                name,
            )
            # Intentionally do NOT bulk-unlink other segments in shm_list:
            # doing so corrupts shared memory belonging to other pending or
            # active Parallelize contexts.
            raise

        shm.buf[: len(bytez)] = bytez
        out = {"name": name, "shm": shm, **kwargs}
        Parallelize.shm_list.append(out)
        return out

    @classmethod
    def cleanup_all_shm(cls):
        """Best-effort unlink of every shared memory segment tracked by Parallelize.

        Walks two registries:

        - ``Parallelize.shm_list`` — pending segments registered by ``to_shm``
          but not yet claimed by a ``Parallelize`` instance.
        - ``Parallelize._live_instances`` — segments already claimed onto a
          live ``Parallelize`` instance whose ``__exit__`` has not run.

        Every segment found is unlinked; ``FileNotFoundError`` (segment already
        gone) is suppressed. The registries are cleared so a subsequent call
        is a no-op.

        Call this on an error path that bypasses the normal context-manager
        cleanup, or interactively to recover from a leak. Also registered as
        an :mod:`atexit` hook to catch the "user forgot ``with``" case.
        """
        # Pending segments (not yet claimed by any Parallelize instance).
        pending, cls.shm_list = cls.shm_list, []
        # Claimed segments held by any live Parallelize instance.
        claimed: list[dict] = []
        for inst in list(cls._live_instances):
            inst_shm = getattr(inst, "shm_list", None)
            if not inst_shm:
                continue
            claimed.extend(inst_shm)
            inst.shm_list = []

        for d in pending + claimed:
            with contextlib.suppress(FileNotFoundError, KeyError):
                multiprocessing.shared_memory.SharedMemory(name=d["name"]).unlink()

    def run(self, threaded=None):
        """
        Run the pipeline managed by this Parallelize instance.

        This method executes the associated pipeline and ensures proper cleanup
        of worker resources, even in the case of exceptions. It signals all
        workers to stop when the pipeline execution completes or if an exception
        occurs.

        Args:
            threaded (int | Executor | None): Forwarded to ``Pipeline.run``.
                Subprocess parallelization (this context) and thread-pool
                dispatch are complementary: workers run their heavy work in
                separate processes while ``thread_safe`` elements still
                dispatch their callbacks onto the thread pool in the main
                process. See ``Pipeline.run`` for the accepted values.

        Raises:
            RuntimeError: If an exception occurs during pipeline execution
            AssertionError: If no pipeline was provided to the Parallelize
                constructor.
        """
        assert (
            self.pipeline is not None
        ), "Pipeline must be provided to Parallelize constructor before running"
        try:
            # Disable auto-parallelization since we're already in a Parallelize
            # context, but forward threaded so thread-pool dispatch of
            # thread_safe elements still applies in the main process.
            self.pipeline.run(auto_parallelize=False, threaded=threaded)
        except KeyboardInterrupt:
            logger.warning("Interrupt received, stopping pipeline...")
            self._stop_and_join_workers()
            return
        except Exception:
            self._stop_and_join_workers()
            raise

        # Signal all workers to stop when pipeline completes normally
        for p in self.instance_list:
            p.worker_stop.set()

    def _stop_and_join_workers(self):
        """Signal all workers to stop and join/kill them."""
        for p in self.instance_list:
            p.worker_stop.set()
        self._join_workers(self.instance_list)

    @staticmethod
    def needs_parallelization(pipeline):
        """
        Check if a pipeline contains any elements that require parallelization.

        Args:
            pipeline (Pipeline): The Pipeline instance to check.

        Returns:
            bool: True if the pipeline contains any Parallelize* elements.
        """
        # Check if any element is a subclass of ParallelizeBase
        return any(
            isinstance(element, ParallelizeBase) for element in pipeline.elements
        )

__init__(pipeline=None, use_threading=None)

Initialize the Parallelize context manager.

Parameters:

  • pipeline (Pipeline | None): The pipeline to run. Default: None.
  • use_threading (bool | None): Whether to use threading instead of multiprocessing. If not specified, uses use_threading_default. Default: None.

Source code in src/sgn/subprocess.py
def __init__(self, pipeline=None, use_threading: bool | None = None):
    """
    Initialize the Parallelize context manager.

    Args:
        pipeline (Pipeline | None): The pipeline to run.
        use_threading (bool | None): Whether to use threading instead of
            multiprocessing. If not specified, uses the use_threading_default.
    """
    self.pipeline = pipeline
    # Use the specified mode, or fall back to the class default
    self.use_threading = (
        use_threading
        if use_threading is not None
        else Parallelize.use_threading_default
    )
    # Claim pending element and shared-memory registrations onto this
    # instance, so that two Parallelize instances do not share state.
    self.instance_list = Parallelize.instance_list
    self.shm_list = Parallelize.shm_list
    Parallelize.instance_list = []
    Parallelize.shm_list = []
    # Track ourselves so cleanup_all_shm() can reach claimed segments
    # even if __exit__ never runs (e.g. user forgot the with-block).
    Parallelize._live_instances.add(self)

cleanup_all_shm() classmethod

Best-effort unlink of every shared memory segment tracked by Parallelize.

Walks two registries:

  • Parallelize.shm_list — pending segments registered by to_shm but not yet claimed by a Parallelize instance.
  • Parallelize._live_instances — segments already claimed onto a live Parallelize instance whose __exit__ has not run.

Every segment found is unlinked; FileNotFoundError (segment already gone) is suppressed. The registries are cleared so a subsequent call is a no-op.

Call this on an error path that bypasses the normal context-manager cleanup, or interactively to recover from a leak. Also registered as an atexit hook to catch the case where the user forgot the with block.
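The two-registry walk can be sketched with plain dictionaries instead of real shared memory. Everything here is illustrative: `Owner` and `drain_all` are hypothetical names, and the records only carry a "name" key. The point is the pattern of swapping out the pending list and then emptying each live instance reachable through a WeakSet.

```python
import weakref


class Owner:
    """Toy stand-in for a Parallelize instance that has claimed some segments."""

    live = weakref.WeakSet()  # tracks instances without keeping them alive
    pending: list = []

    def __init__(self):
        # Claim the pending records and reset the shared list.
        self.segments, Owner.pending = Owner.pending, []
        Owner.live.add(self)


def drain_all():
    """Collect pending and claimed records, clearing both registries."""
    found, Owner.pending = Owner.pending, []
    for inst in list(Owner.live):  # copy: the WeakSet may shrink while iterating
        found.extend(inst.segments)
        inst.segments = []
    return found


Owner.pending.append({"name": "seg_a"})
o = Owner()                              # claims seg_a
Owner.pending.append({"name": "seg_b"})  # still pending

first = sorted(d["name"] for d in drain_all())
second = drain_all()  # registries are now empty, so this finds nothing
```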

Source code in src/sgn/subprocess.py
@classmethod
def cleanup_all_shm(cls):
    """Best-effort unlink of every shared memory segment tracked by Parallelize.

    Walks two registries:

    - ``Parallelize.shm_list`` — pending segments registered by ``to_shm``
      but not yet claimed by a ``Parallelize`` instance.
    - ``Parallelize._live_instances`` — segments already claimed onto a
      live ``Parallelize`` instance whose ``__exit__`` has not run.

    Every segment found is unlinked; ``FileNotFoundError`` (segment already
    gone) is suppressed. The registries are cleared so a subsequent call
    is a no-op.

    Call this on an error path that bypasses the normal context-manager
    cleanup, or interactively to recover from a leak. Also registered as
    an :mod:`atexit` hook to catch the "user forgot ``with``" case.
    """
    # Pending segments (not yet claimed by any Parallelize instance).
    pending, cls.shm_list = cls.shm_list, []
    # Claimed segments held by any live Parallelize instance.
    claimed: list[dict] = []
    for inst in list(cls._live_instances):
        inst_shm = getattr(inst, "shm_list", None)
        if not inst_shm:
            continue
        claimed.extend(inst_shm)
        inst.shm_list = []

    for d in pending + claimed:
        with contextlib.suppress(FileNotFoundError, KeyError):
            multiprocessing.shared_memory.SharedMemory(name=d["name"]).unlink()

needs_parallelization(pipeline) staticmethod

Check if a pipeline contains any elements that require parallelization.

Parameters:

  • pipeline (Pipeline): The Pipeline instance to check. Required.

Returns:

  • bool: True if the pipeline contains any Parallelize* elements.

Source code in src/sgn/subprocess.py
@staticmethod
def needs_parallelization(pipeline):
    """
    Check if a pipeline contains any elements that require parallelization.

    Args:
        pipeline (Pipeline): The Pipeline instance to check.

    Returns:
        bool: True if the pipeline contains any Parallelize* elements.
    """
    # Check if any element is a subclass of ParallelizeBase
    return any(
        isinstance(element, ParallelizeBase) for element in pipeline.elements
    )

run(threaded=None)

Run the pipeline managed by this Parallelize instance.

This method executes the associated pipeline and ensures proper cleanup of worker resources, even in the case of exceptions. It signals all workers to stop when the pipeline execution completes or if an exception occurs.

Parameters:

  • threaded (int | Executor | None): Forwarded to Pipeline.run. Subprocess parallelization (this context) and thread-pool dispatch are complementary: workers run their heavy work in separate processes while thread_safe elements still dispatch their callbacks onto the thread pool in the main process. See Pipeline.run for the accepted values. Default: None.

Raises:

  • RuntimeError: If an exception occurs during pipeline execution.
  • AssertionError: If no pipeline was provided to the Parallelize constructor.
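The control flow described here (stop workers on normal completion, on Ctrl+C, and on any other exception) can be sketched in isolation. `run_with_workers` below is a hypothetical function, not the method itself; it returns a status string purely so the example can be checked, and it uses threading.Event objects as stand-ins for each worker's stop flag.

```python
import threading


def run_with_workers(body, stop_events):
    """Run `body`, always signalling workers to stop.

    KeyboardInterrupt is swallowed; any other exception is re-raised after
    the stop events have been set. The status strings are illustrative only.
    """
    try:
        body()
    except KeyboardInterrupt:
        for ev in stop_events:
            ev.set()
        return "interrupted"
    except Exception:
        for ev in stop_events:
            ev.set()
        raise
    for ev in stop_events:
        ev.set()
    return "completed"


def interrupted_body():
    raise KeyboardInterrupt


def failing_body():
    raise ValueError("pipeline failed")


events_a = [threading.Event()]
status_a = run_with_workers(interrupted_body, events_a)

events_b = [threading.Event()]
try:
    run_with_workers(failing_body, events_b)
    reraised = False
except ValueError:
    reraised = True
```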

Source code in src/sgn/subprocess.py
def run(self, threaded=None):
    """
    Run the pipeline managed by this Parallelize instance.

    This method executes the associated pipeline and ensures proper cleanup
    of worker resources, even in the case of exceptions. It signals all
    workers to stop when the pipeline execution completes or if an exception
    occurs.

    Args:
        threaded (int | Executor | None): Forwarded to ``Pipeline.run``.
            Subprocess parallelization (this context) and thread-pool
            dispatch are complementary: workers run their heavy work in
            separate processes while ``thread_safe`` elements still
            dispatch their callbacks onto the thread pool in the main
            process. See ``Pipeline.run`` for the accepted values.

    Raises:
        RuntimeError: If an exception occurs during pipeline execution
        AssertionError: If no pipeline was provided to the Parallelize
            constructor.
    """
    assert (
        self.pipeline is not None
    ), "Pipeline must be provided to Parallelize constructor before running"
    try:
        # Disable auto-parallelization since we're already in a Parallelize
        # context, but forward threaded so thread-pool dispatch of
        # thread_safe elements still applies in the main process.
        self.pipeline.run(auto_parallelize=False, threaded=threaded)
    except KeyboardInterrupt:
        logger.warning("Interrupt received, stopping pipeline...")
        self._stop_and_join_workers()
        return
    except Exception:
        self._stop_and_join_workers()
        raise

    # Signal all workers to stop when pipeline completes normally
    for p in self.instance_list:
        p.worker_stop.set()

to_shm(name, bytez, **kwargs) staticmethod

Create a shared memory object that can be accessed by subprocesses.

Note: This is only applicable in process mode. In thread mode, shared memory is not necessary since threads share the same address space.

This method creates a shared memory segment that will be automatically cleaned up when the Parallelize context manager exits. The shared memory can be used to efficiently share large data between processes without serialization overhead.

Parameters:

  • name (str): Unique identifier for the shared memory block. Required.
  • bytez (bytes | bytearray): Data to store in shared memory. Required.
  • **kwargs (Any): Additional metadata to store with the shared memory reference. Default: {}.

Returns:

  • dict[str, Any]: A dictionary containing the shared memory object and metadata, with keys:
      - "name": The name of the shared memory block
      - "shm": The SharedMemory object
      - Any additional key-value pairs from kwargs

Raises:

  • FileExistsError: If shared memory with the given name already exists.

Example:

shared_data = bytearray("Hello world", "utf-8")
shm_ref = Parallelize.to_shm("example_data", shared_data)
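For the consuming side, a worker would typically attach to the segment by name and read the bytes back. The sketch below uses the standard library directly rather than to_shm(), so it runs without SGN; the dictionary layout mirrors the documented return value, and the "length" key is an assumed piece of extra metadata of the kind **kwargs could carry. Recording the payload length is useful because a platform may round the segment size up.

```python
import uuid
from multiprocessing import shared_memory

payload = bytearray("Hello world", "utf-8")
name = f"example_{uuid.uuid4().hex[:8]}"  # unique name for this demo only

# Producer side: create a segment sized to the payload and copy the bytes in.
shm = shared_memory.SharedMemory(name=name, create=True, size=len(payload))
shm.buf[: len(payload)] = payload
ref = {"name": name, "shm": shm, "length": len(payload)}  # "length" is assumed metadata

# Consumer side: attach by name and read exactly `length` bytes.
view = shared_memory.SharedMemory(name=ref["name"])
received = bytes(view.buf[: ref["length"]])
view.close()

# Cleanup: release the producer's mapping and remove the segment.
shm.close()
shm.unlink()
```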

Source code in src/sgn/subprocess.py
@staticmethod
def to_shm(name, bytez, **kwargs):
    """
    Create a shared memory object that can be accessed by subprocesses.

    Note: This is only applicable in process mode. In thread mode, shared memory
    is not necessary since threads share the same address space.

    This method creates a shared memory segment that will be automatically
    cleaned up when the Parallelize context manager exits. The shared memory can be
    used to efficiently share large data between processes without serialization
    overhead.

    Args:
        name (str): Unique identifier for the shared memory block.
        bytez (bytes | bytearray): Data to store in shared memory.
        **kwargs (Any): Additional metadata to store with the shared memory
            reference.

    Returns:
        dict[str, Any]: A dictionary containing the shared memory object
            and metadata with keys:
            - "name": The name of the shared memory block
            - "shm": The SharedMemory object
            - Any additional key-value pairs from kwargs

    Raises:
        FileExistsError: If shared memory with the given name already exists

    Example:
        shared_data = bytearray("Hello world", "utf-8")
        shm_ref = Parallelize.to_shm("example_data", shared_data)
    """
    try:
        shm = multiprocessing.shared_memory.SharedMemory(
            name=name, create=True, size=len(bytez)
        )
    except FileExistsError:
        logger.exception(
            "Shared memory %r already exists. Unlink it manually with "
            "multiprocessing.shared_memory.SharedMemory(name=%r).unlink(), "
            "or call Parallelize.cleanup_all_shm() to drop every segment "
            "currently tracked by Parallelize.",
            name,
            name,
        )
        # Intentionally do NOT bulk-unlink other segments in shm_list:
        # doing so corrupts shared memory belonging to other pending or
        # active Parallelize contexts.
        raise

    shm.buf[: len(bytez)] = bytez
    out = {"name": name, "shm": shm, **kwargs}
    Parallelize.shm_list.append(out)
    return out

ParallelizeBase dataclass

Bases: Parallelize



A mixin class for sharing code between ParallelizeTransformElement and ParallelizeSinkElement.

This class provides common functionality for both transform and sink elements that run in separate processes or threads. It handles the creation and management of communication queues, worker lifecycle events, and provides methods for worker synchronization and cleanup.

Key features:

  • Creates and manages worker communication channels (queues)
  • Handles graceful worker termination and resource cleanup
  • Provides resilience against KeyboardInterrupt: workers will catch and ignore KeyboardInterrupt signals, allowing the main process to handle them and coordinate a clean shutdown of all workers
  • Supports orderly shutdown to process remaining queue items before termination

This is an internal implementation class and should not be instantiated directly. Instead, use ParallelizeTransformElement or ParallelizeSinkElement.

Developer Usage

@dataclass
class MyElement(ParallelizeTransformElement):
    multiplier: int = 2
    threshold: float = 0.5

    @staticmethod
    def worker_process(
        context: WorkerContext, multiplier: int, threshold: float
    ):
        try:
            frame = context.input_queue.get(timeout=1.0)
            if frame and frame.data > threshold:
                frame.data *= multiplier
                context.output_queue.put(frame)
        except queue.Empty:
            pass

Note on reserved worker_process parameter names: framework attributes such as in_queue, out_queue, worker, worker_stop, worker_shutdown, terminated, at_eos, queue_maxsize, err_maxsize, _use_threading_override, use_threading, pipeline, and frame_factory MUST NOT be used as worker_process parameter names. See _RESERVED_WORKER_PARAM_NAMES for the full list. The framework will raise ValueError if a collision is detected.
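A collision check of this kind can be written with inspect.signature. The sketch below is an assumption about how such a check might look, not the framework's code: `RESERVED` is a hypothetical subset of the names listed above (the authoritative list is _RESERVED_WORKER_PARAM_NAMES), and `check_worker_params` is an invented helper.

```python
import inspect

# Hypothetical subset for illustration; the real list lives in
# _RESERVED_WORKER_PARAM_NAMES.
RESERVED = frozenset({"in_queue", "out_queue", "worker", "worker_stop", "pipeline"})


def check_worker_params(func):
    """Return the parameter names of `func`, rejecting reserved ones."""
    names = list(inspect.signature(func).parameters)
    clashes = sorted(RESERVED.intersection(names))
    if clashes:
        raise ValueError(
            f"reserved parameter name(s) in {func.__name__}: {clashes}"
        )
    return names


def good_worker(context, multiplier, threshold):
    """Uses only element-specific parameter names."""


def bad_worker(context, pipeline):
    """Collides with the reserved name 'pipeline'."""


ok_names = check_worker_params(good_worker)

try:
    check_worker_params(bad_worker)
    error = None
except ValueError as exc:
    error = str(exc)
```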

Source code in src/sgn/subprocess.py
@dataclass
class ParallelizeBase(Parallelize):
    """
    A mixin class for sharing code between ParallelizeTransformElement and
    ParallelizeSinkElement.

    This class provides common functionality for both transform and sink
    elements that run in separate processes or threads. It handles the creation and
    management of communication queues, worker lifecycle events, and provides methods
    for worker synchronization and cleanup.

    Key features:
    - Creates and manages worker communication channels (queues)
    - Handles graceful worker termination and resource cleanup
    - Provides resilience against KeyboardInterrupt - workers will catch and ignore
      KeyboardInterrupt signals, allowing the main process to handle them and coordinate
      a clean shutdown of all workers
    - Supports orderly shutdown to process remaining queue items before termination

    This is an internal implementation class and should not be instantiated
    directly. Instead, use ParallelizeTransformElement or ParallelizeSinkElement.

    Developer Usage:
        @dataclass
        class MyElement(ParallelizeTransformElement):
            multiplier: int = 2
            threshold: float = 0.5

            @staticmethod
            def worker_process(
                context: WorkerContext, multiplier: int, threshold: float
            ):
                try:
                    frame = context.input_queue.get(timeout=1.0)
                    if frame and frame.data > threshold:
                        frame.data *= multiplier
                        context.output_queue.put(frame)
                except queue.Empty:
                    pass

    Note on reserved worker_process parameter names: framework attributes such as
    ``in_queue``, ``out_queue``, ``worker``, ``worker_stop``, ``worker_shutdown``,
    ``terminated``, ``at_eos``, ``queue_maxsize``, ``err_maxsize``,
    ``_use_threading_override``, ``use_threading``, ``pipeline``, and
    ``frame_factory`` MUST NOT be used as worker_process parameter names.
    See ``_RESERVED_WORKER_PARAM_NAMES`` for the full list. The framework will
    raise ``ValueError`` if a collision is detected.
    """

    queue_maxsize: int | None = 100
    err_maxsize: int = 16384
    # Flag that can be set by subclasses to override the default
    _use_threading_override: bool | None = None

    def __post_init__(self):
        # Determine whether to use threading
        self.use_threading = (
            self._use_threading_override
            if self._use_threading_override is not None
            else Parallelize.use_threading_default
        )

        # Extract worker parameters automatically from instance attributes
        worker_params = self._extract_worker_parameters()

        # Create appropriate queues based on mode
        if self.use_threading:
            self.in_queue = QueueWrapper(queue.Queue(maxsize=self.queue_maxsize))
            self.out_queue = QueueWrapper(queue.Queue(maxsize=self.queue_maxsize))
            self.worker_stop = threading.Event()
            self.worker_shutdown = threading.Event()
            self.terminated = threading.Event()
            self.worker_exception = QueueWrapper(queue.Queue(maxsize=1))
            self.worker = threading.Thread(
                target=_worker_wrapper_function,
                args=(self.terminated, self.__class__, "worker_process"),
                kwargs={
                    "shm_list": Parallelize.shm_list,
                    "inq": self.in_queue,
                    "outq": self.out_queue,
                    "worker_stop": self.worker_stop,
                    "worker_shutdown": self.worker_shutdown,
                    "worker_exception": self.worker_exception,
                    **worker_params,
                },
                daemon=False,  # Ensure the thread doesn't terminate too early
            )
        else:
            self.in_queue = QueueWrapper(
                multiprocessing.Queue(maxsize=self.queue_maxsize)
            )
            self.out_queue = QueueWrapper(
                multiprocessing.Queue(maxsize=self.queue_maxsize)
            )
            self.worker_stop = multiprocessing.Event()
            self.worker_shutdown = multiprocessing.Event()
            self.terminated = multiprocessing.Event()
            self.worker_exception = QueueWrapper(multiprocessing.Queue(maxsize=1))
            self.worker = multiprocessing.Process(
                target=_worker_wrapper_function,
                args=(self.terminated, self.__class__, "worker_process"),
                kwargs={
                    "shm_list": Parallelize.shm_list,
                    "inq": self.in_queue,
                    "outq": self.out_queue,
                    "worker_stop": self.worker_stop,
                    "worker_shutdown": self.worker_shutdown,
                    "worker_exception": self.worker_exception,
                    **worker_params,
                },
                daemon=False,  # Ensure the process doesn't terminate too early
            )

        # Add to the pending-registration list; the next Parallelize(...) claims it.
        Parallelize.instance_list.append(self)

        # Storage for retrieved worker exception in main process
        self._retrieved_worker_exception = None

    def _extract_worker_parameters(self):
        """Extract parameters for worker_process method from instance attributes."""
        # Get the signature of the worker_process method
        sig = inspect.signature(self.worker_process)
        extracted = {}

        for param_name, param in sig.parameters.items():
            if param_name in ("self", "context"):  # Skip special parameters
                continue

            if param_name in _RESERVED_WORKER_PARAM_NAMES:
                raise ValueError(
                    f"worker_process parameter {param_name!r} collides with a "
                    f"framework-reserved attribute name. Rename the parameter "
                    f"to avoid shadowing internal state."
                )

            # Try to get the value from instance attributes
            if hasattr(self, param_name):
                extracted[param_name] = getattr(self, param_name)
            elif param.default is not param.empty:
                # Use default value if available
                extracted[param_name] = param.default

        return extracted

    def worker_process(self, context: WorkerContext, *args: Any, **kwargs: Any) -> None:
        """Override this method in subclasses to implement worker logic.

        This method should be implemented as a static method or avoid accessing
        instance attributes directly to prevent pickling issues in multiprocessing mode.
        All necessary data should be passed through the method parameters.

        Args:
            context: WorkerContext with clean access to queues and events
            *args: Automatically extracted instance attributes
            **kwargs: Automatically extracted instance attributes with defaults
        """
        raise NotImplementedError("Subclasses must implement worker_process method")

    @staticmethod
    def _drain_queues(input_queue=None, output_queue=None):
        """Drain and close the input and output queues.

        Args:
            input_queue: The input QueueWrapper to drain and close
            output_queue: The output QueueWrapper to drain and close
        """
        for q in (output_queue, input_queue):
            if q is None:
                continue
            try:
                while True:
                    q.get_nowait()
            except queue.Empty:
                pass
            except (EOFError, OSError):
                # Queue closed/broken during shutdown — treat as drained.
                logger.debug("Queue drain stopped", exc_info=True)
            q.close()

    def sub_process_shutdown(self, timeout=0):
        """
        Initiate an orderly shutdown of the worker.

        This method signals the worker to complete processing of any pending data
        and then terminate. It waits for the worker to indicate completion, and
        collects any remaining data from the output queue before cleaning up resources.

        Args:
            timeout (int, optional): Maximum time in seconds to wait for the worker
                to terminate. Defaults to 0 (wait indefinitely).

        Returns:
            list[Any]: Any remaining items from the output queue.

        Raises:
            RuntimeError: If the worker does not terminate within the specified
                timeout.
        """
        # Signal worker to finish processing pending data
        self.worker_shutdown.set()
        start = time.time()
        out = []

        # Wait for worker to indicate termination. Poll at 0.1s rather than 1s
        # so clean shutdowns don't add a second of latency.
        while not self.terminated.is_set():
            if timeout > 0 and time.time() - start > timeout:
                raise RuntimeError("timeout exceeded for worker shutdown")
            time.sleep(0.1)

        # Collect any remaining output data
        if self.out_queue is not None:
            try:
                while True:
                    # Queue.empty() is not reliable, so we use get_nowait()
                    out.append(self.out_queue.get_nowait())
            except queue.Empty:
                pass  # Queue is empty
            except (EOFError, OSError):
                logger.warning(
                    "Unexpected error draining output queue on shutdown",
                    exc_info=True,
                )

        # Signal complete stop and clean up resources
        self.worker_stop.set()
        self.in_queue = None
        self.out_queue = None
        return out

    def get_worker_exception(self):
        """Get the worker exception if available, returning None if no exception."""
        # Return cached exception if we already retrieved it
        if self._retrieved_worker_exception is not None:
            return self._retrieved_worker_exception

        if not hasattr(self, "worker_exception") or self.worker_exception is None:
            return None

        # Try to pull the exception from the worker queue and cache it.
        # Narrow handling: an empty queue means "no exception yet"; any other
        # error is a real problem and should not be silently swallowed.
        try:
            exc = self.worker_exception.get_nowait()
        except queue.Empty:
            return None
        except Exception:
            logger.warning("Failed to read worker exception queue", exc_info=True)
            return None

        if exc is not None:
            self._retrieved_worker_exception = exc
        return exc

    def check_worker_terminated(self):
        """
        Check for premature worker termination.

        This method verifies that the worker has not terminated before
        reaching End-Of-Stream (EOS). It is used internally to detect abnormal worker
        termination.

        Raises:
            RuntimeError: If the worker has terminated but has not reached EOS,
                         chained with the original worker exception if available
        """
        if self.terminated.is_set() and not self.at_eos:
            worker_exc = self.get_worker_exception()
            if worker_exc:
                raise RuntimeError("worker stopped before EOS") from worker_exc
            raise RuntimeError("worker stopped before EOS")

    def internal(self):
        """Element hook invoked between sink and source pads.

        Forwards to :meth:`check_worker_terminated` so that the pipeline raises
        promptly if the worker died before EOS. The concrete element subclasses
        below re-bind ``internal`` to this method because, due to MRO, they
        would otherwise inherit ``Element.internal`` (a no-op) from
        ``TransformElement``/``SinkElement``/``SourceElement`` before reaching
        ``ParallelizeBase``.
        """
        self.check_worker_terminated()

check_worker_terminated()

Check for premature worker termination.

This method verifies that the worker has not terminated before reaching End-Of-Stream (EOS). It is used internally to detect abnormal worker termination.

Raises:

RuntimeError: If the worker has terminated but has not reached EOS, chained with the original worker exception if available.
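The chaining behaviour can be illustrated outside SGN with a minimal sketch. The function `check_terminated` and its arguments are hypothetical; only the `raise ... from ...` pattern mirrors the method above, and it shows how the original worker error stays reachable through `__cause__`.

```python
import threading


def check_terminated(terminated, at_eos, worker_exc=None):
    """Raise if the worker finished before end-of-stream was reached."""
    if terminated.is_set() and not at_eos:
        if worker_exc:
            raise RuntimeError("worker stopped before EOS") from worker_exc
        raise RuntimeError("worker stopped before EOS")


terminated = threading.Event()
check_terminated(terminated, at_eos=False)  # worker still running: no error

terminated.set()
original = ValueError("bad frame")  # hypothetical failure inside the worker
caught = None
try:
    check_terminated(terminated, at_eos=False, worker_exc=original)
except RuntimeError as err:
    caught = err

print(repr(caught.__cause__))
```

A caller that catches the `RuntimeError` can therefore inspect `__cause__` to find what actually failed in the worker.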

Source code in src/sgn/subprocess.py
def check_worker_terminated(self):
    """
    Check for premature worker termination.

    This method verifies that the worker has not terminated before
    reaching End-Of-Stream (EOS). It is used internally to detect abnormal worker
    termination.

    Raises:
        RuntimeError: If the worker has terminated but has not reached EOS,
                     chained with the original worker exception if available
    """
    if self.terminated.is_set() and not self.at_eos:
        worker_exc = self.get_worker_exception()
        if worker_exc:
            raise RuntimeError("worker stopped before EOS") from worker_exc
        raise RuntimeError("worker stopped before EOS")

get_worker_exception()

Get the worker exception if available, returning None if no exception.
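The poll-then-cache pattern behind this method can be sketched with the standard library alone. `ExceptionReader` is a hypothetical stand-in that uses a plain `queue.Queue` in place of SGN's `QueueWrapper`; it only shows why repeated calls keep returning the same exception after the queue has been emptied.

```python
import queue


class ExceptionReader:
    """Hypothetical holder mimicking the cache-then-poll pattern."""

    def __init__(self):
        self.worker_exception = queue.Queue(maxsize=1)
        self._cached = None

    def get_worker_exception(self):
        if self._cached is not None:
            return self._cached  # already retrieved earlier
        try:
            exc = self.worker_exception.get_nowait()
        except queue.Empty:
            return None  # nothing reported yet
        self._cached = exc
        return exc


reader = ExceptionReader()
first = reader.get_worker_exception()            # queue empty
reader.worker_exception.put(ValueError("boom"))  # worker reports a failure
second = reader.get_worker_exception()           # pulled from the queue
third = reader.get_worker_exception()            # served from the cache
```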

Source code in src/sgn/subprocess.py
def get_worker_exception(self):
    """Get the worker exception if available, returning None if no exception."""
    # Return cached exception if we already retrieved it
    if self._retrieved_worker_exception is not None:
        return self._retrieved_worker_exception

    if not hasattr(self, "worker_exception") or self.worker_exception is None:
        return None

    # Try to pull the exception from the worker queue and cache it.
    # Narrow handling: an empty queue means "no exception yet"; any other
    # error is a real problem and should not be silently swallowed.
    try:
        exc = self.worker_exception.get_nowait()
    except queue.Empty:
        return None
    except Exception:
        logger.warning("Failed to read worker exception queue", exc_info=True)
        return None

    if exc is not None:
        self._retrieved_worker_exception = exc
    return exc

internal()

Element hook invoked between sink and source pads.

Forwards to check_worker_terminated() so that the pipeline raises promptly if the worker died before EOS. The concrete element subclasses re-bind internal to this method because, under Python's method resolution order (MRO), they would otherwise inherit Element.internal (a no-op) from TransformElement/SinkElement/SourceElement before reaching ParallelizeBase.
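The MRO issue can be reproduced with a few toy classes. The names below (`Element`, `SinkElement`, `WorkerMixin`, `Unbound`, `Rebound`) are simplified stand-ins rather than the SGN classes, and the sketch assumes the mixin does not itself derive from `Element`.

```python
class Element:
    def internal(self):
        return "no-op"


class SinkElement(Element):
    pass


class WorkerMixin:
    def internal(self):
        return "checked worker"


class Unbound(SinkElement, WorkerMixin):
    """Element.internal wins because Element precedes WorkerMixin in the MRO."""


class Rebound(SinkElement, WorkerMixin):
    # Explicit re-binding places the mixin's method on the class itself.
    internal = WorkerMixin.internal


print([cls.__name__ for cls in Unbound.__mro__])
print(Unbound().internal())
print(Rebound().internal())
```

Assigning the method in the class body puts it first in the lookup order, so it takes precedence over anything inherited.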

Source code in src/sgn/subprocess.py
def internal(self):
    """Element hook invoked between sink and source pads.

    Forwards to :meth:`check_worker_terminated` so that the pipeline raises
    promptly if the worker died before EOS. The concrete element subclasses
    below re-bind ``internal`` to this method because, due to MRO, they
    would otherwise inherit ``Element.internal`` (a no-op) from
    ``TransformElement``/``SinkElement``/``SourceElement`` before reaching
    ``ParallelizeBase``.
    """
    self.check_worker_terminated()

sub_process_shutdown(timeout=0)

Initiate an orderly shutdown of the worker.

This method signals the worker to complete processing of any pending data and then terminate. It waits for the worker to indicate completion, and collects any remaining data from the output queue before cleaning up resources.

Parameters:

timeout (int, default 0): Maximum time in seconds to wait for the worker to terminate. A value of 0 waits indefinitely.

Returns:

list[Any]: Any remaining items from the output queue.

Raises:

RuntimeError: If the worker does not terminate within the specified timeout.
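The shutdown handshake (set a shutdown flag, poll a terminated flag, then drain the output queue) can be sketched with threads and standard-library queues. The `worker`, `orderly_shutdown`, and `run_demo` functions are hypothetical; the real worker wrapper in SGN is not shown in this section, so its behaviour here is an assumption.

```python
import queue
import threading
import time


def worker(in_q, out_q, shutdown, terminated):
    """Hypothetical worker: double each item, exit once drained after shutdown."""
    while True:
        try:
            item = in_q.get(timeout=0.05)
        except queue.Empty:
            if shutdown.is_set():
                break  # nothing left to process
            continue
        out_q.put(item * 2)
    terminated.set()


def orderly_shutdown(out_q, shutdown, terminated, timeout=0):
    shutdown.set()
    start = time.time()
    while not terminated.is_set():
        if timeout > 0 and time.time() - start > timeout:
            raise RuntimeError("timeout exceeded for worker shutdown")
        time.sleep(0.01)
    remaining = []
    try:
        while True:
            remaining.append(out_q.get_nowait())
    except queue.Empty:
        pass
    return remaining


def run_demo():
    in_q, out_q = queue.Queue(), queue.Queue()
    shutdown, terminated = threading.Event(), threading.Event()
    thread = threading.Thread(target=worker, args=(in_q, out_q, shutdown, terminated))
    thread.start()
    for i in range(3):
        in_q.put(i)
    result = orderly_shutdown(out_q, shutdown, terminated, timeout=5)
    thread.join()
    return result


result = run_demo()
print(result)
```

Because the items are enqueued before the shutdown flag is set, the worker processes all of them before it reports termination, and the caller collects the results afterwards.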

Source code in src/sgn/subprocess.py
def sub_process_shutdown(self, timeout=0):
    """
    Initiate an orderly shutdown of the worker.

    This method signals the worker to complete processing of any pending data
    and then terminate. It waits for the worker to indicate completion, and
    collects any remaining data from the output queue before cleaning up resources.

    Args:
        timeout (int, optional): Maximum time in seconds to wait for the worker
            to terminate. Defaults to 0 (wait indefinitely).

    Returns:
        list[Any]: Any remaining items from the output queue.

    Raises:
        RuntimeError: If the worker does not terminate within the specified
            timeout.
    """
    # Signal worker to finish processing pending data
    self.worker_shutdown.set()
    start = time.time()
    out = []

    # Wait for worker to indicate termination. Poll at 0.1s rather than 1s
    # so clean shutdowns don't add a second of latency.
    while not self.terminated.is_set():
        if timeout > 0 and time.time() - start > timeout:
            raise RuntimeError("timeout exceeded for worker shutdown")
        time.sleep(0.1)

    # Collect any remaining output data
    if self.out_queue is not None:
        try:
            while True:
                # Queue.empty() is not reliable, so we use get_nowait()
                out.append(self.out_queue.get_nowait())
        except queue.Empty:
            pass  # Queue is empty
        except (EOFError, OSError):
            logger.warning(
                "Unexpected error draining output queue on shutdown",
                exc_info=True,
            )

    # Signal complete stop and clean up resources
    self.worker_stop.set()
    self.in_queue = None
    self.out_queue = None
    return out

worker_process(context, *args, **kwargs)

Override this method in subclasses to implement worker logic.

This method should be implemented as a static method or avoid accessing instance attributes directly to prevent pickling issues in multiprocessing mode. All necessary data should be passed through the method parameters.

Parameters:

context (WorkerContext, required): WorkerContext with clean access to queues and events
*args (Any, default ()): Automatically extracted instance attributes
**kwargs (Any, default {}): Automatically extracted instance attributes with defaults

Source code in src/sgn/subprocess.py
def worker_process(self, context: WorkerContext, *args: Any, **kwargs: Any) -> None:
    """Override this method in subclasses to implement worker logic.

    This method should be implemented as a static method or avoid accessing
    instance attributes directly to prevent pickling issues in multiprocessing mode.
    All necessary data should be passed through the method parameters.

    Args:
        context: WorkerContext with clean access to queues and events
        *args: Automatically extracted instance attributes
        **kwargs: Automatically extracted instance attributes with defaults
    """
    raise NotImplementedError("Subclasses must implement worker_process method")

ParallelizeSinkElement dataclass

Bases: SinkElement, ParallelizeBase, Parallelize


Inheritance diagram (flowchart):
UniqueID --> ElementLike --> SinkElement --> ParallelizeSinkElement
SignalEOS --> Parallelize --> ParallelizeBase --> ParallelizeSinkElement
Parallelize --> ParallelizeSinkElement

A Sink element that runs data consumption logic in a separate process or thread.

This class extends the standard SinkElement to execute its processing in a separate worker (process or thread). It communicates with the main process/thread through input and output queues, and manages the worker lifecycle. Subclasses must implement the worker_process method to define the consumption logic that runs in the worker.

The design intentionally avoids passing class or instance references to the worker to prevent pickling issues when using process mode. Instead, it passes all necessary data and resources via function arguments.

The implementation includes special handling for KeyboardInterrupt signals. When Ctrl+C is pressed in the terminal, workers will catch and ignore the KeyboardInterrupt, allowing them to continue processing while the main process coordinates a graceful shutdown. This prevents data loss and ensures all resources are properly cleaned up.
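One way to picture "catch and ignore" is a worker loop that swallows `KeyboardInterrupt` and keeps running until it is told to stop. This is only a sketch under that assumption: `run_worker` and `step` are hypothetical, the interrupt is simulated by raising the exception directly, and SGN's actual wrapper may handle the signal differently.

```python
import threading


def run_worker(step, stop_event):
    """Call step() repeatedly, ignoring KeyboardInterrupt, until stop_event is set."""
    ignored = 0
    while not stop_event.is_set():
        try:
            step()
        except KeyboardInterrupt:
            ignored += 1  # leave shutdown decisions to the main process
    return ignored


stop = threading.Event()
calls = []


def step():
    calls.append(len(calls))
    if len(calls) == 2:
        raise KeyboardInterrupt  # simulate Ctrl+C arriving mid-step
    if len(calls) == 4:
        stop.set()  # stands in for the main process requesting a stop


ignored = run_worker(step, stop)
print(ignored, len(calls))
```

The loop survives the simulated interrupt and only exits once the stop event is set, which is the property that lets the main process drive the shutdown sequence.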

Attributes:

queue_maxsize (int): Maximum size of the communication queues
err_maxsize (int): Maximum size for error data
_use_threading_override (bool): Set to True to use threading or False to use multiprocessing. If not specified, Parallelize.use_threading_default is used.
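The fallback rule for `_use_threading_override`, and the choice of queue and event types that follows from it, can be condensed into two small functions. Both `resolve_use_threading` and `pick_primitives` are hypothetical helpers written for this illustration; the `default` argument stands in for `Parallelize.use_threading_default`, whose actual value is not shown in this section.

```python
import multiprocessing
import queue
import threading


def resolve_use_threading(override, default):
    """Return the explicit override when set, otherwise the global default."""
    return override if override is not None else default


def pick_primitives(use_threading):
    """Return the (queue factory, event factory) pair for the chosen mode."""
    if use_threading:
        return queue.Queue, threading.Event
    return multiprocessing.Queue, multiprocessing.Event


for override in (None, True, False):
    mode = resolve_use_threading(override, default=False)
    print(override, "->", "thread" if mode else "process")
```

Only `None` defers to the default; an explicit `False` forces process mode even when the default is threading.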

Example with default process mode

@dataclass
class MyLoggingSinkElement(ParallelizeSinkElement):
    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
        # Send the frame to the worker
        self.in_queue.put((pad.name, frame))

    def worker_process(self, context: WorkerContext):
        try:
            # Get data from the main process/thread
            pad_name, frame = context.input_queue.get(timeout=0.1)

            # Process or log the data
            if not frame.EOS:
                print(f"Sink received on {pad_name}: {frame.data}")
            else:
                print(f"Sink received EOS on {pad_name}")

        except queue.Empty:
            pass

Example with thread mode

@dataclass
class MyThreadedSinkElement(ParallelizeSinkElement):
    _use_threading_override = True
    # Implementation same as above

Source code in src/sgn/subprocess.py
@dataclass
class ParallelizeSinkElement(SinkElement, ParallelizeBase, Parallelize):
    """
    A Sink element that runs data consumption logic in a separate process or thread.

    This class extends the standard SinkElement to execute its processing in a
    separate worker (process or thread). It communicates with the main process/thread
    through input and output queues, and manages the worker lifecycle. Subclasses must
    implement the worker_process method to define the consumption logic that runs
    in the worker.

    The design intentionally avoids passing class or instance references to the
    worker to prevent pickling issues when using process mode. Instead, it passes all
    necessary data and resources via function arguments.

    The implementation includes special handling for KeyboardInterrupt signals.
    When Ctrl+C is pressed in the terminal, workers will catch and ignore the
    KeyboardInterrupt, allowing them to continue processing while the main process
    coordinates a graceful shutdown. This prevents data loss and ensures all resources
    are properly cleaned up.

    Attributes:
        queue_maxsize (int, optional): Maximum size of the communication queues
        err_maxsize (int): Maximum size for error data
        _use_threading_override (bool, optional): Set to True to use threading or
            False to use multiprocessing. If not specified, uses the
            Parallelize.use_threading_default

    Example with default process mode:
        @dataclass
        class MyLoggingSinkElement(ParallelizeSinkElement):
            def pull(self, pad, frame):
                if frame.EOS:
                    self.mark_eos(pad)
                # Send the frame to the worker
                self.in_queue.put((pad.name, frame))

            def worker_process(self, context: WorkerContext):
                try:
                    # Get data from the main process/thread
                    pad_name, frame = context.input_queue.get(timeout=0.1)

                    # Process or log the data
                    if not frame.EOS:
                        print(f"Sink received on {pad_name}: {frame.data}")
                    else:
                        print(f"Sink received EOS on {pad_name}")

                except queue.Empty:
                    pass

    Example with thread mode:
        @dataclass
        class MyThreadedSinkElement(ParallelizeSinkElement):
            _use_threading_override = True
            # Implementation same as above
    """

    # Re-bind so MRO picks up ParallelizeBase.internal rather than the no-op
    # Element.internal inherited via SinkElement. See ParallelizeBase.internal.
    internal = ParallelizeBase.internal

    def __post_init__(self):
        SinkElement.__post_init__(self)
        ParallelizeBase.__post_init__(self)

ParallelizeSourceElement dataclass

Bases: SourceElement, ParallelizeBase, Parallelize


Inheritance diagram (flowchart):
UniqueID --> ElementLike --> SourceElement --> ParallelizeSourceElement
SignalEOS --> Parallelize --> ParallelizeBase --> ParallelizeSourceElement
Parallelize --> ParallelizeSourceElement
            

A Source element that generates data in a separate process or thread.

This class extends the standard SourceElement to execute its data generation logic in a separate worker (process or thread). It communicates with the main process through output queues, and manages the worker lifecycle. Subclasses must implement the worker_process method to define the data generation logic that runs in the worker.
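The producer side of this arrangement can be sketched with a thread and a standard-library queue. The `producer` function is hypothetical, and using `None` as the end-of-stream marker is a convention borrowed from the example further down rather than a requirement of the framework.

```python
import queue
import threading
import time


def producer(out_q, stop_event, count=3):
    """Hypothetical worker: emit items, then None as the EOS marker."""
    for i in range(count):
        if stop_event.is_set():
            break
        out_q.put(f"Generated data {i}")
    out_q.put(None)
    # Stay alive until told to stop so the consumer can observe EOS first.
    while not stop_event.is_set():
        time.sleep(0.01)


out_q = queue.Queue()
stop = threading.Event()
worker = threading.Thread(target=producer, args=(out_q, stop))
worker.start()

received = []
while True:
    item = out_q.get(timeout=5)
    if item is None:  # EOS marker
        break
    received.append(item)

stop.set()
worker.join()
print(received)
```

Keeping the worker alive until the stop event is set means the consumer sees the end-of-stream marker before the worker disappears.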

The design intentionally avoids passing class or instance references to the worker to prevent pickling issues when using process mode. Instead, it passes all necessary data and resources via function arguments.

The implementation includes special handling for KeyboardInterrupt signals. When Ctrl+C is pressed in the terminal, workers will catch and ignore the KeyboardInterrupt, allowing them to continue processing while the main process coordinates a graceful shutdown. This prevents data loss and ensures all resources are properly cleaned up.

Attributes:

queue_maxsize (int): Maximum size of the communication queues
err_maxsize (int): Maximum size for error data
frame_factory (Callable): Function to create Frame objects
at_eos (bool): Flag indicating if End-Of-Stream has been reached
_use_threading_override (bool): Set to True to use threading or False to use multiprocessing. If not specified, Parallelize.use_threading_default is used.

Example with default process mode

@dataclass
class MyDataSourceElement(ParallelizeSourceElement):
    def __post_init__(self):
        super().__post_init__()
        # Dictionary to track EOS status for each pad
        self.pad_eos = {pad.name: False for pad in self.source_pads}

    def new(self, pad):
        # Check if this pad has already reached EOS
        if self.pad_eos[pad.name]:
            return Frame(data=None, EOS=True)

        try:
            # Get data generated by the worker
            # In a real implementation, you might use pad-specific queues
            # or have the worker send pad-specific data
            data = self.out_queue.get(timeout=1)

            # Check for EOS signal (None typically indicates EOS)
            if data is None:
                self.pad_eos[pad.name] = True
                # If all pads have reached EOS, set global EOS flag
                if all(self.pad_eos.values()):
                    self.at_eos = True
                return Frame(data=None, EOS=True)

            # For data intended for other pads, you might implement
            # custom routing logic here

            return Frame(data=data)
        except queue.Empty:
            # Return an empty frame if no data is available
            return Frame(data=None)

    def worker_process(self, context: WorkerContext):
        # Generate data and send it back to the main process/thread
        for i in range(10):
            if context.should_stop():
                break
            context.output_queue.put(f"Generated data {i}")
            time.sleep(0.5)

        # Signal end of stream with None
        context.output_queue.put(None)

        # Wait for worker_stop before terminating
        # This prevents "worker stopped before EOS" errors
        while not context.should_stop():
            time.sleep(0.1)

Example with thread mode

@dataclass
class MyThreadedSourceElement(ParallelizeSourceElement):
    _use_threading_override = True

    def __post_init__(self):
        super().__post_init__()
        # Dictionary to track EOS status for each pad
        self.pad_eos = {pad.name: False for pad in self.source_pads}

    def new(self, pad):
        # Similar implementation as in the process mode example,
        # but might use threading-specific features if needed
        if self.pad_eos[pad.name]:
            return Frame(data=None, EOS=True)

        # Rest of implementation same as the process mode example

Source code in src/sgn/subprocess.py
@dataclass
class ParallelizeSourceElement(SourceElement, ParallelizeBase, Parallelize):
    """
    A Source element that generates data in a separate process or thread.

    This class extends the standard SourceElement to execute its data generation logic
    in a separate worker (process or thread). It communicates with the main process
    through output queues, and manages the worker lifecycle. Subclasses must implement
    the worker_process method to define the data generation logic that runs in
    the worker.

    The design intentionally avoids passing class or instance references to the
    worker to prevent pickling issues when using process mode. Instead, it passes all
    necessary data and resources via function arguments.

    The implementation includes special handling for KeyboardInterrupt signals.
    When Ctrl+C is pressed in the terminal, workers will catch and ignore the
    KeyboardInterrupt, allowing them to continue processing while the main process
    coordinates a graceful shutdown. This prevents data loss and ensures all resources
    are properly cleaned up.

    Attributes:
        queue_maxsize (int, optional): Maximum size of the communication queues
        err_maxsize (int): Maximum size for error data
        frame_factory (Callable, optional): Function to create Frame objects
        at_eos (bool): Flag indicating if End-Of-Stream has been reached
        _use_threading_override (bool, optional): Set to True to use threading or
            False to use multiprocessing. If not specified, uses the
            Parallelize.use_threading_default

    Example with default process mode:
        @dataclass
        class MyDataSourceElement(ParallelizeSourceElement):
            def __post_init__(self):
                super().__post_init__()
                # Dictionary to track EOS status for each pad
                self.pad_eos = {pad.name: False for pad in self.source_pads}

            def new(self, pad):
                # Check if this pad has already reached EOS
                if self.pad_eos[pad.name]:
                    return Frame(data=None, EOS=True)

                try:
                    # Get data generated by the worker
                    # In a real implementation, you might use pad-specific queues
                    # or have the worker send pad-specific data
                    data = self.out_queue.get(timeout=1)

                    # Check for EOS signal (None typically indicates EOS)
                    if data is None:
                        self.pad_eos[pad.name] = True
                        # If all pads have reached EOS, set global EOS flag
                        if all(self.pad_eos.values()):
                            self.at_eos = True
                        return Frame(data=None, EOS=True)

                    # For data intended for other pads, you might implement
                    # custom routing logic here

                    return Frame(data=data)
                except queue.Empty:
                    # Return an empty frame if no data is available
                    return Frame(data=None)

            def worker_process(self, context: WorkerContext):
                # Generate data and send it back to the main process/thread
                for i in range(10):
                    if context.should_stop():
                        break
                    context.output_queue.put(f"Generated data {i}")
                    time.sleep(0.5)

                # Signal end of stream with None
                context.output_queue.put(None)

                # Wait for worker_stop before terminating
                # This prevents "worker stopped before EOS" errors
                while not context.should_stop():
                    time.sleep(0.1)

    Example with thread mode:
        @dataclass
        class MyThreadedSourceElement(ParallelizeSourceElement):
            _use_threading_override = True

            def __post_init__(self):
                super().__post_init__()
                # Dictionary to track EOS status for each pad
                self.pad_eos = {pad.name: False for pad in self.source_pads}

            def new(self, pad):
                # Similar implementation as in the process mode example,
                # but might use threading-specific features if needed
                if self.pad_eos[pad.name]:
                    return Frame(data=None, EOS=True)

                # Rest of implementation same as the process mode example
    """

    frame_factory: Callable = Frame
    at_eos: bool = False

    # Re-bind so MRO picks up ParallelizeBase.internal rather than the no-op
    # Element.internal inherited via SourceElement. See ParallelizeBase.internal.
    internal = ParallelizeBase.internal

    def __post_init__(self):
        SourceElement.__post_init__(self)
        ParallelizeBase.__post_init__(self)

ParallelizeTransformElement dataclass

Bases: TransformElement, ParallelizeBase, Parallelize


              flowchart TD
              sgn.base.UniqueID[UniqueID] --> sgn.base.ElementLike[ElementLike]
              sgn.base.ElementLike --> sgn.base.TransformElement[TransformElement]
              sgn.base.TransformElement --> sgn.subprocess.ParallelizeTransformElement[ParallelizeTransformElement]
              sgn.sources.SignalEOS[SignalEOS] --> sgn.subprocess.Parallelize[Parallelize]
              sgn.subprocess.Parallelize --> sgn.subprocess.ParallelizeBase[ParallelizeBase]
              sgn.subprocess.ParallelizeBase --> sgn.subprocess.ParallelizeTransformElement
              sgn.subprocess.Parallelize --> sgn.subprocess.ParallelizeTransformElement
            

A Transform element that runs processing logic in a separate process or thread.

This class extends the standard TransformElement to execute its processing in a separate worker (process or thread). It communicates with the main process/thread through input and output queues, and manages the worker lifecycle. Subclasses must implement the worker_process method to define the processing logic that runs in the worker.

The design intentionally avoids passing class or instance references to the worker to prevent pickling issues when using process mode. Instead, it passes all necessary data and resources via function arguments.
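As a rough illustration of why this matters (a sketch using only the standard library, not SGN's own code): an instance that holds an unpicklable resource cannot be sent to a spawned process, while plain argument values can. The `Element` class and `worker` function below are hypothetical names chosen for the example.

```python
import pickle
import threading


class Element:
    """Hypothetical element that holds an unpicklable resource (a lock)."""

    def __init__(self, multiplier: int):
        self.multiplier = multiplier
        self._lock = threading.Lock()  # lock objects cannot be pickled


def worker(value: int, multiplier: int) -> int:
    """Module-level worker that depends only on its arguments."""
    return value * multiplier


element = Element(multiplier=3)

# Pickling the whole instance fails because of the lock it carries.
try:
    pickle.dumps(element)
    instance_picklable = True
except TypeError:
    instance_picklable = False

# Plain argument values round-trip through pickle without trouble.
args = pickle.loads(pickle.dumps((7, element.multiplier)))
result = worker(*args)
```

Passing only the values the worker needs keeps the element free to hold locks, open handles, or other process-local state.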

The implementation includes special handling for KeyboardInterrupt signals. When Ctrl+C is pressed in the terminal, workers will catch and ignore the KeyboardInterrupt, allowing them to continue processing while the main process coordinates a graceful shutdown. This prevents data loss and ensures all resources are properly cleaned up.
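The behaviour described above can be pictured with a small sketch. It is an assumption-laden illustration, not the library's implementation: `run_worker`, `steps`, and `interrupted` are hypothetical names, and a `threading.Event` stands in for whatever stop signal the main process sets.

```python
import threading


def run_worker(steps, stop_event):
    """Run callables in order, ignoring KeyboardInterrupt, until told to stop."""
    completed = []
    for step in steps:
        if stop_event.is_set():
            break  # the coordinator asked for shutdown
        try:
            completed.append(step())
        except KeyboardInterrupt:
            # Swallow Ctrl+C in the worker; shutdown is driven by stop_event.
            continue
    return completed


def interrupted():
    raise KeyboardInterrupt  # simulates Ctrl+C arriving mid-step


stop = threading.Event()
done = run_worker([lambda: "a", interrupted, lambda: "b"], stop)

# Once the stop event is set, the worker exits before doing more work.
stop.set()
after_stop = run_worker([lambda: "c"], stop)
```

The point of the pattern is that only the stop event, never the interrupt itself, ends the worker loop.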

Attributes:

queue_maxsize (int): Maximum size of the communication queues

err_maxsize (int): Maximum size for error data

at_eos (bool): Flag indicating if End-Of-Stream has been reached

_use_threading_override (bool): Set to True to use threading or False to use multiprocessing. If not specified, falls back to Parallelize.use_threading_default

Example with default process mode

@dataclass
class MyProcessingElement(ParallelizeTransformElement):
    multiplier: int = 2  # Instance attributes become worker parameters

    def pull(self, pad, frame):
        # Send the frame to the worker
        self.in_queue.put(frame)
        if frame.EOS:
            self.at_eos = True

    def worker_process(self, context: WorkerContext, multiplier: int):
        # Process data in the worker using the clean context
        try:
            frame = context.input_queue.get(timeout=0.1)
            if frame and not frame.EOS:
                frame.data *= multiplier
                context.output_queue.put(frame)
        except queue.Empty:
            pass

    def new(self, pad):
        # Get processed data from the worker
        return self.out_queue.get()

Example with thread mode

@dataclass
class MyThreadedElement(ParallelizeTransformElement):
    _use_threading_override = True
    # Implementation same as above

Example

@dataclass
class MyProcessingElement(ParallelizeTransformElement):
    multiplier: int = 2
    threshold: float = 0.5

    def pull(self, pad, frame):
        self.in_queue.put(frame)
        if frame.EOS:
            self.at_eos = True

    def worker_process(
        self, context: WorkerContext, multiplier: int, threshold: float
    ):
        try:
            frame = context.input_queue.get(timeout=0.1)
            if frame and not frame.EOS and frame.data > threshold:
                frame.data *= multiplier
                context.output_queue.put(frame)
        except queue.Empty:
            pass

    def new(self, pad):
        return self.out_queue.get()

Source code in src/sgn/subprocess.py
@dataclass
class ParallelizeTransformElement(TransformElement, ParallelizeBase, Parallelize):
    """
    A Transform element that runs processing logic in a separate process or thread.

    This class extends the standard TransformElement to execute its processing in a
    separate worker (process or thread). It communicates with the main process/thread
    through input and output queues, and manages the worker lifecycle. Subclasses must
    implement the worker_process method to define the processing logic that runs
    in the worker.

    The design intentionally avoids passing class or instance references to the
    worker to prevent pickling issues when using process mode. Instead, it passes all
    necessary data and resources via function arguments.

    The implementation includes special handling for KeyboardInterrupt signals.
    When Ctrl+C is pressed in the terminal, workers will catch and ignore the
    KeyboardInterrupt, allowing them to continue processing while the main process
    coordinates a graceful shutdown. This prevents data loss and ensures all resources
    are properly cleaned up.

    Attributes:
        queue_maxsize (int, optional): Maximum size of the communication queues
        err_maxsize (int): Maximum size for error data
        at_eos (bool): Flag indicating if End-Of-Stream has been reached
        _use_threading_override (bool, optional): Set to True to use threading or
            False to use multiprocessing. If not specified, uses the
            Parallelize.use_threading_default

    Example with default process mode:
        @dataclass
        class MyProcessingElement(ParallelizeTransformElement):
            multiplier: int = 2  # Instance attributes become worker parameters

            def pull(self, pad, frame):
                # Send the frame to the worker
                self.in_queue.put(frame)
                if frame.EOS:
                    self.at_eos = True

            def worker_process(self, context: WorkerContext, multiplier: int):
                # Process data in the worker using the clean context
                try:
                    frame = context.input_queue.get(timeout=0.1)
                    if frame and not frame.EOS:
                        frame.data *= multiplier
                        context.output_queue.put(frame)
                except queue.Empty:
                    pass

            def new(self, pad):
                # Get processed data from the worker
                return self.out_queue.get()

    Example with thread mode:
        @dataclass
        class MyThreadedElement(ParallelizeTransformElement):
            _use_threading_override = True
            # Implementation same as above

    Example:
        @dataclass
        class MyProcessingElement(ParallelizeTransformElement):
            multiplier: int = 2
            threshold: float = 0.5

            def pull(self, pad, frame):
                self.in_queue.put(frame)
                if frame.EOS:
                    self.at_eos = True

            def worker_process(
                self, context: WorkerContext, multiplier: int, threshold: float
            ):
                try:
                    frame = context.input_queue.get(timeout=0.1)
                    if frame and not frame.EOS and frame.data > threshold:
                        frame.data *= multiplier
                        context.output_queue.put(frame)
                except queue.Empty:
                    pass

            def new(self, pad):
                return self.out_queue.get()
    """

    at_eos: bool = False

    # Re-bind so MRO picks up ParallelizeBase.internal rather than the no-op
    # Element.internal inherited via TransformElement. See ParallelizeBase.internal.
    internal = ParallelizeBase.internal

    def __post_init__(self):
        TransformElement.__post_init__(self)
        ParallelizeBase.__post_init__(self)

QueueProtocol

Bases: Protocol



Protocol defining a common Queue interface.
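Because this is a structural protocol, any object with matching methods satisfies it without inheriting from it. The sketch below illustrates that idea with a hypothetical `QueueLike` protocol that mirrors the method names listed here; it is a stand-in so the example stays self-contained, and `drain` is an illustrative helper rather than part of SGN.

```python
import queue
from typing import Any, Optional, Protocol, runtime_checkable


@runtime_checkable
class QueueLike(Protocol):
    """Hypothetical stand-in mirroring the methods QueueProtocol declares."""

    def get(self, block: bool = True, timeout: Optional[float] = None) -> Any: ...

    def put(
        self, item: Any, block: bool = True, timeout: Optional[float] = None
    ) -> None: ...

    def empty(self) -> bool: ...

    def get_nowait(self) -> Any: ...

    def put_nowait(self, item: Any) -> None: ...


def drain(q: QueueLike) -> list:
    """Collect everything currently in the queue without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            return items


q = queue.Queue()
for i in range(3):
    q.put(i)

# queue.Queue never subclasses QueueLike, yet it matches structurally.
is_queue_like = isinstance(q, QueueLike)
drained = drain(q)
```

Note that a `runtime_checkable` check only verifies that the methods exist, not that their signatures match; static type checkers perform the stricter comparison.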

Source code in src/sgn/subprocess.py
class QueueProtocol(Protocol):
    """Protocol defining a common Queue interface."""

    def get(self, block: bool = True, timeout: float | None = None) -> Any:
        """Get an item from the queue."""
        ...

    def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None:
        """Put an item into the queue."""
        ...

    def empty(self) -> bool:
        """Return True if the queue is empty."""
        ...

    def get_nowait(self) -> Any:
        """Get an item from the queue without blocking."""
        ...

    def put_nowait(self, item: Any) -> None:
        """Put an item into the queue without blocking."""
        ...

empty()

Return True if the queue is empty.

Source code in src/sgn/subprocess.py
def empty(self) -> bool:
    """Return True if the queue is empty."""
    ...

get(block=True, timeout=None)

Get an item from the queue.

Source code in src/sgn/subprocess.py
def get(self, block: bool = True, timeout: float | None = None) -> Any:
    """Get an item from the queue."""
    ...

get_nowait()

Get an item from the queue without blocking.

Source code in src/sgn/subprocess.py
def get_nowait(self) -> Any:
    """Get an item from the queue without blocking."""
    ...

put(item, block=True, timeout=None)

Put an item into the queue.

Source code in src/sgn/subprocess.py
def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None:
    """Put an item into the queue."""
    ...

put_nowait(item)

Put an item into the queue without blocking.

Source code in src/sgn/subprocess.py
def put_nowait(self, item: Any) -> None:
    """Put an item into the queue without blocking."""
    ...

QueueWrapper

A wrapper that provides a unified interface for both Queue implementations.

This abstraction handles the differences between multiprocessing.Queue and queue.Queue APIs, specifically providing no-op implementations for multiprocessing-specific methods when wrapping a queue.Queue.
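A reduced sketch of the pattern, under the assumption that only `close()` needs illustrating: `MiniQueueWrapper` and `ClosableQueue` are hypothetical names, with `ClosableQueue` standing in for a queue type that, like multiprocessing.Queue, provides `close()`.

```python
import queue


class MiniQueueWrapper:
    """Hypothetical, reduced stand-in for the wrapper pattern described above."""

    def __init__(self, queue_instance):
        self._queue = queue_instance

    def put(self, item):
        self._queue.put(item)

    def get(self, timeout=None):
        return self._queue.get(timeout=timeout)

    def close(self):
        # Delegate only if the wrapped queue supports it; otherwise do nothing.
        if hasattr(self._queue, "close"):
            self._queue.close()


class ClosableQueue(queue.Queue):
    """Hypothetical queue with close(), standing in for multiprocessing.Queue."""

    closed = False

    def close(self):
        self.closed = True


plain = MiniQueueWrapper(queue.Queue())
plain.put("x")
plain.close()  # no-op: queue.Queue has no close()
item = plain.get(timeout=1)

closable_queue = ClosableQueue()
MiniQueueWrapper(closable_queue).close()  # delegated to the wrapped queue
```

Callers can therefore run the same cleanup sequence in thread mode and process mode without checking which queue type they hold.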

Source code in src/sgn/subprocess.py
class QueueWrapper:
    """
    A wrapper that provides a unified interface for both Queue implementations.

    This abstraction handles the differences between multiprocessing.Queue and
    queue.Queue APIs, specifically providing no-op implementations for
    multiprocessing-specific methods when wrapping a queue.Queue.
    """

    def __init__(self, queue_instance: QueueProtocol):
        self._queue = queue_instance

    def get(self, block: bool = True, timeout: float | None = None) -> Any:
        """Get an item from the queue."""
        return self._queue.get(block=block, timeout=timeout)

    def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None:
        """Put an item into the queue."""
        self._queue.put(item, block=block, timeout=timeout)

    def empty(self) -> bool:
        """Return True if the queue is empty."""
        return self._queue.empty()

    def get_nowait(self) -> Any:
        """Get an item from the queue without blocking."""
        return self._queue.get_nowait()

    def put_nowait(self, item: Any) -> None:  # pragma: no cover
        """Put an item into the queue without blocking."""
        self._queue.put_nowait(item)

    def cancel_join_thread(self) -> None:  # pragma: no cover
        """Cancel the background thread (no-op for queue.Queue)."""
        if hasattr(self._queue, "cancel_join_thread"):
            self._queue.cancel_join_thread()

    def close(self) -> None:  # pragma: no cover
        """Close the queue (no-op for queue.Queue)."""
        if hasattr(self._queue, "close"):
            self._queue.close()

cancel_join_thread()

Cancel the background thread (no-op for queue.Queue).

Source code in src/sgn/subprocess.py
def cancel_join_thread(self) -> None:  # pragma: no cover
    """Cancel the background thread (no-op for queue.Queue)."""
    if hasattr(self._queue, "cancel_join_thread"):
        self._queue.cancel_join_thread()

close()

Close the queue (no-op for queue.Queue).

Source code in src/sgn/subprocess.py
def close(self) -> None:  # pragma: no cover
    """Close the queue (no-op for queue.Queue)."""
    if hasattr(self._queue, "close"):
        self._queue.close()

empty()

Return True if the queue is empty.

Source code in src/sgn/subprocess.py
def empty(self) -> bool:
    """Return True if the queue is empty."""
    return self._queue.empty()

get(block=True, timeout=None)

Get an item from the queue.

Source code in src/sgn/subprocess.py
def get(self, block: bool = True, timeout: float | None = None) -> Any:
    """Get an item from the queue."""
    return self._queue.get(block=block, timeout=timeout)

get_nowait()

Get an item from the queue without blocking.

Source code in src/sgn/subprocess.py
def get_nowait(self) -> Any:
    """Get an item from the queue without blocking."""
    return self._queue.get_nowait()

put(item, block=True, timeout=None)

Put an item into the queue.

Source code in src/sgn/subprocess.py
def put(self, item: Any, block: bool = True, timeout: float | None = None) -> None:
    """Put an item into the queue."""
    self._queue.put(item, block=block, timeout=timeout)

put_nowait(item)

Put an item into the queue without blocking.

Source code in src/sgn/subprocess.py
def put_nowait(self, item: Any) -> None:  # pragma: no cover
    """Put an item into the queue without blocking."""
    self._queue.put_nowait(item)

WorkerContext

Context object passed to worker methods with clean access to resources.
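The sketch below shows one way a worker function might use such a context in thread mode. It rests on several assumptions: `MiniContext` is a hypothetical stand-in that copies only a few of the attributes shown in the source listing, and the `run` loop that calls the worker function repeatedly is a guess at how a driver could work, not SGN's actual scheduling code. The stand-in wraps the stop check in `bool()`, since the expression in the listing evaluates to `None` when no stop event was supplied.

```python
import queue
import threading


class MiniContext:
    """Hypothetical stand-in exposing a subset of WorkerContext's attributes."""

    def __init__(self, input_queue=None, output_queue=None, worker_stop=None):
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.stop_event = worker_stop
        self.state = {}  # persists across calls to the worker function

    def should_stop(self) -> bool:
        return bool(self.stop_event and self.stop_event.is_set())


def double_one(context):
    """Process at most one item and count it in context.state."""
    try:
        item = context.input_queue.get(timeout=0.05)
    except queue.Empty:
        return
    context.state["seen"] = context.state.get("seen", 0) + 1
    context.output_queue.put(item * 2)


def run(context):
    """Assumed driver: call the worker function until the stop event is set."""
    while not context.should_stop():
        double_one(context)


ctx = MiniContext(queue.Queue(), queue.Queue(), threading.Event())
thread = threading.Thread(target=run, args=(ctx,))
thread.start()

for value in (1, 2, 3):
    ctx.input_queue.put(value)
results = [ctx.output_queue.get(timeout=2) for _ in range(3)]

ctx.stop_event.set()
thread.join(timeout=2)
```

Keeping per-worker counters in `state` rather than on the element avoids sharing instance attributes between the main thread and the worker.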

Source code in src/sgn/subprocess.py
class WorkerContext:
    """Context object passed to worker methods with clean access to resources."""

    def __init__(
        self,
        input_queue=None,
        output_queue=None,
        worker_stop=None,
        worker_shutdown=None,
        **kwargs,
    ):
        self.input_queue = input_queue
        self.output_queue = output_queue
        self.stop_event = worker_stop
        self.shutdown_event = worker_shutdown
        self.shared_memory = kwargs.get("shm_list", [])
        self.state = {}  # Persistent dict for worker state across calls
        self.worker_exception = None
        self._raw_kwargs = kwargs

    def should_stop(self) -> bool:
        """Check if the worker should stop processing."""
        return self.stop_event and self.stop_event.is_set()

    def should_shutdown(self) -> bool:
        """Check if the worker should shutdown gracefully."""
        return self.shutdown_event and self.shutdown_event.is_set()

should_shutdown()

Check if the worker should shutdown gracefully.

Source code in src/sgn/subprocess.py
def should_shutdown(self) -> bool:
    """Check if the worker should shutdown gracefully."""
    return self.shutdown_event and self.shutdown_event.is_set()

should_stop()

Check if the worker should stop processing.

Source code in src/sgn/subprocess.py
def should_stop(self) -> bool:
    """Check if the worker should stop processing."""
    return self.stop_event and self.stop_event.is_set()