Skip to content

sgn.testpoint

SGN Pipeline testpoints

This module adds "testpoint" capability to sgn applications. When the TESTPOINT environment variable is appropriately defined, various statistics will be captured from pipeline pads and displayed in a live updating table in the terminal.

If TESTPOINT=frame, the frame information passing through pads will be collected and displayed in the table.

If TESTPOINT=exec, pad execution times will be collected, and execution time statistics will be displayed in the table.

The TESTPOINT_REGEX environment variable is a space-separated list of regular expressions (regex) that control which pads are collected. The regex are matched against the pad full names, which are of the form:

{element}:{pad_type}:{pad_name}

Examples:

Display the execution time of all internal pads in the pipeline:

TESTPOINT=exec TESTPOINT_REGEX=".*:inl:.*" my_sgn_pipeline ...

Display information about frames flowing into all sink pads ("snk") of element "foo" that start with the letter "a":

TESTPOINT=frame TESTPOINT_REGEX="foo:snk:a.*" my_sgn_pipeline ...

The create_testpoint() function can also be used to create the testpoint display programmatically.

TestpointDisplay

Display pad information in a "live" table in the terminal.

Parameters:

Name Type Description Default
padspec list[str] | str

List of regular expressions to determine which pads to include in the table.

required
fields Iterable[str] | None

List of fields to display in the table. The first field will always be the pad name.

None
sort bool

Bool, whether or not to sort table. [True]

True
live bool

Bool, whether to create a live updating table or not. [True]

True
Source code in src/sgn/testpoint.py
class TestpointDisplay:
    """Display pad information in a "live" table in the terminal.

    Args:
        padspec:
            List of regular expressions to determine which pads to
            include in the table.
        fields:
            List of fields to display in the table. The first field
            will always be the pad name.
        sort:
            Bool, whether or not to sort table. [True]
        live:
            Bool, whether to create a live updating table or
            not. [True]

    """

    DEFAULT_FIELDS: list[str] = []
    DEFAULT_DICT: Callable | None = None

    def __init__(
        self,
        padspec: list[str] | str,
        fields: Iterable[str] | None = None,
        *,
        sort: bool = True,
        live: bool = True,
    ):
        try:
            from rich.console import Console
            from rich.live import Live
        except ModuleNotFoundError:
            raise ModuleNotFoundError(
                "testpoint functionality requires the `rich` package."
            )

        if isinstance(padspec, str):
            self.padspec = [padspec]
        else:
            self.padspec = padspec
        self.fields = fields if fields else self.DEFAULT_FIELDS
        self.sort = sort
        self.console = Console()
        self.live: Live | None = None
        if live:
            self.live = Live(console=self.console, auto_refresh=False)
        self.testpoints: dict[str, Any] = collections.defaultdict(self.DEFAULT_DICT)
        self.selected: dict[str, bool] = collections.defaultdict(bool)
        self.iteration = 0

    def select(self, pad_name: str):
        """True if pad_name matches self.padspec"""
        if pad_name not in self.selected:
            for padspec in self.padspec:
                if re.match(padspec, pad_name):
                    self.selected[pad_name] = True
                    break
        return self.selected[pad_name]

    def make_table(self):
        """Generate display table"""
        raise NotImplementedError()

    def update(self):
        """Update the table display"""
        from rich.table import Table

        if self.live and not self.live.is_started:
            self.live.start()

        table = Table(box=None)
        table.add_row(self.make_table())
        table.add_row(f"iteration: {self.iteration}")
        if self.live:
            self.live.update(table, refresh=True)
        else:
            self.console.print(table)
        self.iteration += 1

    def stop(self):
        """Stop the live display"""
        if self.live:
            self.live.stop()

make_table()

Generate display table

Source code in src/sgn/testpoint.py
def make_table(self):
    """Build the table to display.

    The base implementation is a placeholder and always raises
    NotImplementedError.
    """
    raise NotImplementedError

select(pad_name)

True if pad_name matches self.padspec

Source code in src/sgn/testpoint.py
def select(self, pad_name: str):
    """Report whether `pad_name` matches any regex in `self.padspec`.

    Answers are memoized in `self.selected`; the patterns are only
    tried (with `re.match`) the first time a given name is seen.
    """
    cache = self.selected
    if pad_name in cache:
        return cache[pad_name]
    if any(re.match(pattern, pad_name) for pattern in self.padspec):
        cache[pad_name] = True
    # On a miss this lookup is what stores the cache's default value.
    return cache[pad_name]

stop()

Stop the live display

Source code in src/sgn/testpoint.py
def stop(self):
    """Stop the live display, if one exists."""
    live = self.live
    if not live:
        return
    live.stop()

update()

Update the table display

Source code in src/sgn/testpoint.py
def update(self):
    """Redraw the display with the current table.

    Starts the live display if it exists but has not been started,
    then shows an outer table holding the `make_table()` output and an
    iteration counter row.  Without a live display the table is printed
    to the console instead.  Finally bumps `self.iteration`.
    """
    from rich.table import Table

    live = self.live
    if live and not live.is_started:
        live.start()

    frame = Table(box=None)
    body = self.make_table()
    frame.add_row(body)
    frame.add_row(f"iteration: {self.iteration}")
    if not live:
        self.console.print(frame)
    else:
        live.update(frame, refresh=True)
    self.iteration += 1

TestpointExec

Bases: TestpointDisplay


              flowchart TD
              sgn.testpoint.TestpointExec[TestpointExec]
              sgn.testpoint.TestpointDisplay[TestpointDisplay]

                              sgn.testpoint.TestpointDisplay --> sgn.testpoint.TestpointExec
                


              click sgn.testpoint.TestpointExec href "" "sgn.testpoint.TestpointExec"
              click sgn.testpoint.TestpointDisplay href "" "sgn.testpoint.TestpointDisplay"
            

Display information about pad execution times

Source code in src/sgn/testpoint.py
class TestpointExec(TestpointDisplay):
    """Testpoint display reporting pad execution-time statistics."""

    # Column headings; the backslash escapes "[" for rich markup.
    DEFAULT_FIELDS = [
        r"exec time mean \[ns]",
        r"exec time stdev \[ns]",
        "# samples",
    ]

    # Each pad gets a bounded deque, so only the 10 most recent
    # samples contribute to the statistics.
    DEFAULT_DICT: Callable = functools.partial(collections.deque, maxlen=10)

    def record_exec(self, pad, dt):
        """Store one execution-time sample for `pad` if it is selected.

        `dt` is scaled by 1e9 and truncated to an int before storage
        (presumably seconds -> nanoseconds, matching the column
        headings; confirm against the caller).
        """
        name = pad.name
        if not self.select(name):
            return
        self.testpoints[name].append(int(dt * 1e9))

    def make_table(self):
        """Build the table of per-pad mean, stdev and sample count.

        When `self.sort` is set, rows are ordered by mean execution
        time, largest first.
        """
        from rich.table import Table

        table = Table(box=None)
        table.add_column("pad")
        for heading in self.fields:
            table.add_column(heading, justify="right")

        rows = []
        for pad_name, samples in self.testpoints.items():
            stats = (statistics.mean(samples), _stdev(samples), len(samples))
            rows.append((pad_name, stats))
        if self.sort:
            # order by the mean, slowest pad on top
            rows.sort(key=lambda row: row[1][0], reverse=True)

        for pad_name, stats in rows:
            # round to whole numbers and group digits with underscores
            table.add_row(pad_name, *(f"{round(value):_}" for value in stats))
        return table

make_table()

Make table of pad execution times

Source code in src/sgn/testpoint.py
def make_table(self):
    """Build the table of per-pad execution-time statistics.

    Each row holds the pad name followed by the mean, the `_stdev`
    value and the number of recorded samples.  With `self.sort` set,
    rows are ordered by mean, largest first.
    """
    from rich.table import Table

    table = Table(box=None)
    table.add_column("pad")
    for heading in self.fields:
        table.add_column(heading, justify="right")

    rows = []
    for pad_name, samples in self.testpoints.items():
        stats = (statistics.mean(samples), _stdev(samples), len(samples))
        rows.append((pad_name, stats))
    if self.sort:
        # order by the mean, slowest pad on top
        rows.sort(key=lambda row: row[1][0], reverse=True)

    for pad_name, stats in rows:
        # round to whole numbers and group digits with underscores
        table.add_row(pad_name, *(f"{round(value):_}" for value in stats))
    return table

record_exec(pad, dt)

Record pad execution time

Source code in src/sgn/testpoint.py
def record_exec(self, pad, dt):
    """Append one execution-time sample for `pad` if it is selected.

    `dt` is multiplied by 1e9 and truncated to an int before being
    stored under the pad's name.
    """
    name = pad.name
    if not self.select(name):
        return
    self.testpoints[name].append(int(dt * 1e9))

TestpointFrame

Bases: TestpointDisplay


              flowchart TD
              sgn.testpoint.TestpointFrame[TestpointFrame]
              sgn.testpoint.TestpointDisplay[TestpointDisplay]

                              sgn.testpoint.TestpointDisplay --> sgn.testpoint.TestpointFrame
                


              click sgn.testpoint.TestpointFrame href "" "sgn.testpoint.TestpointFrame"
              click sgn.testpoint.TestpointDisplay href "" "sgn.testpoint.TestpointDisplay"
            

Display information about frames passing through pads

Source code in src/sgn/testpoint.py
class TestpointFrame(TestpointDisplay):
    """Testpoint display showing the latest frame seen on each pad."""

    # Column headings, each rendered through `_frame_field_format`.
    DEFAULT_FIELDS = [
        "type",
        "offset",
        "noffset",
        "bufs",
        "has gap",
        "rate",
        "samples",
        "dtype",
        "data",
    ]

    # No default factory: testpoints acts like a plain dict.
    DEFAULT_DICT = None

    def record_frame(self, pad: Pad, frame: Frame, pad_type: str = ""):
        """Remember `frame` as the most recent frame for `pad`.

        A non-empty `pad_type` replaces the first ":<pad.pad_type>:"
        segment of the pad name before selection and storage.
        """
        name = pad.name
        if pad_type:
            name = name.replace(f":{pad.pad_type}:", f":{pad_type}:", 1)
        if not self.select(name):
            return
        self.testpoints[name] = frame

    def make_table(self):
        """Build the table of frame fields, one row per recorded pad.

        With `self.sort` set, rows are ordered by pad name.  A field
        that cannot be formatted is shown as "-".
        """
        from rich.table import Table

        def cell(frame, field):
            # Frames lacking a given field fall back to a placeholder.
            try:
                return _frame_field_format(frame, field)
            except (AttributeError, ValueError, IndexError, TypeError):
                return "-"

        table = Table(box=None)
        table.add_column("pad")
        for heading in self.fields:
            table.add_column(heading)

        entries = self.testpoints.items()
        if self.sort:
            entries = sorted(entries, key=lambda entry: entry[0])
        for pad_name, frame in entries:
            table.add_row(pad_name, *[cell(frame, field) for field in self.fields])
        return table

make_table()

Make table of pad frame info

Source code in src/sgn/testpoint.py
def make_table(self):
    """Build the table of frame fields, one row per recorded pad.

    With `self.sort` set, rows are ordered by pad name.  A field that
    cannot be formatted is shown as "-".
    """
    from rich.table import Table

    def cell(frame, field):
        # Frames lacking a given field fall back to a placeholder.
        try:
            return _frame_field_format(frame, field)
        except (AttributeError, ValueError, IndexError, TypeError):
            return "-"

    table = Table(box=None)
    table.add_column("pad")
    for heading in self.fields:
        table.add_column(heading)

    entries = self.testpoints.items()
    if self.sort:
        entries = sorted(entries, key=lambda entry: entry[0])
    for pad_name, frame in entries:
        table.add_row(pad_name, *[cell(frame, field) for field in self.fields])
    return table

record_frame(pad, frame, pad_type='')

Record pad frame data

pad_type will override the inherent pad.pad_type

Source code in src/sgn/testpoint.py
def record_frame(self, pad: Pad, frame: Frame, pad_type: str = ""):
    """Remember `frame` as the most recent frame for `pad`.

    A non-empty `pad_type` replaces the first ":<pad.pad_type>:"
    segment of the pad name before the name is checked with
    `self.select` and used as the storage key.

    """
    name = pad.name
    if pad_type:
        name = name.replace(f":{pad.pad_type}:", f":{pad_type}:", 1)
    if not self.select(name):
        return
    self.testpoints[name] = frame

create_testpoint(ttype, *args, **kwargs)

Create global testpoint object

Source code in src/sgn/testpoint.py
def create_testpoint(ttype, *args, **kwargs):
    """Instantiate the module-wide testpoint display.

    `ttype` selects the display class: "frame" gives a TestpointFrame,
    "exec" a TestpointExec.  Remaining arguments are forwarded to the
    class constructor and the result is stored in the global TESTPOINT.

    Raises:
        ValueError: if `ttype` is neither "frame" nor "exec".
    """
    global TESTPOINT
    if ttype not in ("frame", "exec"):
        raise ValueError(f"invalid testpoint type '{ttype}'")
    display_cls = TestpointFrame if ttype == "frame" else TestpointExec
    TESTPOINT = display_cls(*args, **kwargs)

record_exec(*args)

Record pad execution time

Source code in src/sgn/testpoint.py
def record_exec(*args):
    """Forward an execution-time sample to the global testpoint.

    Calls `_create_from_env()` first; the sample is only recorded when
    the global TESTPOINT is set and is a TestpointExec.
    """
    _create_from_env()
    testpoint = TESTPOINT
    if not testpoint or not isinstance(testpoint, TestpointExec):
        return
    testpoint.record_exec(*args)

record_frame(*args, **kwargs)

Record pad frame

Source code in src/sgn/testpoint.py
def record_frame(*args, **kwargs):
    """Forward a pad frame to the global testpoint.

    Calls `_create_from_env()` first; the frame is only recorded when
    the global TESTPOINT is set and is a TestpointFrame.
    """
    _create_from_env()
    testpoint = TESTPOINT
    if not testpoint or not isinstance(testpoint, TestpointFrame):
        return
    testpoint.record_frame(*args, **kwargs)

stop()

Stop testpoint display

Source code in src/sgn/testpoint.py
def stop():
    """Stop the global testpoint display, if one has been created."""
    testpoint = TESTPOINT
    if not testpoint:
        return
    testpoint.stop()

update()

Update testpoint display

Source code in src/sgn/testpoint.py
def update():
    """Refresh the global testpoint display.

    Calls `_create_from_env()` first, then updates the global
    TESTPOINT if it is set.
    """
    _create_from_env()
    testpoint = TESTPOINT
    if not testpoint:
        return
    testpoint.update()