diff --git a/moonraker/components/analysis.py b/moonraker/components/analysis.py new file mode 100644 index 0000000..a35343d --- /dev/null +++ b/moonraker/components/analysis.py @@ -0,0 +1,448 @@ +# Printer GCode Analysis using Klipper Estimator +# +# Copyright (C) 2025 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license + +from __future__ import annotations +import sys +import os +import platform +import pathlib +import stat +import re +import logging +import asyncio +from ..common import RequestType +from ..utils import json_wrapper as jsonw +from typing import ( + TYPE_CHECKING, + Union, + Optional, + Dict, + Any, + Tuple +) + +if TYPE_CHECKING: + from ..confighelper import ConfigHelper + from ..common import WebRequest + from .update_manager.update_manager import UpdateManager + from .klippy_connection import KlippyConnection + from .authorization import Authorization + from .file_manager.file_manager import FileManager + from .machine import Machine + from .shell_command import ShellCommandFactory + from .http_client import HttpClient + StrOrPath = Union[str, pathlib.Path] + +ESTIMATOR_URL = ( + "https://github.com/Annex-Engineering/klipper_estimator/" + "releases/latest/download/{asset}" +) +UPDATE_CONFIG = { + "type": "executable", + "channel": "stable", + "repo": "Annex-Engineering/klipper_estimator", + "is_system_service": "False", + "path": "" +} +RELEASE_INFO = { + "project_name": "klipper_estimator", + "project_owner": "Annex-Engineering", + "version": "", + "asset_name": "" +} + +class GcodeAnalysis: + def __init__(self, config: ConfigHelper) -> None: + self.server = config.get_server() + self.cmd_lock = asyncio.Lock() + self.file_manger: FileManager = self.server.lookup_component("file_manager") + data_path = self.server.get_app_args()["data_path"] + tool_folder = pathlib.Path(data_path).joinpath("tools/klipper_estimator") + if not tool_folder.exists(): + tool_folder.mkdir(parents=True) + self.estimator_timeout = config.getint("estimator_timeout", 600) + self.auto_dump_defcfg = config.getboolean("auto_dump_default_config", False) + self.default_config = tool_folder.joinpath("default_estimator_cfg.json") + self.estimator_config = self.default_config + est_config = config.get("estimator_config", None) + if est_config is not None: + est_path = self.file_manger.get_full_path("config", est_config.strip("/")) + if ".." in est_path.parts: + raise config.error( + "Value for option 'estimator_config' must not contain " + "a '..' segment" + ) + if not est_path.exists(): + raise config.error( + f"File '{est_config}' does not exist in 'config' root" + ) + self.estimator_config = est_path + if config.getboolean("enable_auto_analysis", False): + self.server.register_event_handler( + "file_manager:metadata_processed", self._on_metadata_processed + ) + self.estimator_path: pathlib.Path | None = None + self.estimator_ready: bool = False + self.estimator_version: str = "?" + pltform_choices = ["rpi", "linux", "osx", "auto"] + pltform = config.getchoice("platform", pltform_choices, default_key="auto") + if pltform == "auto": + auto_pfrm = self._detect_platform() + if auto_pfrm is not None: + self.estimator_path = tool_folder.joinpath( + f"klipper_estimator_{auto_pfrm}" + ) + else: + exec_name = f"klipper_estimator_{pltform}" + self.estimator_path = tool_folder.joinpath(exec_name) + enable_updates = config.getboolean("enable_estimator_updates", False) + self.updater_registered: bool = False + if enable_updates: + if self.estimator_path is None: + logging.info( + "Klipper estimator platform not detected, updates disabled" + ) + elif not config.has_section("update_manager"): + logging.info("Update Manager not configured, updates disabled") + else: + try: + um: UpdateManager + um = self.server.load_component(config, "update_manager") + updater_cfg = UPDATE_CONFIG + updater_cfg["path"] = str(tool_folder) + um.register_updater("klipper_estimator", updater_cfg) + except self.server.error: + logging.exception("Klipper Estimator update registration failed") + else: + self.updater_registered = True + if not self.updater_registered: + # Add reserved path when updates are disabled + self.file_manger.add_reserved_path("analysis", tool_folder, False) + self.server.register_endpoint( + "/server/analysis/status", RequestType.GET, + self._handle_status_request + ) + self.server.register_endpoint( + "/server/analysis/estimate", RequestType.POST, + self._handle_estimation_request + ) + self.server.register_endpoint( + "/server/analysis/dump_config", RequestType.POST, + self._handle_dump_cfg_request + ) + self.server.register_event_handler( + "server:klippy_ready", self._on_klippy_ready + ) + + @property + def estimator_version_tuple(self) -> Tuple[int, ...]: + if self.estimator_version in ["?", ""]: + return tuple() + ver_string = self.estimator_version + if ver_string[0] == "v": + ver_string = ver_string[1:] + return tuple([int(p) for p in ver_string.split(".")]) + + async def _on_klippy_ready(self) -> None: + if not self.estimator_ready: + return + if self.auto_dump_defcfg or not self.default_config.exists(): + logging.info( + "Dumping default Klipper Estimator configuration to " + f"{self.default_config}" + ) + eventloop = self.server.get_event_loop() + eventloop.create_task(self._dump_estimator_config(self.default_config)) + + async def _on_metadata_processed(self, rel_gc_path: str) -> None: + if not self.estimator_ready: + logging.info("Klipper Estimator not available") + return + try: + full_path = self.file_manger.get_full_path("gcodes", rel_gc_path) + ret = await self.estimate_file(full_path) + self._update_metadata_est_time(rel_gc_path, ret) + except self.server.error: + logging.exception("Klipper Estimator failure") + + def _update_metadata_est_time( + self, gc_fname: str, est_data: Dict[str, Any] + ) -> None: + md_storage = self.file_manger.get_metadata_storage() + gc_metadata = md_storage.get(gc_fname, None) + if gc_metadata is not None: + if "slicer_estimated_time" not in gc_metadata: + prev_est = gc_metadata.get("estimated_time", 0) + gc_metadata["slicer_estimated_time"] = prev_est + gc_metadata["estimated_time"] = round(est_data["total_time"], 2) + md_storage.insert(gc_fname, gc_metadata) + + async def component_init(self) -> None: + if self.estimator_path is None: + return + if not self.estimator_path.exists(): + # Download Klipper Estimator + await self._download_klipper_estimator(self.estimator_path) + if not self._check_estimator_perms(self.estimator_path): + self.server.add_warning( + "[analysis]: Moonraker lacks permission to execute Klipper Estimator", + "analysis_permission" + ) + return + else: + await self._detect_estimator_version() + if self.estimator_version == "?": + logging.info("Failed to initialize Klipper Estimator") + else: + await self._check_release_info(self.estimator_path) + self.estimator_ready = True + logging.info( + f"Klipper Estimator Version {self.estimator_version} detected" + ) + + def _detect_platform(self) -> Optional[str]: + # Detect OS + if sys.platform.startswith("darwin"): + return "osx" + elif sys.platform.startswith("linux"): + # Get architecture + arch: str = platform.machine() + if not arch: + self.server.add_warning( + "[analysis]: Failed to detect CPU architecture. " + "Manual configuration of the 'platform' option is required.", + "analysis_estimator" + ) + return None + if arch == "x86_64": + return "linux" + elif arch in ("armv7l", "aarch64"): + # TODO: Other platforms may work, not sure + return "rpi" + else: + self.server.add_warning( + f"[analysis]: Unsupported CPU architecture '{arch}'. " + "Manual configuration of the 'platform' option is required.", + "analysis_estimator" + ) + return None + else: + self.server.add_warning( + f"[analysis]: Unsupported platform '{sys.platform}'. " + "Manual configuration of the 'platform' option is required.", + "analysis_estimator" + ) + + async def _download_klipper_estimator(self, estimator_path: pathlib.Path) -> None: + """ + Download Klipper Estimator, set executable permissions, and generate + the release_info.json file + """ + async with self.cmd_lock: + est_name = estimator_path.name + logging.info(f"Downloading latest {est_name}...") + url = ESTIMATOR_URL.format(asset=est_name) + http_client: HttpClient = self.server.lookup_component("http_client") + await http_client.download_file( + url, "application/octet-stream", estimator_path + ) + logging.info("Klipper Estimator download complete.") + + async def _detect_estimator_version(self) -> None: + cmd = f"{self.estimator_path} --version" + scmd: ShellCommandFactory = self.server.lookup_component("shell_command") + ret = await scmd.exec_cmd(cmd, timeout=10.) + ver_match = re.match(r"klipper_estimator (v?\d+(?:\.\d+)*)", ret) + if ver_match is None: + self.estimator_version = "?" + else: + self.estimator_version = ver_match.group(1) + + def _check_estimator_perms(self, estimator_path: pathlib.Path) -> bool: + req_perms = stat.S_IXUSR | stat.S_IXGRP + kest_perms = stat.S_IMODE(estimator_path.stat().st_mode) + if req_perms & kest_perms != req_perms: + logging.info("Setting excutable permissions for Klipper Estimator...") + try: + estimator_path.chmod(kest_perms | req_perms) + except OSError: + logging.exception( + "Failed to set Klipper Estimator Permissions" + ) + return os.access(estimator_path, os.X_OK) + + async def _check_release_info(self, estimator_path: pathlib.Path) -> None: + rinfo_file = estimator_path.parent.joinpath("release_info.json") + if rinfo_file.is_file(): + return + logging.info("Creating release_info.json for Klipper Estimator...") + rinfo = dict(RELEASE_INFO) + rinfo["version"] = self.estimator_version + rinfo["asset_name"] = estimator_path.name + eventloop = self.server.get_event_loop() + await eventloop.run_in_thread(rinfo_file.write_bytes, jsonw.dumps(rinfo)) + if self.updater_registered: + logging.info("Refreshing Klipper Estimator Updater Instance...") + um: UpdateManager = self.server.lookup_component("update_manager") + eventloop.create_task(um.refresh_updater("klipper_estimator", True)) + + def _get_moonraker_url(self) -> str: + machine: Machine = self.server.lookup_component("machine") + host_info = self.server.get_host_info() + host_addr: str = host_info["address"] + if host_addr.lower() in ["all", "0.0.0.0"]: + address = "127.0.0.1" + elif host_addr.lower() == "::": + address = "::1" + else: + address = machine.public_ip + if not address: + address = f"{host_info['hostname']}.local" + elif ":" in address: + # ipv6 address + address = f"[{address}]" + port = host_info["port"] + return f"http://{address}:{port}/" + + def _gen_estimate_cmd( + self, gc_path: pathlib.Path, est_cfg_path: pathlib.Path + ) -> str: + if self.estimator_path is None or not self.estimator_ready: + raise self.server.error("Klipper Estimator not available") + if not est_cfg_path.exists(): + raise self.server.error( + f"Klipper Estimator config {est_cfg_path.name} does not exist" + ) + cmd = str(self.estimator_path) + escaped_cfg = str(est_cfg_path).replace("\"", "\\\"") + cmd = f"{cmd} --config_file \"{escaped_cfg}\"" + escaped_gc = str(gc_path).replace("\"", "\\\"") + cmd = f"{cmd} estimate -f json \"{escaped_gc}\"" + return cmd + + def _gen_dump_cmd(self) -> str: + if self.estimator_path is None or not self.estimator_ready: + raise self.server.error("Klipper Estimator not available") + return f"{self.estimator_path} {self._gen_url_opts()} dump-config" + + def _gen_url_opts(self) -> str: + url = self._get_moonraker_url() + opts = f"--config_moonraker_url {url}" + auth: Optional[Authorization] + auth = self.server.lookup_component("authorization", None) + api_key = auth.get_api_key() if auth is not None else None + if api_key is not None: + opts = f"{opts} --config_moonraker_api_key {api_key}" + return opts + + async def _dump_estimator_config(self, dest: pathlib.Path) -> Dict[str, Any]: + async with self.cmd_lock: + kconn: KlippyConnection = self.server.lookup_component("klippy_connection") + scmd: ShellCommandFactory = self.server.lookup_component("shell_command") + eventloop = self.server.get_event_loop() + if not kconn.is_ready(): + raise self.server.error( + "Klipper Estimator cannot dump configuration, Klippy not ready", + 504 + ) + dump_cmd = self._gen_dump_cmd() + try: + ret = await scmd.exec_cmd( + dump_cmd, timeout=10., log_complete=False, log_stderr=True + ) + await eventloop.run_in_thread(dest.write_text, ret) + except scmd.error: + raise self.server.error( + "Klipper Estimator dump-config failed", 500 + ) from None + return jsonw.loads(ret) + + async def estimate_file( + self, gc_path: pathlib.Path, est_config: Optional[pathlib.Path] = None + ) -> Dict[str, Any]: + async with self.cmd_lock: + if est_config is None: + # Fall back to estimator config specified in the [analysis] + # section. + est_config = self.estimator_config + if not est_config.is_file(): + raise self.server.error( + f"Estimator config file '{est_config}' does not exist" + ) + if not gc_path.is_file(): + raise self.server.error(f"GCode File '{gc_path}' does not exist") + scmd: ShellCommandFactory = self.server.lookup_component("shell_command") + est_cmd = self._gen_estimate_cmd(gc_path, est_config) + ret = await scmd.exec_cmd(est_cmd, self.estimator_timeout) + data = jsonw.loads(ret) + return data["sequences"][0] + + async def _handle_status_request( + self, web_request: WebRequest + ) -> Dict[str, Any]: + est_exec = "unknown" + if self.estimator_path is not None: + est_exec = self.estimator_path.name + is_default = self.estimator_config == self.default_config + return { + "estimator_executable": est_exec, + "estimator_ready": self.estimator_ready, + "estimator_version": self.estimator_version, + "estimator_config_exists": self.estimator_config.exists(), + "using_default_config": is_default + } + + async def _handle_estimation_request( + self, web_request: WebRequest + ) -> Dict[str, Any]: + gcode_file = web_request.get_str("filename").strip("/") + update_metadata = web_request.get_boolean("update_metadata", False) + estimator_config = web_request.get_str("estimator_config", None) + gc_path = self.file_manger.get_full_path("gcodes", gcode_file) + if not gc_path.is_file(): + raise self.server.error( + f"GCode File '{gcode_file}' does not exit in gcodes path" + ) + est_cfg_path = self.estimator_config + if estimator_config is not None: + estimator_config = estimator_config.strip("/") + est_cfg_path = self.file_manger.get_full_path("config", estimator_config) + if ".." in est_cfg_path.parts: + raise self.server.error( + "Invalid value for param 'estimator_config', '..' segments " + "are not allowed" + ) + ret = await self.estimate_file(gc_path, est_cfg_path) + if update_metadata: + self._update_metadata_est_time(gcode_file, ret) + return ret + + async def _handle_dump_cfg_request( + self, web_request: WebRequest + ) -> Dict[str, Any]: + dest = web_request.get_str("dest_config", None) + root: str | None = None + if dest is not None: + root = "config" + dest = dest.strip("/") + if ".." in pathlib.Path(dest).parts: + raise self.server.error( + "Parameter 'dest_config' may not contain '..' parts" + ) + dest_config = self.file_manger.get_full_path("config", dest) + else: + dest = self.default_config.name + dest_config = self.default_config + result = await self._dump_estimator_config(dest_config) + return { + "dest_root": root, + "dest_config_path": dest, + "klipper_estimator_config": result + } + + +def load_component(config: ConfigHelper) -> GcodeAnalysis: + return GcodeAnalysis(config)