Running in Parallel¶
SGN supports running elements in separate processes or threads using the
Parallelize* element classes.
Automatic Detection¶
When your pipeline contains any Parallelize* element, pipeline.run()
automatically detects this and wraps execution in a Parallelize context:
from sgn import Pipeline
p = Pipeline()
# ... add elements including a ParallelizeTransformElement ...
p.run() # Automatically handles parallelization
Process Mode Requirement
When using process mode (the default), your script must guard the entry
point with if __name__ == "__main__": because Python's spawn start
method re-imports the module.
ParallelizeTransformElement¶
Subclass ParallelizeTransformElement and implement worker_process():
import queue
from dataclasses import dataclass

from sgn import Frame
from sgn.subprocess import ParallelizeTransformElement, WorkerContext


@dataclass
class HeavyTransform(ParallelizeTransformElement):
    multiplier: float = 2.0  # Instance attrs become worker params

    def pull(self, pad, frame):
        self.in_queue.put(frame)
        if frame.EOS:
            self.at_eos = True

    def worker_process(self, context: WorkerContext, multiplier: float):
        try:
            frame = context.input_queue.get(timeout=0.1)
            if frame.data is not None:
                frame.data *= multiplier
            context.output_queue.put(frame)
        except queue.Empty:
            pass

    def new(self, pad):
        return self.out_queue.get()
Instance attributes whose names match worker_process parameters are
automatically extracted and passed to the worker.
ParallelizeSinkElement¶
import queue
from dataclasses import dataclass

from sgn.subprocess import ParallelizeSinkElement, WorkerContext


@dataclass
class ParallelWriter(ParallelizeSinkElement):
    def pull(self, pad, frame):
        if frame.EOS:
            self.mark_eos(pad)
        self.in_queue.put((pad.pad_name, frame))

    def worker_process(self, context: WorkerContext):
        try:
            pad_name, frame = context.input_queue.get(timeout=0.1)
            if not frame.EOS:
                print(f"Writing {pad_name}: {frame.data}")
        except queue.Empty:
            pass
ParallelizeSourceElement¶
import queue
import time
from dataclasses import dataclass

from sgn import Frame
from sgn.subprocess import ParallelizeSourceElement, WorkerContext


@dataclass
class ParallelReader(ParallelizeSourceElement):
    def __post_init__(self):
        super().__post_init__()
        self.pad_eos = {pad.name: False for pad in self.source_pads}

    def new(self, pad):
        if self.pad_eos[pad.name]:
            return Frame(EOS=True)
        try:
            data = self.out_queue.get(timeout=1)
            if data is None:
                self.pad_eos[pad.name] = True
                return Frame(EOS=True)
            return Frame(data=data)
        except queue.Empty:
            return Frame()

    def worker_process(self, context: WorkerContext):
        for i in range(10):
            if context.should_stop():
                break
            context.output_queue.put(f"data_{i}")
            time.sleep(0.1)
        context.output_queue.put(None)  # Signal EOS
        while not context.should_stop():
            time.sleep(0.1)
Threading Mode¶
Set _use_threading_override = True on your element class for threading instead
of multiprocessing. Use this for I/O-bound work:
@dataclass
class ThreadedTransform(ParallelizeTransformElement):
    _use_threading_override = True

    # ... same implementation as above ...
Manual Context Manager¶
For explicit control, use the Parallelize context manager directly:
from sgn.subprocess import Parallelize


def main():
    p = Pipeline()
    # ... build pipeline ...
    with Parallelize(p) as parallelize:
        parallelize.run()


if __name__ == "__main__":
    main()
Shared Memory (Process Mode)¶
Share data between processes without serialization overhead:
from sgn.subprocess import Parallelize

data = bytearray(b"shared data here")
shm_ref = Parallelize.to_shm("my_data", data)
# Workers can access via context.shared_memory
Graceful Shutdown¶
Call sub_process_shutdown() to let a worker finish processing its queue: the
worker's context.should_shutdown() returns True, allowing it to drain its
input queue before stopping.