Stats Monitoring¶
StatsSource produces frames containing system and process statistics using
psutil.
Prerequisites¶
StatsSource works without psutil but provides limited information.
Basic Usage¶
from sgn import Pipeline, CollectSink
from sgn.sources import StatsSource, SignalEOS
stats = StatsSource(
name="stats",
source_pad_names=["metrics"],
interval=2.0, # Collect every 2 seconds
include_process_stats=True,
include_system_stats=True,
wait=1.0, # Wait 1s between frames
)
sink = CollectSink(name="sink", sink_pad_names=["metrics"])
p = Pipeline()
p.connect(stats, sink)
with SignalEOS():
p.run() # Ctrl+C to stop
Frame Data¶
Each frame's data is a dictionary with keys:
{
"timestamp": 1234567890.123,
"process": {
"pid": 12345,
"cpu_percent": 5.2,
"memory": {"rss": 52428800, "vms": 104857600, ...},
"memory_percent": 1.2,
"num_threads": 4,
...
},
"system": {
"cpu": {"percent": 23.4, "count": {"physical": 8, "logical": 16}, ...},
"memory": {"total": 17179869184, "available": 8589934592, ...},
"disk": {...},
"network": {...},
},
}
Configuration¶
| Parameter | Default | Description |
|---|---|---|
interval |
None |
Seconds between collections. None = every iteration |
include_process_stats |
True |
Include current process statistics |
include_system_stats |
True |
Include system-wide statistics |
wait |
None |
Seconds to sleep between frames |
eos_on_signal |
True |
End stream on SIGINT/SIGTERM |
frame_factory |
Frame |
Factory for creating frames |
Processing Stats¶
Combine StatsSource with a transform to extract or alert on specific metrics:
from sgn import CallableTransform
def check_memory(frame):
if frame.data is None:
return None
proc = frame.data.get("process", {})
mem_pct = proc.get("memory_percent", 0)
if mem_pct > 80:
print(f"WARNING: Memory usage at {mem_pct:.1f}%")
return mem_pct
monitor = CallableTransform.from_callable(
name="monitor",
sink_pad_names=["metrics"],
callable=check_memory,
output_pad_name="metrics",
)