Skip to content

Stats Monitoring

StatsSource produces frames containing system and process statistics using psutil.

Prerequisites

pip install psutil

StatsSource works without psutil but provides limited information.

Basic Usage

from sgn import Pipeline, CollectSink
from sgn.sources import StatsSource, SignalEOS

stats = StatsSource(
    name="stats",
    source_pad_names=["metrics"],
    interval=2.0,           # Collect every 2 seconds
    include_process_stats=True,
    include_system_stats=True,
    wait=1.0,               # Wait 1s between frames
)

sink = CollectSink(name="sink", sink_pad_names=["metrics"])

p = Pipeline()
p.connect(stats, sink)

with SignalEOS():
    p.run()  # Ctrl+C to stop

Frame Data

Each frame's data is a dictionary with keys:

{
    "timestamp": 1234567890.123,
    "process": {
        "pid": 12345,
        "cpu_percent": 5.2,
        "memory": {"rss": 52428800, "vms": 104857600, ...},
        "memory_percent": 1.2,
        "num_threads": 4,
        ...
    },
    "system": {
        "cpu": {"percent": 23.4, "count": {"physical": 8, "logical": 16}, ...},
        "memory": {"total": 17179869184, "available": 8589934592, ...},
        "disk": {...},
        "network": {...},
    },
}

Configuration

Parameter Default Description
interval None Seconds between collections. None = every iteration
include_process_stats True Include current process statistics
include_system_stats True Include system-wide statistics
wait None Seconds to sleep between frames
eos_on_signal True End stream on SIGINT/SIGTERM
frame_factory Frame Factory for creating frames

Processing Stats

Combine StatsSource with a transform to extract or alert on specific metrics:

from sgn import CallableTransform

def check_memory(frame):
    if frame.data is None:
        return None
    proc = frame.data.get("process", {})
    mem_pct = proc.get("memory_percent", 0)
    if mem_pct > 80:
        print(f"WARNING: Memory usage at {mem_pct:.1f}%")
    return mem_pct

monitor = CallableTransform.from_callable(
    name="monitor",
    sink_pad_names=["metrics"],
    callable=check_memory,
    output_pad_name="metrics",
)