From d3bab5de94ea28005c029d261633bdc7d21ccbb7 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Sun, 30 Jul 2023 14:22:47 -0400 Subject: [PATCH] klippy_apis: support subscription callbacks This callback will only fire after a component has requested a subscription, preventing early updates while the component is waiting for the subscription request to complete. It is still valid for components to register the "server:status_update" event handler if this behavior is not a concern. Signed-off-by: Eric Callahan --- moonraker/components/data_store.py | 9 +++--- moonraker/components/job_state.py | 6 ++-- moonraker/components/klippy_apis.py | 43 ++++++++++++++++++++--------- moonraker/components/paneldue.py | 8 +++--- moonraker/components/power.py | 6 ++-- moonraker/components/spoolman.py | 9 ++---- 6 files changed, 44 insertions(+), 37 deletions(-) diff --git a/moonraker/components/data_store.py b/moonraker/components/data_store.py index e01f207..b7fbc3f 100644 --- a/moonraker/components/data_store.py +++ b/moonraker/components/data_store.py @@ -42,9 +42,6 @@ class DataStore: self.temp_update_timer = eventloop.register_timer( self._update_temperature_store) - # Register status update event - self.server.register_event_handler( - "server:status_update", self._set_current_temps) self.server.register_event_handler( "server:gcode_response", self._update_gcode_store) self.server.register_event_handler( @@ -78,7 +75,9 @@ class DataStore: sub: Dict[str, Optional[List[str]]] = {s: None for s in sensors} try: status: Dict[str, Any] - status = await klippy_apis.subscribe_objects(sub) + status = await klippy_apis.subscribe_objects( + sub, self._set_current_temps + ) except self.server.error as e: logging.info(f"Error subscribing to sensors: {e}") return @@ -111,7 +110,7 @@ class DataStore: self.temperature_store = {} self.temp_update_timer.stop() - def _set_current_temps(self, data: Dict[str, Any]) -> None: + def _set_current_temps(self, data: Dict[str, Any], _: float = 0.) -> None: for sensor in self.temperature_store: if sensor in data: last_val = self.last_temps[sensor] diff --git a/moonraker/components/job_state.py b/moonraker/components/job_state.py index 70a8fdc..12de654 100644 --- a/moonraker/components/job_state.py +++ b/moonraker/components/job_state.py @@ -25,8 +25,6 @@ class JobState: self.last_print_stats: Dict[str, Any] = {} self.server.register_event_handler( "server:klippy_started", self._handle_started) - self.server.register_event_handler( - "server:status_update", self._status_update) async def _handle_started(self, state: str) -> None: if state != "ready": @@ -34,7 +32,7 @@ class JobState: kapis: KlippyAPI = self.server.lookup_component('klippy_apis') sub: Dict[str, Optional[List[str]]] = {"print_stats": None} try: - result = await kapis.subscribe_objects(sub) + result = await kapis.subscribe_objects(sub, self._status_update) except self.server.error as e: logging.info(f"Error subscribing to print_stats") self.last_print_stats = result.get("print_stats", {}) @@ -42,7 +40,7 @@ class JobState: state = self.last_print_stats["state"] logging.info(f"Job state initialized: {state}") - async def _status_update(self, data: Dict[str, Any]) -> None: + async def _status_update(self, data: Dict[str, Any], _: float) -> None: if 'print_stats' not in data: return ps = data['print_stats'] diff --git a/moonraker/components/klippy_apis.py b/moonraker/components/klippy_apis.py index 8ba7df1..8bdfd25 100644 --- a/moonraker/components/klippy_apis.py +++ b/moonraker/components/klippy_apis.py @@ -18,11 +18,14 @@ from typing import ( List, TypeVar, Mapping, + Callable, + Coroutine ) if TYPE_CHECKING: from ..confighelper import ConfigHelper from ..klippy_connection import KlippyConnection as Klippy Subscription = Dict[str, Optional[List[Any]]] + SubCallback = Callable[[Dict[str, Dict[str, Any]], float], Optional[Coroutine]] _T = TypeVar("_T") INFO_ENDPOINT = "info" @@ -39,11 +42,13 @@ class KlippyAPI(Subscribable): def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.klippy: Klippy = self.server.lookup_component("klippy_connection") + self.eventloop = self.server.get_event_loop() app_args = self.server.get_app_args() self.version = app_args.get('software_version') # Maintain a subscription for all moonraker requests, as # we do not want to overwrite them self.host_subscription: Subscription = {} + self.subscription_callbacks: List[SubCallback] = [] # Register GCode Aliases self.server.register_endpoint( @@ -58,6 +63,13 @@ class KlippyAPI(Subscribable): "/printer/restart", ['POST'], self._gcode_restart) self.server.register_endpoint( "/printer/firmware_restart", ['POST'], self._gcode_firmware_restart) + self.server.register_event_handler( + "server:klippy_disconnect", self._on_klippy_disconnect + ) + + def _on_klippy_disconnect(self) -> None: + self.host_subscription.clear() + self.subscription_callbacks.clear() async def _gcode_pause(self, web_request: WebRequest) -> str: return await self.pause_print() @@ -197,16 +209,18 @@ class KlippyAPI(Subscribable): params = {'objects': objects} result = await self._send_klippy_request( STATUS_ENDPOINT, params, default) - if isinstance(result, dict) and 'status' in result: - return result['status'] + if isinstance(result, dict) and "status" in result: + return result["status"] if default is not Sentinel.MISSING: return default raise self.server.error("Invalid response received from Klippy", 500) - async def subscribe_objects(self, - objects: Mapping[str, Optional[List[str]]], - default: Union[Sentinel, _T] = Sentinel.MISSING - ) -> Union[_T, Dict[str, Any]]: + async def subscribe_objects( + self, + objects: Mapping[str, Optional[List[str]]], + callback: Optional[SubCallback] = None, + default: Union[Sentinel, _T] = Sentinel.MISSING + ) -> Union[_T, Dict[str, Any]]: for obj, items in objects.items(): if obj in self.host_subscription: prev = self.host_subscription[obj] @@ -217,11 +231,13 @@ class KlippyAPI(Subscribable): self.host_subscription[obj] = uitems else: self.host_subscription[obj] = items - params = {'objects': self.host_subscription} + params = {'objects': dict(self.host_subscription)} result = await self._send_klippy_request( SUBSCRIPTION_ENDPOINT, params, default) - if isinstance(result, dict) and 'status' in result: - return result['status'] + if isinstance(result, dict) and "status" in result: + if callback is not None: + self.subscription_callbacks.append(callback) + return result["status"] if default is not Sentinel.MISSING: return default raise self.server.error("Invalid response received from Klippy", 500) @@ -237,10 +253,11 @@ class KlippyAPI(Subscribable): {'response_template': {"method": method_name}, 'remote_method': method_name}) - def send_status(self, - status: Dict[str, Any], - eventtime: float - ) -> None: + def send_status( + self, status: Dict[str, Any], eventtime: float + ) -> None: + for cb in self.subscription_callbacks: + self.eventloop.register_callback(cb, status, eventtime) self.server.send_event("server:status_update", status) def load_component(config: ConfigHelper) -> KlippyAPI: diff --git a/moonraker/components/paneldue.py b/moonraker/components/paneldue.py index 9443d32..05fdd06 100644 --- a/moonraker/components/paneldue.py +++ b/moonraker/components/paneldue.py @@ -229,8 +229,6 @@ class PanelDue: "server:klippy_shutdown", self._process_klippy_shutdown) self.server.register_event_handler( "server:klippy_disconnect", self._process_klippy_disconnect) - self.server.register_event_handler( - "server:status_update", self.handle_status_update) self.server.register_event_handler( "server:gcode_response", self.handle_gcode_response) @@ -320,7 +318,9 @@ class PanelDue: self.heaters.extend(extruders) try: status: Dict[str, Any] - status = await self.klippy_apis.subscribe_objects(sub_args) + status = await self.klippy_apis.subscribe_objects( + sub_args, self.handle_status_update + ) except self.server.error: logging.exception("Unable to complete subscription request") else: @@ -337,7 +337,7 @@ class PanelDue: self.last_printer_state = 'O' self.is_shutdown = self.is_shutdown = False - def handle_status_update(self, status: Dict[str, Any]) -> None: + def handle_status_update(self, status: Dict[str, Any], _: float) -> None: for obj, items in status.items(): if obj in self.printer_state: self.printer_state[obj].update(items) diff --git a/moonraker/components/power.py b/moonraker/components/power.py index 60e5464..6bdf5a0 100644 --- a/moonraker/components/power.py +++ b/moonraker/components/power.py @@ -593,14 +593,12 @@ class KlipperDevice(PowerDevice): "Klipper object must be either 'output_pin' or 'gcode_macro' " f"for option 'object_name' in section [{config.get_name()}]") - self.server.register_event_handler( - "server:status_update", self._status_update) self.server.register_event_handler( "server:klippy_ready", self._handle_ready) self.server.register_event_handler( "server:klippy_disconnect", self._handle_disconnect) - def _status_update(self, data: Dict[str, Any]) -> None: + def _status_update(self, data: Dict[str, Any], _: float) -> None: self._set_state_from_data(data) def get_device_info(self) -> Dict[str, Any]: @@ -611,7 +609,7 @@ class KlipperDevice(PowerDevice): async def _handle_ready(self) -> None: kapis: APIComp = self.server.lookup_component('klippy_apis') sub: Dict[str, Optional[List[str]]] = {self.object_name: None} - data = await kapis.subscribe_objects(sub, None) + data = await kapis.subscribe_objects(sub, self._status_update, None) if not self._validate_data(data): self.state == "error" else: diff --git a/moonraker/components/spoolman.py b/moonraker/components/spoolman.py index d298269..8e6e532 100644 --- a/moonraker/components/spoolman.py +++ b/moonraker/components/spoolman.py @@ -77,16 +77,11 @@ class SpoolManager: ) async def _handle_server_ready(self): - self.server.register_event_handler( - "server:status_update", self._handle_status_update - ) result = await self.klippy_apis.subscribe_objects( - {"toolhead": ["position"]} + {"toolhead": ["position"]}, self._handle_status_update, {} ) initial_e_pos = self._eposition_from_status(result) - logging.debug(f"Initial epos: {initial_e_pos}") - if initial_e_pos is not None: self.highest_e_pos = initial_e_pos else: @@ -97,7 +92,7 @@ class SpoolManager: position = status.get("toolhead", {}).get("position", []) return position[3] if len(position) > 3 else None - async def _handle_status_update(self, status: Dict[str, Any]) -> None: + async def _handle_status_update(self, status: Dict[str, Any], _: float) -> None: epos = self._eposition_from_status(status) if epos and epos > self.highest_e_pos: async with self.extruded_lock: