Running in Parallel

SGN supports running elements in separate processes or threads using the Parallelize* element classes.

Automatic Detection

When your pipeline contains any Parallelize* element, pipeline.run() automatically detects this and wraps execution in a Parallelize context:

from sgn import Pipeline

p = Pipeline()
# ... add elements including a ParallelizeTransformElement ...
p.run()  # Automatically handles parallelization

Process Mode Requirement

When using process mode (the default), your script must guard its entry point with if __name__ == "__main__": because Python's spawn start method re-imports the module in each child process.
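This requirement comes from plain Python multiprocessing, not from SGN itself. A minimal stdlib illustration of the pattern (no SGN APIs involved):

```python
import multiprocessing as mp


def work(q):
    q.put("done")


def main():
    # spawn starts a fresh interpreter that re-imports this module
    ctx = mp.get_context("spawn")
    q = ctx.Queue()
    proc = ctx.Process(target=work, args=(q,))
    proc.start()
    result = q.get(timeout=10)
    proc.join()
    return result


if __name__ == "__main__":
    # Without this guard, the re-import performed by each spawned child
    # would run main() again and start workers recursively.
    print(main())
```

The same guard applies to any script that calls pipeline.run() with process-mode Parallelize* elements.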

ParallelizeTransformElement

Subclass ParallelizeTransformElement and implement worker_process():

import queue
from dataclasses import dataclass
from sgn import Frame
from sgn.subprocess import ParallelizeTransformElement, WorkerContext


@dataclass
class HeavyTransform(ParallelizeTransformElement):
    multiplier: float = 2.0  # Instance attrs become worker params

    def pull(self, pad, frame):
        self.in_queue.put(frame)
        if frame.EOS:
            self.at_eos = True

    def worker_process(self, context: WorkerContext, multiplier: float):
        try:
            frame = context.input_queue.get(timeout=0.1)
            if frame.data is not None:
                frame.data *= multiplier
            context.output_queue.put(frame)
        except queue.Empty:
            pass

    def new(self, pad):
        return self.out_queue.get()

Instance attributes whose names match worker_process parameters are automatically extracted and passed to the worker.
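The extraction rule can be pictured with inspect.signature. This is a sketch of the idea only, not SGN's actual implementation:

```python
import inspect
from dataclasses import dataclass


def extract_worker_params(element, worker_fn):
    # Collect instance attributes whose names match worker_fn parameters,
    # skipping self and the injected context argument.
    names = inspect.signature(worker_fn).parameters
    return {
        name: getattr(element, name)
        for name in names
        if name not in ("self", "context") and hasattr(element, name)
    }


@dataclass
class Example:
    multiplier: float = 2.0
    other: str = "not passed"  # no matching parameter, so ignored


def worker_process(self, context, multiplier: float):
    pass


print(extract_worker_params(Example(), worker_process))  # {'multiplier': 2.0}
```

In the HeavyTransform example above, this is why multiplier arrives as an argument to worker_process without any explicit wiring.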

ParallelizeSinkElement

import queue
from dataclasses import dataclass
from sgn.subprocess import ParallelizeSinkElement, WorkerContext


@dataclass
class ParallelWriter(ParallelizeSinkElement):
    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
        self.in_queue.put((pad.pad_name, frame))

    def worker_process(self, context: WorkerContext):
        try:
            pad_name, frame = context.input_queue.get(timeout=0.1)
            if not frame.EOS:
                print(f"Writing {pad_name}: {frame.data}")
        except queue.Empty:
            pass

ParallelizeSourceElement

import queue
import time
from dataclasses import dataclass
from sgn import Frame
from sgn.subprocess import ParallelizeSourceElement, WorkerContext


@dataclass
class ParallelReader(ParallelizeSourceElement):
    def __post_init__(self):
        super().__post_init__()
        self.pad_eos = {pad.name: False for pad in self.source_pads}

    def new(self, pad):
        if self.pad_eos[pad.name]:
            return Frame(EOS=True)
        try:
            data = self.out_queue.get(timeout=1)
            if data is None:
                self.pad_eos[pad.name] = True
                return Frame(EOS=True)
            return Frame(data=data)
        except queue.Empty:
            return Frame()

    def worker_process(self, context: WorkerContext):
        for i in range(10):
            if context.should_stop():
                break
            context.output_queue.put(f"data_{i}")
            time.sleep(0.1)
        context.output_queue.put(None)  # Signal EOS
        while not context.should_stop():
            time.sleep(0.1)

Threading Mode

Set _use_threading_override = True on your element class to run workers in threads instead of separate processes. Use this for I/O-bound work:

@dataclass
class ThreadedTransform(ParallelizeTransformElement):
    _use_threading_override = True
    # ... same implementation as above ...

Manual Context Manager

For explicit control, use the Parallelize context manager directly:

from sgn.subprocess import Parallelize

def main():
    p = Pipeline()
    # ... build pipeline ...
    with Parallelize(p) as parallelize:
        parallelize.run()

if __name__ == "__main__":
    main()

Shared Memory (Process Mode)

Share data between processes without serialization overhead:

from sgn.subprocess import Parallelize

data = bytearray(b"shared data here")
shm_ref = Parallelize.to_shm("my_data", data)
# Workers can access via context.shared_memory
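This style of sharing corresponds to Python's multiprocessing.shared_memory in the standard library. A stdlib sketch of the same idea (Parallelize.to_shm's exact semantics may differ):

```python
from multiprocessing import shared_memory


def write_shm(name, payload: bytes):
    # Create a named shared-memory block and copy the payload into it.
    shm = shared_memory.SharedMemory(create=True, size=len(payload), name=name)
    shm.buf[:len(payload)] = payload
    return shm


def read_shm(name, size: int) -> bytes:
    # Attach to an existing block by name; no serialization involved.
    shm = shared_memory.SharedMemory(name=name)
    data = bytes(shm.buf[:size])
    shm.close()
    return data


if __name__ == "__main__":
    payload = b"shared data here"
    shm = write_shm("my_data", payload)
    try:
        assert read_shm("my_data", len(payload)) == payload
    finally:
        shm.close()
        shm.unlink()  # free the block when no longer needed
```

Whichever process creates the block is responsible for unlinking it; readers only close their handles.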

Graceful Shutdown

Call sub_process_shutdown() to let a worker finish processing its queue:

remaining = element.sub_process_shutdown(timeout=10)

The worker's context.should_shutdown() then returns True, allowing it to drain its input queue before stopping.
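The drain pattern inside the worker can be sketched with stdlib queues. This is illustrative only; in a real element the shutdown signal is context.should_shutdown() and the queue is context.input_queue:

```python
import queue


def drain_then_stop(in_queue, should_shutdown, handle):
    # Keep processing while items remain; exit only once the queue is
    # empty AND shutdown has been requested.
    while True:
        try:
            item = in_queue.get(timeout=0.1)
        except queue.Empty:
            if should_shutdown():
                return
            continue
        handle(item)


q = queue.Queue()
for i in range(3):
    q.put(i)

seen = []
drain_then_stop(q, should_shutdown=lambda: True, handle=seen.append)
print(seen)  # [0, 1, 2]
```

Checking should_shutdown only on an empty queue is what lets queued frames finish processing instead of being dropped.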