Skip to content

sgn.control

HTTPControl

Bases: SignalEOS


              flowchart TD
              sgn.control.HTTPControl[HTTPControl]
              sgn.sources.SignalEOS[SignalEOS]

                              sgn.sources.SignalEOS --> sgn.control.HTTPControl
                


              click sgn.control.HTTPControl href "" "sgn.control.HTTPControl"
              click sgn.sources.SignalEOS href "" "sgn.sources.SignalEOS"
            

A context manager that stores a bottle app running in a separate thread.

If you have a pipeline called p do,

with HTTPControl() as control: p.run()

The bottle process is started and stopped on enter and exit. This class inherits the SignalEOS context manager actions too, because otherwise bottle will respond to ctrl+C. Generally you want to deal with both signals and bottle contexts in a coherent way when executing a pipeline, so this implementation tries to do that.

Source code in src/sgn/control.py
class HTTPControl(SignalEOS):
    """A context manager that runs a bottle app in a separate thread.

    If you have a pipeline called ``p``, do::

        with HTTPControl() as control:
            p.run()

    The bottle server is started on enter and stopped on exit. This class
    inherits the SignalEOS context manager actions too, because otherwise bottle
    will respond to ctrl+C, and generally you want to deal with both signals and
    bottle contexts in a coherent way when executing a pipeline, so this
    implementation tries to do that.

    ``port``, ``host``, ``tag``, ``post_slots``, ``get_slots`` and
    ``http_thread`` are class attributes, so they are shared by every instance
    and subclass rather than being per-instance state.
    """

    # Port handed to the server; the port actually bound is exposed through
    # ``bound_port`` after ``__enter__``.
    port = 0
    # Address to bind to; resolved lazily by ``_resolve_host`` when left as None.
    host: str | None = None
    # Route name -> Slot receiving POSTed JSON / Slot supplying GET responses.
    post_slots: dict[str, Slot] = {}
    get_slots: dict[str, Slot] = {}
    # Thread running the bottle server while a context is active.
    http_thread: Thread | None = None
    # Optional URL prefix segment placed in front of the /post and /get routes.
    tag: str | None = None

    def __init__(self, registry_file: str | Path = "registry.txt") -> None:
        """Remember where the server URL should be written on enter.

        Args:
            registry_file: path of the file that ``__enter__`` overwrites with
                the base URL of the running server.
        """
        self.registry_file = registry_file
        self._srv: WSGIServer | None = None
        self._bound_port: int = 0

    @classmethod
    def _resolve_host(cls) -> str:
        """Return the host address to bind to, caching the answer on the class.

        An explicitly configured ``host`` wins. Otherwise the machine's own
        hostname and then "localhost" are resolved in turn, falling back to
        "127.0.0.1" if neither resolves.
        """
        if cls.host is not None:
            return cls.host
        for name in (socket.gethostname(), "localhost"):
            try:
                cls.host = socket.gethostbyname(name)
                return cls.host
            except socket.gaierror:
                continue
        cls.host = "127.0.0.1"
        return cls.host

    def _stop_server(self) -> None:
        """Shut the WSGI server down and join its thread, if either exists."""
        srv, self._srv = self._srv, None
        try:
            if srv is not None:
                srv.shutdown()
                srv.server_close()
        finally:
            # Always release the thread reference, even if shutdown failed.
            thread, HTTPControl.http_thread = HTTPControl.http_thread, None
            if thread is not None:
                thread.join(3.0)

    def __enter__(self):
        """Start the bottle server thread, publish its URL, and enter SignalEOS.

        Returns:
            This instance.

        Raises:
            RuntimeError: if the server thread exits before it has bound a
                socket (for example because the requested port is unavailable).
        """
        host = self._resolve_host()
        # Mailbox the server thread fills in once it has bound its socket.
        port_event: dict[str, Any] = {
            "event": Event(),
            "port": None,
            "srv": None,
        }
        thread = Thread(
            target=run_bottle_app,
            kwargs={
                "post_slots": HTTPControl.post_slots,
                "get_slots": HTTPControl.get_slots,
                "port": HTTPControl.port,
                "host": host,
                "tag": HTTPControl.tag,
                "port_event": port_event,
            },
            daemon=True,
        )
        HTTPControl.http_thread = thread
        thread.start()  # Start the Bottle app as a subthread

        # Block until the server has bound. Poll instead of waiting forever so
        # that a thread which died before signalling does not hang the caller.
        bound = port_event["event"]
        while not bound.wait(0.1):
            if not thread.is_alive() and not bound.is_set():
                HTTPControl.http_thread = None
                raise RuntimeError(
                    f"HTTP control server failed to start on "
                    f"{host}:{HTTPControl.port}"
                )

        self._bound_port = port_event["port"]
        self._srv = port_event["srv"]
        try:
            suffix = "" if HTTPControl.tag is None else f"/{HTTPControl.tag}"
            logger.info(
                "Bottle app running on http://%s:%s%s", host, self._bound_port, suffix
            )
            with open(self.registry_file, "w", encoding="utf-8") as f:
                f.write(f"http://{host}:{self._bound_port}{suffix}")
            super().__enter__()
        except BaseException:
            # __exit__ is not invoked when __enter__ raises, so tear the
            # already-running server down here before propagating.
            self._stop_server()
            raise
        return self

    @property
    def bound_port(self) -> int:
        """The port the bottle server actually bound to (resolved after __enter__)."""
        return self._bound_port

    def __exit__(self, exc_type, exc_value, exc_traceback):
        """Stop the server thread, then run the SignalEOS exit actions."""
        try:
            self._stop_server()
        finally:
            # The SignalEOS exit must run even if stopping the server failed.
            super().__exit__(exc_type, exc_value, exc_traceback)

    @classmethod
    def exchange_state(cls, name, state_dict):
        """Automate the common task of reading and writing state to an element.

        name is the name of an element (which is the key of both get and post slots)
        and state_dict is a dictionary of state variables with **correct** types that
        can be coerced out of json.  This will mostly work out of the box for simple
        data types like ints and floats and strings, but complicated data will probably
        not work.  FIXME consider supporting more complex types if it comes up.

        HTTPControl.exchange_state(<elem name>, state_dict) takes whatever was
        most recently posted to the post slot and updates matching keys in
        state_dict. The post slot is cleared, so a second call without an
        intervening POST is a no-op for the input direction. In either case
        state_dict is then stored in the element's get slot.
        """
        posted, payload = cls.post_slots[name].take()
        if posted:
            for key, current in state_dict.items():
                if key in payload:
                    # Coerce the posted value to the type of the existing entry.
                    state_dict[key] = type(current)(payload[key])
        cls.get_slots[name].set(state_dict)

bound_port property

The port the bottle server actually bound to (resolved after enter).

exchange_state(name, state_dict) classmethod

Automate the common task of reading and writing state to an element. name is the name of an element (which is the key of both get and post slots) and state_dict is a dictionary of state variables with correct types that can be coerced out of json. This will mostly work out of the box for simple data types like ints and floats and strings, but complicated data will probably not work. FIXME consider supporting more complex types if it comes up.

HTTPControl.exchange_state(<elem name>, state_dict) takes whatever was most recently posted to the post slot and updates matching keys in state_dict. The post slot is cleared, so a second call without an intervening POST is a no-op for the input direction.

Source code in src/sgn/control.py
@classmethod
def exchange_state(cls, name, state_dict):
    """Synchronise an element's state dictionary with its HTTP slots.

    name is the element name, which keys both the post and the get slot.
    state_dict maps state variable names to values whose types are the
    **correct** ones, i.e. types that a json value can be coerced into. Simple
    types such as ints, floats and strings mostly work out of the box;
    complicated data will probably not.  FIXME consider supporting more complex
    types if it comes up.

    Whatever was most recently posted to the post slot for <elem name> is
    taken (which clears that slot), and every key of state_dict that also
    appears in the posted data is overwritten with the posted value coerced to
    the type of the existing entry. A second call without an intervening POST
    therefore leaves state_dict untouched. In either case state_dict is then
    stored in the get slot.
    """
    posted, payload = cls.post_slots[name].take()
    if posted:
        for key, current in state_dict.items():
            if key in payload:
                # Rebuild the value using the type already held in state_dict.
                state_dict[key] = type(current)(payload[key])
    cls.get_slots[name].set(state_dict)

HTTPControlSinkElement dataclass

Bases: SinkElement, HTTPControl


              flowchart TD
              sgn.control.HTTPControlSinkElement[HTTPControlSinkElement]
              sgn.base.SinkElement[SinkElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]
              sgn.control.HTTPControl[HTTPControl]
              sgn.sources.SignalEOS[SignalEOS]

                              sgn.base.SinkElement --> sgn.control.HTTPControlSinkElement
                                sgn.base.ElementLike --> sgn.base.SinkElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                


                sgn.control.HTTPControl --> sgn.control.HTTPControlSinkElement
                                sgn.sources.SignalEOS --> sgn.control.HTTPControl
                



              click sgn.control.HTTPControlSinkElement href "" "sgn.control.HTTPControlSinkElement"
              click sgn.base.SinkElement href "" "sgn.base.SinkElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
              click sgn.control.HTTPControl href "" "sgn.control.HTTPControl"
              click sgn.sources.SignalEOS href "" "sgn.sources.SignalEOS"
            

A lightweight subclass of SinkElement that defaults to setting up post and get routes based on the provided element name. Each route is backed by a single-value slot: posts always succeed (replacing any prior value), gets read the held value without consuming it.

Source code in src/sgn/control.py
@dataclass
class HTTPControlSinkElement(SinkElement, HTTPControl):
    """SinkElement that registers HTTP slots under its own name.

    After the regular SinkElement post-init, a fresh Slot is stored under
    ``self.name`` in both the shared ``HTTPControl.post_slots`` and
    ``HTTPControl.get_slots`` tables.  Each slot holds a single value: setting
    it replaces any prior value, and reading it does not consume it.
    """

    def __post_init__(self):
        SinkElement.__post_init__(self)
        # One independent Slot per direction, post first and then get.
        for registry in (HTTPControl.post_slots, HTTPControl.get_slots):
            registry[self.name] = Slot()

HTTPControlSourceElement dataclass

Bases: SourceElement, HTTPControl


              flowchart TD
              sgn.control.HTTPControlSourceElement[HTTPControlSourceElement]
              sgn.base.SourceElement[SourceElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]
              sgn.control.HTTPControl[HTTPControl]
              sgn.sources.SignalEOS[SignalEOS]

                              sgn.base.SourceElement --> sgn.control.HTTPControlSourceElement
                                sgn.base.ElementLike --> sgn.base.SourceElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                


                sgn.control.HTTPControl --> sgn.control.HTTPControlSourceElement
                                sgn.sources.SignalEOS --> sgn.control.HTTPControl
                



              click sgn.control.HTTPControlSourceElement href "" "sgn.control.HTTPControlSourceElement"
              click sgn.base.SourceElement href "" "sgn.base.SourceElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
              click sgn.control.HTTPControl href "" "sgn.control.HTTPControl"
              click sgn.sources.SignalEOS href "" "sgn.sources.SignalEOS"
            

A lightweight subclass of SourceElement that defaults to setting up post and get routes based on the provided element name. Each route is backed by a single-value slot: posts always succeed (replacing any prior value), gets read the held value without consuming it.

Source code in src/sgn/control.py
@dataclass
class HTTPControlSourceElement(SourceElement, HTTPControl):
    """SourceElement that registers HTTP slots under its own name.

    After the regular SourceElement post-init, a fresh Slot is stored under
    ``self.name`` in both the shared ``HTTPControl.post_slots`` and
    ``HTTPControl.get_slots`` tables.  Each slot holds a single value: setting
    it replaces any prior value, and reading it does not consume it.
    """

    def __post_init__(self):
        SourceElement.__post_init__(self)
        # One independent Slot per direction, post first and then get.
        for registry in (HTTPControl.post_slots, HTTPControl.get_slots):
            registry[self.name] = Slot()

HTTPControlTransformElement dataclass

Bases: TransformElement, HTTPControl


              flowchart TD
              sgn.control.HTTPControlTransformElement[HTTPControlTransformElement]
              sgn.base.TransformElement[TransformElement]
              sgn.base.ElementLike[ElementLike]
              sgn.base.UniqueID[UniqueID]
              sgn.control.HTTPControl[HTTPControl]
              sgn.sources.SignalEOS[SignalEOS]

                              sgn.base.TransformElement --> sgn.control.HTTPControlTransformElement
                                sgn.base.ElementLike --> sgn.base.TransformElement
                                sgn.base.UniqueID --> sgn.base.ElementLike
                


                sgn.control.HTTPControl --> sgn.control.HTTPControlTransformElement
                                sgn.sources.SignalEOS --> sgn.control.HTTPControl
                



              click sgn.control.HTTPControlTransformElement href "" "sgn.control.HTTPControlTransformElement"
              click sgn.base.TransformElement href "" "sgn.base.TransformElement"
              click sgn.base.ElementLike href "" "sgn.base.ElementLike"
              click sgn.base.UniqueID href "" "sgn.base.UniqueID"
              click sgn.control.HTTPControl href "" "sgn.control.HTTPControl"
              click sgn.sources.SignalEOS href "" "sgn.sources.SignalEOS"
            

A lightweight subclass of TransformElement that defaults to setting up post and get routes based on the provided element name. Each route is backed by a single-value slot: posts always succeed (replacing any prior value), gets read the held value without consuming it.

Source code in src/sgn/control.py
@dataclass
class HTTPControlTransformElement(TransformElement, HTTPControl):
    """TransformElement that registers HTTP slots under its own name.

    After the regular TransformElement post-init, a fresh Slot is stored under
    ``self.name`` in both the shared ``HTTPControl.post_slots`` and
    ``HTTPControl.get_slots`` tables.  Each slot holds a single value: setting
    it replaces any prior value, and reading it does not consume it.
    """

    def __post_init__(self):
        TransformElement.__post_init__(self)
        # One independent Slot per direction, post first and then get.
        for registry in (HTTPControl.post_slots, HTTPControl.get_slots):
            registry[self.name] = Slot()

Slot

A thread-safe single-value mailbox.

Replaces the original Queue(maxsize=1) + drain-then-put pattern, which was not atomic and could deadlock under concurrent access. set always replaces the current value (never blocks), get reads without consuming, and take reads-and-clears.

Downstream packages (e.g. sgnl) construct Slot directly to register extra routes on the shared HTTPControl.post_slots / get_slots dicts::

HTTPControl.post_slots["my_route"] = Slot()
Source code in src/sgn/control.py
class Slot:
    """A lock-protected mailbox holding at most one value.

    This stands in for the earlier ``Queue(maxsize=1)`` + drain-then-put
    pattern, which was not atomic and could deadlock under concurrent access.
    ``set`` overwrites whatever is held (it never blocks on a full slot),
    ``get`` returns the held value and leaves it in place, and ``take`` returns
    the held value and empties the slot.

    Downstream packages (e.g. ``sgnl``) construct ``Slot`` directly to register
    extra routes on the shared ``HTTPControl.post_slots`` / ``get_slots`` dicts::

        HTTPControl.post_slots["my_route"] = Slot()
    """

    # Sentinel marking an empty slot, so that None can be stored as a real value.
    _MISSING: Any = object()

    def __init__(self) -> None:
        self._lock = Lock()
        self._value: Any = self._MISSING

    def set(self, value: Any) -> None:
        """Store ``value``, replacing anything already held."""
        with self._lock:
            self._value = value

    def get(self, default: Any = None) -> Any:
        """Return the held value without clearing it, or ``default`` if empty."""
        with self._lock:
            held = self._value
        if held is self._MISSING:
            return default
        return held

    def take(self, default: Any = None) -> tuple[bool, Any]:
        """Remove and return the held value.

        Returns ``(True, value)`` when something was held and ``(False, default)``
        when the slot was empty.
        """
        with self._lock:
            held = self._value
            # Emptying an already-empty slot changes nothing.
            self._value = self._MISSING
        if held is self._MISSING:
            return (False, default)
        return (True, held)

run_bottle_app(post_slots=None, get_slots=None, host='localhost', port=8080, tag=None, port_event=None)

A function that sets up post and get slots for a bottle server running on host:port.

post_slots is a dictionary whose keys define routes of the form http://host:port/tag/post/key. The values are Slot objects where the data that is posted by the external request will be stored.

get_slots define the inverse, e.g., http://host:port/tag/get/key and the Slot values are where the data for the request comes from.

Source code in src/sgn/control.py
def run_bottle_app(
    post_slots=None,
    get_slots=None,
    host="localhost",
    port=8080,
    tag=None,
    port_event=None,
):
    """Set up post and get routes on a bottle app and serve it on host:port.

    This call blocks in ``serve_forever`` until the server is shut down, so it
    is meant to be run in its own thread.

    post_slots is a dictionary whose keys define routes of the form
    http://host:port/tag/post/key. The values are Slot objects where the data that is
    posted by the external request will be stored.

    get_slots define the inverse, e.g., http://host:port/tag/get/key and the Slot
    values are where the data for the request comes from.

    Either dictionary may be left as None, which registers no routes of that
    kind.

    tag is an optional path prefix; when it is None the routes start directly
    at /post and /get.

    port_event, when given, is a dictionary holding an event under "event".
    Once the server socket is bound, the bound port is stored under "port",
    the server object under "srv", and the event is set.
    """
    # The defaults are None; treat that as "no routes" rather than failing on
    # the .items() calls below.
    post_slots = {} if post_slots is None else post_slots
    get_slots = {} if get_slots is None else get_slots

    app = Bottle()
    prefix = "" if tag is None else f"/{tag}"

    for postroute, postslot in post_slots.items():

        # The slot is bound as a default argument so each callback keeps its
        # own slot instead of the loop variable's final value.
        def post(postslot=postslot):
            data = request.json  # Get JSON payload
            if data:
                postslot.set(data)
                return {"status": "success", "message": "Data received"}
            else:
                # A missing or falsy payload (e.g. {}) is reported as invalid.
                return {"status": "error", "message": "Invalid JSON"}

        app.route(f"{prefix}/post/{postroute}", method="POST", callback=post)

    for getroute, getslot in get_slots.items():

        # key, key2, content_type and content_subtype are filled in from the
        # URL wildcards of whichever route below matched.
        def get(
            getslot=getslot,
            key=None,
            key2=None,
            content_type="application",
            content_subtype="json",
        ):
            data = getslot.get(default={})
            response.content_type = f"{content_type}/{content_subtype}"
            if key is None:
                return json.dumps(data)
            elif key in data:
                if key2 is None:
                    if content_subtype == "json":
                        return json.dumps(data[key])
                    else:
                        # Non-json subtypes return the stored value as is.
                        return data[key]
                elif key2 in data[key]:
                    if content_subtype == "json":
                        return json.dumps(data[key][key2])
                    else:
                        return data[key][key2]
                else:
                    return {"status": "error", "message": f"{key2} not in data[{key}]"}
            else:
                return {"status": "error", "message": f"{key} not in data"}

        # The four-segment `/get/<content_type>/<content_subtype>/...` routes
        # collide with element names matching real content types (e.g. an
        # element literally named "text"). Element names should avoid that.
        app.route(f"{prefix}/get/{getroute}", method="GET", callback=get)
        app.route(f"{prefix}/get/{getroute}/<key>", method="GET", callback=get)
        app.route(f"{prefix}/get/{getroute}/<key>/<key2>", method="GET", callback=get)
        app.route(
            f"{prefix}/get/<content_type>/<content_subtype>/{getroute}/<key>",
            method="GET",
            callback=get,
        )
        app.route(
            f"{prefix}/get/<content_type>/<content_subtype>/{getroute}/<key>/<key2>",
            method="GET",
            callback=get,
        )

    srv = make_server(host, port, app, handler_class=WSGIRequestHandler)
    if port_event is not None:
        # Publish the bound port and server object before waking the waiter.
        port_event["port"] = srv.server_port
        port_event["srv"] = srv
        port_event["event"].set()
    srv.serve_forever()