From 08648da7a4890551e5bb8db15a8bb7c857d8a838 Mon Sep 17 00:00:00 2001 From: zkk <1007518571@qq.com> Date: Sat, 10 May 2025 16:36:55 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AE=9E=E7=8E=B0=E4=B8=8B=E4=BD=8D=E6=9C=BA?= =?UTF-8?q?=E7=89=88=E6=9C=AC=E6=A3=80=E6=B5=8B=E5=8D=87=E7=BA=A7=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E4=BB=A5=E5=8F=8A=E4=BC=98=E5=8C=96=E6=96=AD=E7=94=B5?= =?UTF-8?q?=E7=BB=AD=E6=89=93?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit # Conflicts: # config/moonraker.conf resolved by f5461563b8a7a5a25470a8763b8513779dbff253 version # scripts/sync_dependencies.py resolved by f5461563b8a7a5a25470a8763b8513779dbff253 version --- config/moonraker.conf | 35 +- moonraker/components/firmware_manager.py | 206 +++ moonraker/components/flash_tool.py | 1201 +++++++++++++++++ moonraker/components/power.py | 7 + .../components/update_manager/app_deploy.py | 3 + moonraker/components/zeroconf.py | 48 +- 6 files changed, 1466 insertions(+), 34 deletions(-) create mode 100644 moonraker/components/firmware_manager.py create mode 100644 moonraker/components/flash_tool.py diff --git a/config/moonraker.conf b/config/moonraker.conf index b0ac533..d0d0f25 100644 --- a/config/moonraker.conf +++ b/config/moonraker.conf @@ -23,6 +23,8 @@ cors_domains: [octoprint_compat] +[firmware_manager] + [history] [power Printer] @@ -58,41 +60,34 @@ pin: ^!gpiochip4/gpio19 debounce_period: .01 minimum_event_time: 0 on_press: - {% set server_info = call_method("server.info") %} - {% set server_info_data = server_info | tojson | fromjson %} - {% if server_info_data['klippy_state'] == "ready" %} - {% set query_objs = {"print_stats": ["state"], "toolhead": ["extruder"], "gcode_move": ["gcode_position"], "virtual_sdcard": ["file_path"]} %} + {% do call_method("printer.gcode.script", script="TURN_OFF_HEATERS") %} + {% do call_method("printer.gcode.script", script="GET_TASKLINE") %} + {% set query_objs = {"print_stats": ["state"], "toolhead": ["extruder"], "virtual_sdcard": ["file_path", "file_position", "file_line"]} %} {% set status = call_method("printer.objects.query", objects=query_objs) %} {% do call_method("printer.emergency_stop") %} {% set data = status | tojson | fromjson %} {% set print_state = data['status']['print_stats']['state'] %} # Judging the printer status {% if print_state | string == 'printing' or print_state | string == 'paused' %} - {% set x_position = data['status']['gcode_move']['gcode_position'][0] | round(4) %} - {% set y_position = data['status']['gcode_move']['gcode_position'][1] | round(4) %} - {% set z_position = data['status']['gcode_move']['gcode_position'][2] | round(4) %} {% set hotend = data['status']['toolhead']['extruder'] %} + {% set position = data['status']['virtual_sdcard']['file_position'] %} + {% set line = data['status']['virtual_sdcard']['file_line'] %} {% set filepath = data['status']['virtual_sdcard']['file_path'] %} - {% set filename = filepath.split('/')[-1] %} - # save position - {% if print_state | string == 'printing' %} - {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=power_resume_x VALUE=" + x_position | string) %} - {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=power_resume_y VALUE=" + y_position | string) %} - {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=power_resume_z VALUE=" + z_position | string) %} - {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=power_loss_paused VALUE=False") %} - {% elif print_state | string == 'paused' %} - {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=power_loss_paused VALUE=True") %} - {% endif%} + # save file position and line + {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=power_resume_position VALUE=" + position | string) %} + {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=power_resume_line VALUE=" + line | string) %} + {% set script = "SAVE_VARIABLE VARIABLE=power_loss_paused VALUE=" ~ ("False" if print_state | string == 'printing' else "True") %} + {% if print_state | string in ['printing', 'paused'] %} + {% do call_method("printer.gcode.script", script=script) %} + {% endif %} # save extruder {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=power_resume_extruder VALUE=\"'" + hotend | string + "'\"") %} # save file {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=filepath VALUE=\"'" + filepath | string + "'\"") %} - {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=last_file VALUE=\"'" + filename | string + "'\"") %} - # save interrupt + save interrupt {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=was_interrupted VALUE=True") %} {% else %} {% do call_method("printer.gcode.script", script="SAVE_VARIABLE VARIABLE=was_interrupted VALUE=False") %} - {% endif %} {% endif %} # shutdown {% do call_method("machine.shutdown") %} diff --git a/moonraker/components/firmware_manager.py b/moonraker/components/firmware_manager.py new file mode 100644 index 0000000..bbe688d --- /dev/null +++ b/moonraker/components/firmware_manager.py @@ -0,0 +1,206 @@ +import os +import logging +import asyncio +from .flash_tool import FlashTool +from ..confighelper import ConfigHelper +from ..common import KlippyState +from .klippy_apis import KlippyAPI + +from typing import ( + TYPE_CHECKING, + Any, + Dict, +) + +if TYPE_CHECKING: + from .machine import Machine + from .klippy_connection import KlippyConnection as Klippy + +HOME = os.path.expanduser("~") +KLIPPER_DIR = os.path.join(HOME, "klipper/firmware") +MAIN_DEV = "/dev/ttyACM0" +SYSTEM_PYTHON = "python3" + +class FirmwareUpdate: + def __init__(self, config: ConfigHelper) -> None: + self.server = config.get_server() + self.mcu_info: Dict[str, Dict[str, Any]] = {} + self._service_info: Dict[str, Any] = {} + self.min_version: str = "" + self.klipper_version: str = "" + self.current_progress = 0 + self.updating = False + self.need_check_update = True + self.server.register_event_handler( + "server:klippy_started", self._on_klippy_startup) + + @property + def klippy_apis(self) -> KlippyAPI: + return self.server.lookup_component("klippy_apis") + + def is_updating(self) -> bool: + return self.updating + + async def _on_klippy_startup(self, state: KlippyState) -> None: + if not self.need_check_update or not self._is_firmware_dir_exists(): + return + try: + await self.build_mcu_info() + logging.info(f"{self.mcu_info}") + except Exception as e: + logging.exception(f"An error occurred during the building mcu info process: {e}") + for mcu_data in self.mcu_info.values(): + if mcu_data.get('need_update', False): + await self.start_update() + break + + async def _do_klipper_action(self, action: str) -> None: + try: + machine: Machine = self.server.lookup_component("machine") + await machine.do_service_action(action, "klipper") + logging.info(f"Klipper service {action}.") + except self.server.error: + pass + + def _is_firmware_dir_exists(self): + return os.path.exists(KLIPPER_DIR) + + async def start_update(self): + try: + self.updating = True + await self._do_klipper_action("stop") + await self.upgrade_needed_tool_mcus() + await self.upgrade_mcu() + await self._do_klipper_action("start") + self.updating = False + except Exception as e: + logging.exception(f"An error occurred during the update process: {e}") + self.updating = False + + async def build_mcu_info(self) -> None: + printer_info: Dict[str, Any] = {} + cfg_status: Dict[str, Any] = {} + try: + printer_info = await self.klippy_apis.get_klippy_info() + cfg_status = await self.klippy_apis.query_objects({'configfile': None}) + except self.server.error: + logging.exception("PanelDue initialization request failed") + config = cfg_status.get('configfile', {}).get('config', {}) + self.klipper_version = printer_info.get("software_version", "").split('-')[0] + try: + self._build_basic_mcu_info(config) + await self._update_mcu_versions() + self._check_mcu_update_needed() + + except Exception as e: + logging.exception(f"An error occurred while building MCU info: {e}") + + def _build_basic_mcu_info(self, config: Dict[str, Any]) -> None: + for mcu, value in config.items(): + if mcu.startswith("mcu") and "canbus_uuid" in value: + self.mcu_info[mcu] = {"canbus_uuid": value["canbus_uuid"]} + + async def _update_mcu_versions(self) -> None: + for mcu in self.mcu_info: + try: + response = await self.klippy_apis.query_objects({mcu: None}) + mcu_data = response.get(mcu, {}) + mcu_version: str = mcu_data.get('mcu_version', '') + if mcu == "mcu": + self.min_version = mcu_data.get('min_firmware_version', "") + if mcu_version: + self.need_check_update = False + short_version: str = mcu_version.split('-')[0] + if short_version.lower().startswith('v'): + short_version = short_version[1:] + self.mcu_info[mcu]['mcu_version'] = short_version + except Exception as e: + logging.error(f"Error querying {mcu}: {e}") + + def _compare_versions(self, version1, version2): + v1_parts = [int(part) for part in version1.split('.') if part] + v2_parts = [int(part) for part in version2.split('.') if part] + + max_length = max(len(v1_parts), len(v2_parts)) + v1_parts.extend([0] * (max_length - len(v1_parts))) + v2_parts.extend([0] * (max_length - len(v2_parts))) + + for i in range(max_length): + if v1_parts[i] < v2_parts[i]: + return -1 + elif v1_parts[i] > v2_parts[i]: + return 1 + return 0 + + def _check_mcu_update_needed(self) -> None: + if self.klipper_version.lower().startswith('v'): + self.klipper_version = self.klipper_version[1:] + logging.info(f"min version: {self.min_version}") + logging.info(f"klipper version: {self.klipper_version}") + for mcu in self.mcu_info: + mcu_version = self.mcu_info[mcu].get('mcu_version', "") + + mcu_vs_min = self._compare_versions(mcu_version, self.min_version) + mcu_vs_klipper = self._compare_versions(mcu_version, self.klipper_version) + + if mcu_vs_min < 0 or mcu_vs_klipper > 0: + self.mcu_info[mcu]['need_update'] = True + else: + self.mcu_info[mcu]['need_update'] = False + + async def upgrade_needed_tool_mcus(self): + firmware_mapping = { + "mcu L_tool": os.path.join(KLIPPER_DIR, "F072_L.bin"), + "mcu R_tool": os.path.join(KLIPPER_DIR, "F072_R.bin"), + "mcu tool": os.path.join(KLIPPER_DIR, "F072_L.bin") + } + for mcu_name, mcu_data in self.mcu_info.items(): + if mcu_data.get('need_update', False) and mcu_name != "mcu": + firmware_path = firmware_mapping.get(mcu_name) + if not firmware_path: + logging.warning(f"No firmware specified for {mcu_name}, skipping upgrade.") + continue + try: + flash_tool = FlashTool( + self.server, + uuid = mcu_data['canbus_uuid'], + firmware = firmware_path + ) + exit_code = await flash_tool.run() + await asyncio.sleep(1) + if exit_code == 0: + logging.info(f"Upgrade operation for {mcu_name} succeeded, exit code: {exit_code}") + else: + logging.error(f"Upgrade operation for {mcu_name} failed, exit code: {exit_code}") + except Exception as e: + logging.exception(f"An exception occurred during the upgrade process for {mcu_name}: {e}") + + async def upgrade_mcu(self): + for mcu_name, mcu_data in self.mcu_info.items(): + if mcu_name == "mcu" and mcu_data.get('need_update', False): + try: + flash_tool = FlashTool( + self.server, + uuid=mcu_data['canbus_uuid'], + request_bootloader=True + ) + exit_code = await flash_tool.run() + if exit_code != 0: + logging.error(f"Failed to request bootloader for {mcu_name}, exit code: {exit_code}") + continue + await asyncio.sleep(2) + flash_tool = FlashTool( + self.server, + device=MAIN_DEV, + firmware=os.path.join(KLIPPER_DIR, "F446.bin") + ) + exit_code = await flash_tool.run() + if exit_code == 0: + logging.info(f"Upgrade operation for {mcu_name} succeeded, exit code: {exit_code}") + else: + logging.error(f"Upgrade operation for {mcu_name} failed, exit code: {exit_code}") + except Exception as e: + logging.exception(f"An exception occurred during the upgrade process for {mcu_name}: {e}") + +def load_component(config: ConfigHelper) -> FirmwareUpdate: + return FirmwareUpdate(config) diff --git a/moonraker/components/flash_tool.py b/moonraker/components/flash_tool.py new file mode 100644 index 0000000..5931366 --- /dev/null +++ b/moonraker/components/flash_tool.py @@ -0,0 +1,1201 @@ +#!/usr/bin/env python3 +# Script to upload software via Katapult +# +# Copyright (C) 2022 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license. +from __future__ import annotations +import sys +import os +import termios +import fcntl +import zlib +import json +import asyncio +import socket +import struct +import logging +import errno +import argparse +import hashlib +import pathlib +import shutil +import shlex +import contextlib +from typing import TYPE_CHECKING, Dict, List, Optional, Union, Any + +if TYPE_CHECKING: + from ..server import Server + +HAS_SERIAL = True +try: + from serial import Serial, SerialException +except ModuleNotFoundError: + HAS_SERIAL = False + SerialException = Exception + +def output_line(msg: str) -> None: + sys.stdout.write(msg + "\n") + sys.stdout.flush() + logging.info(f"{msg}\n") + +def output(msg: str) -> None: + sys.stdout.write(msg) + sys.stdout.flush() + +# Standard crc16 ccitt, take from msgproto.py in Klipper +def crc16_ccitt(buf: Union[bytes, bytearray]) -> int: + crc = 0xffff + for data in buf: + data ^= crc & 0xff + data ^= (data & 0x0f) << 4 + crc = ((data << 8) | (crc >> 8)) ^ (data >> 4) ^ (data << 3) + return crc & 0xFFFF + + +logging.basicConfig(level=logging.INFO) +CAN_FMT = " Dict[str, Any]: + usb_info: Dict[str, Any] = {} + id_path = usb_path.joinpath("idVendor") + prod_id_path = usb_path.joinpath("idProduct") + mfr_path = usb_path.joinpath("manufacturer") + prod_path = usb_path.joinpath("product") + serial_path = usb_path.joinpath("serial") + usb_info["usb_id"] = "" + if id_path.is_file() and prod_id_path.is_file(): + vid = id_path.read_text().strip().lower() + pid = prod_id_path.read_text().strip().lower() + usb_info["usb_id"] = f"{vid}:{pid}" + usb_info["manufacturer"] = "unknown" + usb_info["product"] = "unknown" + usb_info["serial_number"] = "" + if mfr_path.is_file(): + usb_info["manufacturer"] = mfr_path.read_text().strip().lower() + if prod_path.is_file(): + usb_info["product"] = prod_path.read_text().strip().lower() + if serial_path.is_file(): + usb_info["serial_number"] = serial_path.read_text().strip().lower() + return usb_info + +def get_usb_path(device: pathlib.Path) -> Optional[pathlib.Path]: + device_path = device.resolve() + if not device_path.exists(): + return None + sys_dev_path = pathlib.Path("/sys/class/tty").joinpath(device_path.name) + if not sys_dev_path.exists(): + return None + sys_dev_path = sys_dev_path.resolve() + for usb_path in sys_dev_path.parents: + dnum_file = usb_path.joinpath("devnum") + bnum_file = usb_path.joinpath("busnum") + if dnum_file.is_file() and bnum_file.is_file(): + return usb_path + return None + +def get_stable_usb_symlink(device: pathlib.Path) -> pathlib.Path: + device_path = device.resolve() + ser_by_path_dir = pathlib.Path("/dev/serial/by-path") + try: + if ser_by_path_dir.exists(): + for item in ser_by_path_dir.iterdir(): + if item.samefile(device_path): + return item + except OSError: + pass + return device_path + + +# Python Port of fasthash6 +# Host URL: http://github.com/ztanml/fast-hash +# Original Copyright: +# Copyright (C) 2012 Zilong Tan (eric.zltan@gmail.com) +# +# MIT License +# +# Permission is hereby granted, free of charge, to any person +# obtaining a copy of this software and associated documentation +# files (the "Software"), to deal in the Software without +# restriction, including without limitation the rights to use, copy, +# modify, merge, publish, distribute, sublicense, and/or sell copies +# of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be +# included in all copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, +# EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF +# MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND +# NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS +# BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN +# ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN +# CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +MASK_64 = 0xFFFFFFFFFFFFFFFF + +def mix(val: int) -> int: + val ^= val >> 23 + val = (val * 0x2127599bf4325c37) & MASK_64 + val ^= val >> 47 + return val + +def fasthash64(buffer: bytes, seed: int) -> int: + m = 0x880355f21e6d1965 + h = seed ^ ((len(buffer) * m) & MASK_64) + long_count = len(buffer) // 8 + remaining: bytes = buffer + if long_count: + byte_count = long_count * 8 + buf = struct.unpack(f"<{long_count}Q", buffer[:byte_count]) + for v in buf: + h ^= mix(v) + h = (h * m) & MASK_64 + remaining = buffer[byte_count:] + if remaining: + v = 0 + print(remaining) + for i in range(len(remaining), 0, -1): + v ^= remaining[i -1] << (8 * (i - 1)) + h ^= mix(v) + h = (h * m) & MASK_64 + return mix(h) + +def convert_usbsn_to_uuid(serial_number: str) -> int: + seed = 0xA16231A7 + chip_id = bytes.fromhex(serial_number) + result = fasthash64(chip_id, seed) + swapped = 0 + # The UUID returned by Klipper firmware is byteswapped + for i in range(6): + swapped |= ((result >> (i * 8)) & 0xFF) << ((5 - i) * 8) + return swapped + +class CanFlasher: + def __init__( + self, + server: Server, + node: CanNode, + fw_file: pathlib.Path + ) -> None: + self.server = server + self.node = node + self.firmware_path = fw_file + self.fw_sha = hashlib.sha1() + self.primed = False + self.file_size = 0 + self.block_size = 64 + self.block_count = 0 + self.app_start_addr = 0 + self.full_complete = False + self.klipper_dict: Optional[Dict[str, Any]] = None + self._check_binary() + + def notify_update_response( + self, resp: Union[str, bytes], is_complete: bool = False + ) -> None: + if self.firmware_path is None: + return + resp = resp.strip() + if isinstance(resp, bytes): + resp = resp.decode() + done = is_complete + done &= self.full_complete + notification = { + 'message': resp, + 'application': "firmware", + 'complete': "done"} + self.server.send_event( + "update_manager:update_response", notification) + + def _check_binary(self) -> None: + """ + Extract klipper.dict from binary + """ + fw_name = self.firmware_path.name.lower() + if fw_name != "klipper.bin" or not self.firmware_path.is_file(): + return + bin_data = self.firmware_path.read_bytes() + klipper_dict: Dict[str, Any] = {} + for idx in range(len(bin_data)): + try: + uncmp_data = zlib.decompress(bin_data[idx:]) + klipper_dict = json.loads(uncmp_data) + except (zlib.error, json.JSONDecodeError): + continue + if klipper_dict.get("app") == "Klipper": + break + if klipper_dict: + self.klipper_dict = klipper_dict + ver = klipper_dict.get("version", "") + bin_mcu = self.klipper_dict.get("config", {}).get("MCU", "") + output_line( + f"Detected Klipper binary version {ver}, MCU: {bin_mcu}" + ) + + def _build_command(self, cmd: int, payload: bytes) -> bytearray: + word_cnt = (len(payload) // 4) & 0xFF + out_cmd = bytearray(CMD_HEADER) + out_cmd.append(cmd) + out_cmd.append(word_cnt) + if payload: + out_cmd.extend(payload) + crc = crc16_ccitt(out_cmd[2:]) + out_cmd.extend(struct.pack(" None: + # Prime with an invalid command. This will generate an error + # and force double buffered USB devices to respond after the + # first command is sent. + msg = self._build_command(0x90, b"") + self.node.write(msg) + self.primed = True + + async def connect_btl(self) -> None: + output_line("Attempting to connect to bootloader") + ret = await self.send_command('CONNECT') + pinfo = ret[:12] + mcu_info = ret[12:] + ver_bytes: bytes + ver_bytes, start_addr, self.block_size = struct.unpack("<4sII", pinfo) + self.app_start_addr = start_addr + self.software_version = "?" + self.proto_version = tuple([v for v in reversed(ver_bytes[:3])]) + proto_version_str = ".".join([str(v) for v in self.proto_version]) + if self.block_size not in [64, 128, 256, 512]: + raise FlashError("Invalid Block Size: %d" % (self.block_size,)) + mcu_info = mcu_info.rstrip(b"\x00") + if self.proto_version >= (1, 1, 0): + build_info = mcu_info.split(b"\x00", maxsplit=1) + mcu_type = build_info[0].decode() + if len(build_info) == 2: + self.software_version = build_info[1].decode() + else: + output_line("Katapult build not reporting software version!") + else: + mcu_type = mcu_info.decode() + output_line( + f"Katapult Connected\n" + f"Software Version: {self.software_version}\n" + f"Protocol Version: {proto_version_str}\n" + f"Block Size: {self.block_size} bytes\n" + f"Application Start: 0x{self.app_start_addr:4X}\n" + f"MCU type: {mcu_type}" + ) + if self.klipper_dict is not None: + bin_mcu = self.klipper_dict.get("config", {}).get("MCU", "") + if bin_mcu and bin_mcu != mcu_type: + raise FlashError( + "MCU returned by Katapult does not match MCU " + "identified in klipper.bin.\n" + f"Katapult MCU: {mcu_type}\n" + f"Klipper Binary MCU: {bin_mcu}" + ) + + async def verify_canbus_uuid(self, uuid): + output_line("Verifying canbus connection") + ret = await self.send_command('GET_CANBUS_ID') + mcu_uuid = sum([v << ((5 - i) * 8) for i, v in enumerate(ret[:6])]) + if mcu_uuid != uuid: + raise FlashError("UUID mismatch (%s vs %s)" % (uuid, mcu_uuid)) + + async def send_command( + self, + cmdname: str, + payload: bytes = b"", + tries: int = 5 + ) -> bytearray: + cmd = BOOTLOADER_CMDS[cmdname] + out_cmd = self._build_command(cmd, payload) + last_err = Exception() + while tries: + data = bytearray() + recd_len = 0 + try: + self.node.write(out_cmd) + read_done = False + while not read_done: + ret = await self.node.readuntil(CMD_TRAILER) + data.extend(ret) + while len(data) > 7: + if data[:2] != CMD_HEADER: + data = data[1:] + continue + recd_len = data[3] * 4 + read_done = len(data) == recd_len + 8 + break + if self.primed and read_done: + recd_len = 0 + data.clear() + self.primed = False + read_done = False + except asyncio.CancelledError: + raise + except asyncio.TimeoutError: + logging.info( + f"Response for command {cmdname} timed out, " + f"{tries - 1} tries remaining" + ) + except Exception as e: + if type(e) is type(last_err) and e.args == last_err.args: + last_err = e + logging.exception("Device Read Error") + else: + trailer = data[-2:] + recd_crc, = struct.unpack("= last_percent + 2: + last_percent += 2. + output("#") + totals = ( + f"{uploaded // 1024} KiB / " + f"{float(self.file_size) // 1024} KiB" + ) + if pct == 100: + self.full_complete = True + self.notify_update_response( + f"Updataing {self._get_mcu_from_path()}: {totals} [{pct}%]") + resp = await self.send_command('SEND_EOF') + page_count, = struct.unpack("= last_percent + 2: + last_percent += 2 + output("#") + totals = ( + f"{(i * self.block_size) // 1024} KiB / " + f"{float(self.file_size) // 1024} KiB" + ) + self.notify_update_response( + f"Verify firmware: {totals} [{pct}%]") + ver_hex = ver_sha.hexdigest().upper() + fw_hex = self.fw_sha.hexdigest().upper() + if ver_hex != fw_hex: + raise FlashError("Checksum mismatch: Expected %s, Received %s" + % (fw_hex, ver_hex)) + output_line("]\n\nVerification Complete: SHA = %s" % (ver_hex)) + + async def finish(self): + await self.send_command("COMPLETE") + + +class CanNode: + def __init__(self, node_id: int, cansocket: CanSocket | SerialSocket) -> None: + self.node_id = node_id + self._reader = asyncio.StreamReader(CAN_READER_LIMIT) + self._cansocket = cansocket + + async def read( + self, n: int = -1, timeout: Optional[float] = 2. + ) -> bytes: + return await asyncio.wait_for(self._reader.read(n), timeout) + + async def readexactly( + self, n: int, timeout: Optional[float] = 2. + ) -> bytes: + return await asyncio.wait_for(self._reader.readexactly(n), timeout) + + async def readuntil( + self, sep: bytes = b"\x03", timeout: Optional[float] = 2. + ) -> bytes: + return await asyncio.wait_for(self._reader.readuntil(sep), timeout) + + def write(self, payload: Union[bytes, bytearray]) -> None: + if isinstance(payload, bytearray): + payload = bytes(payload) + self._cansocket.send(self.node_id, payload) + + async def write_with_response( + self, + payload: Union[bytearray, bytes], + resp_length: int, + timeout: Optional[float] = 2. + ) -> bytes: + self.write(payload) + return await self.readexactly(resp_length, timeout) + + def feed_data(self, data: bytes) -> None: + self._reader.feed_data(data) + + def close(self) -> None: + self._reader.feed_eof() + +class BaseSocket: + def __init__(self, args: argparse.Namespace) -> None: + self._loop = asyncio.get_running_loop() + self._args = args + self._fw_path = pathlib.Path(args.firmware).expanduser().resolve() + + @property + def is_flash_req(self) -> bool: + return not ( + self.is_bootloader_req or self.is_status_req or self.is_query + ) + + @property + def is_bootloader_req(self) -> bool: + return self._args.request_bootloader + + @property + def is_status_req(self) -> bool: + return self._args.status + + @property + def is_query(self) -> bool: + return self._args.query + + @property + def is_usb_can_bridge(self) -> bool: + return False + + @property + def usb_serial_path(self) -> pathlib.Path: + raise NotImplementedError() + + def _check_firmware(self) -> None: + if self.is_flash_req and not self._fw_path.is_file(): + raise FlashError("Invalid firmware path '%s'" % (self._fw_path)) + + async def run(self) -> None: + raise NotImplementedError() + + def close(self) -> None: + raise NotImplementedError() + +class CanSocket(BaseSocket): + def __init__(self, server: Server, args: argparse.Namespace) -> None: + super().__init__(args) + self._uuid = 0 + self.server = server + self._can_interface = args.interface + self._can_bridge_path: pathlib.Path | None = None + self._can_bridge_serial_path: pathlib.Path | None = None + if not self.is_query: + if args.uuid is None: + raise FlashError( + "The 'uuid' option must be specified to flash a CAN device" + ) + else: + intf = self._can_interface + self._uuid = int(args.uuid, 16) + self._search_canbus_bridge() + output_line(f"Connecting to CAN UUID {args.uuid} on interface {intf}") + self.cansock = socket.socket(socket.PF_CAN, socket.SOCK_RAW, + socket.CAN_RAW) + self.admin_node = CanNode(CANBUS_ID_ADMIN, self) + self.nodes: Dict[int, CanNode] = { + CANBUS_ID_ADMIN_RESP: self.admin_node + } + + self.input_buffer = b"" + self.output_packets: List[bytes] = [] + self.input_busy = False + self.output_busy = False + self.closed = True + + @property + def is_usb_can_bridge(self) -> bool: + return self._can_bridge_path is not None + + @property + def usb_serial_path(self) -> pathlib.Path: + if self._can_bridge_serial_path is not None: + return self._can_bridge_serial_path + # find tty path + assert self._can_bridge_path is not None + dir_name = self._can_bridge_path.name + ttys = list(self._can_bridge_path.glob(f"{dir_name}:*/tty/tty*")) + if not ttys: + raise FlashError("Failed to locate serial tty path") + tty_path = pathlib.Path("/dev").joinpath(ttys[0].name) + if not tty_path.exists(): + raise FlashError("Detected tty path does not exist") + self._can_bridge_serial_path = tty_path + return tty_path + + def _handle_can_response(self) -> None: + try: + data = self.cansock.recv(4096) + except socket.error as e: + # If bad file descriptor allow connection to be + # closed by the data check + if e.errno == errno.EBADF: + logging.exception("Can Socket Read Error, closing") + data = b'' + else: + return + if not data: + # socket closed + self.close() + return + self.input_buffer += data + if self.input_busy: + return + self.input_busy = True + while len(self.input_buffer) >= 16: + packet = self.input_buffer[:16] + self._process_packet(packet) + self.input_buffer = self.input_buffer[16:] + self.input_busy = False + + def _process_packet(self, packet: bytes) -> None: + can_id, length, data = struct.unpack(CAN_FMT, packet) + can_id &= socket.CAN_EFF_MASK + payload = data[:length] + node = self.nodes.get(can_id) + if node is not None: + node.feed_data(payload) + + def send(self, can_id: int, payload: bytes = b"") -> None: + if can_id > 0x7FF: + can_id |= socket.CAN_EFF_FLAG + if not payload: + packet = struct.pack(CAN_FMT, can_id, 0, b"") + self.output_packets.append(packet) + else: + while payload: + length = min(len(payload), 8) + pkt_data = payload[:length] + payload = payload[length:] + packet = struct.pack( + CAN_FMT, can_id, length, pkt_data) + self.output_packets.append(packet) + if self.output_busy: + return + self.output_busy = True + asyncio.create_task(self._do_can_send()) + + async def _do_can_send(self): + while self.output_packets: + packet = self.output_packets.pop(0) + try: + await self._loop.sock_sendall(self.cansock, packet) + except socket.error: + logging.info("Socket Write Error, closing") + self.close() + break + self.output_busy = False + + def _jump_to_bootloader(self, uuid: int): + output_line("Sending bootloader jump command...") + plist = [(uuid >> ((5 - i) * 8)) & 0xFF for i in range(6)] + plist.insert(0, KLIPPER_REBOOT_CMD) + self.send(KLIPPER_ADMIN_ID, bytes(plist)) + + async def _query_uuids(self) -> List[int]: + output_line("Checking for Katapult nodes...") + payload = bytes([CANBUS_CMD_QUERY_UNASSIGNED]) + self.admin_node.write(payload) + curtime = self._loop.time() + endtime = curtime + 2. + self.uuids: List[int] = [] + while curtime < endtime: + timeout = max(.1, endtime - curtime) + try: + resp = await self.admin_node.read(8, timeout) + except asyncio.TimeoutError: + continue + finally: + curtime = self._loop.time() + if len(resp) < 7 or resp[0] != CANBUS_RESP_NEED_NODEID: + continue + app_names = { + KLIPPER_SET_NODE_CMD: "Klipper", + CANBUS_CMD_SET_NODEID: "Katapult" + } + app = "Unknown" + if len(resp) > 7: + app = app_names.get(resp[7], "Unknown") + data = resp[1:7] + output_line(f"Detected UUID: {data.hex()}, Application: {app}") + uuid = sum([v << ((5 - i) * 8) for i, v in enumerate(data)]) + if uuid not in self.uuids and app == "Katapult": + self.uuids.append(uuid) + return self.uuids + + def _reset_nodes(self) -> None: + output_line("Resetting all bootloader node IDs...") + payload = bytes([CANBUS_CMD_CLEAR_NODE_ID]) + self.admin_node.write(payload) + + def _set_node_id(self, uuid: int) -> CanNode: + # Convert ID to a list + plist = [(uuid >> ((5 - i) * 8)) & 0xFF for i in range(6)] + plist.insert(0, CANBUS_CMD_SET_NODEID) + node_id = len(self.nodes) + CANBUS_NODEID_OFFSET + plist.append(node_id) + payload = bytes(plist) + self.admin_node.write(payload) + decoded_id = node_id * 2 + 0x100 + node = CanNode(decoded_id, self) + self.nodes[decoded_id + 1] = node + return node + + def _search_canbus_bridge(self) -> None: + can_intf = self._can_interface.lower() + sysfs_usb_path = pathlib.Path("/sys/bus/usb/devices") + if not sysfs_usb_path.is_dir(): + return + for item in sysfs_usb_path.iterdir(): + if not item.joinpath("bDeviceClass").is_file(): + continue + usb_info = get_usb_info(item) + if ( + usb_info["usb_id"] != GS_CAN_USB_ID or + usb_info["manufacturer"] != "klipper" + ): + continue + if not list(item.glob(f"{item.name}:*/net/{can_intf}")): + continue + # Klipper GS USB Device matches + serial_no = usb_info["serial_number"] + logging.info( + f"Found Klipper USB-CAN bridge on {can_intf}, serial {serial_no}" + ) + if serial_no: + try: + det_uuid = convert_usbsn_to_uuid(serial_no) + except Exception: + output_line( + f"Failed to convert can bridge serial number {serial_no} " + f"to a uuid for device {item.name}" + ) + logging.exception("UUID conversion failed") + return + logging.info( + f"Detected UUID: {det_uuid:x}, provided UUID: {self._uuid:x}" + ) + if det_uuid == self._uuid: + self._can_bridge_path = item + output_line(f"Canbus Bridge detected at {item}") + break + + async def _wait_canbridge_reset(self) -> None: + if self._can_bridge_path is None: + return + output("Waiting for USB Reconnect.") + for _ in range(8): + await asyncio.sleep(.5) + output(".") + usb_info = get_usb_info(self._can_bridge_path) + mfr = usb_info.get("manufacturer") + usb_id = usb_info.get("usb_id", "") + product = usb_info.get("product") + if usb_id and usb_id != GS_CAN_USB_ID: + await asyncio.sleep(.5) + output_line("done") + output_line(f"Detected new USB Device: {usb_id} {mfr} {product}") + if mfr == "katapult" or usb_id == KATAPULT_USB_ID: + serial_path = self.usb_serial_path + output_line(f"Katapult detected at serial port {serial_path}") + else: + # Device is not Katapult, force exit + self._args.request_bootloader = True + output_line("Device is not Katapult, exiting...") + break + else: + output_line("timed out") + + async def run(self) -> None: + self._check_firmware() + try: + self.cansock.bind((self._can_interface,)) + except Exception: + raise FlashError(f"Unable to bind socket to {self._can_interface}") + self.closed = False + self.cansock.setblocking(False) + self._loop.add_reader( + self.cansock.fileno(), self._handle_can_response) + if self.is_flash_req or self.is_bootloader_req: + self._jump_to_bootloader(self._uuid) + if self.is_usb_can_bridge: + await self._wait_canbridge_reset() + return + else: + await asyncio.sleep(1.0) + if self.is_bootloader_req: + return + self._reset_nodes() + await asyncio.sleep(.5) + if self.is_query: + await self._query_uuids() + return + node = self._set_node_id(self._uuid) + flasher = CanFlasher(self.server, node, self._fw_path) + await asyncio.sleep(.5) + try: + await flasher.connect_btl() + await flasher.verify_canbus_uuid(self._uuid) + if not self.is_status_req: + await flasher.send_file() + await flasher.verify_file() + finally: + # always attempt to send the complete command. If + # there is an error it will exit the bootloader + # unless comms were broken + if self.is_flash_req: + await flasher.finish() + + def close(self): + if self.closed: + return + self.closed = True + for node in self.nodes.values(): + node.close() + self._loop.remove_reader(self.cansock.fileno()) + self.cansock.close() + +class SerialSocket(BaseSocket): + def __init__(self, server: Server, args: argparse.Namespace) -> None: + super().__init__(args) + self.server = server + self._device = args.device + self._baud = args.baud + if not HAS_SERIAL: + ser_inst_cmd = "pip3 install serial" + if shutil.which("apt") is not None: + ser_inst_cmd = "sudo apt install python3-serial" + raise FlashError( + "The pyserial python package was not found. To install " + "run the following command in a terminal: \n\n" + f" {ser_inst_cmd}\n\n" + ) + if self._device is None: + raise FlashError( + "The 'device' option must be specified to flash a device" + ) + output_line(f"Connecting to Serial Device {self._device}, baud {self._baud}") + self.serial: Optional[Serial] = None + self.node = CanNode(0, self) + + @property + def is_query(self) -> bool: + return False + + @property + def usb_serial_path(self) -> pathlib.Path: + return pathlib.Path(self._device) + + def _handle_response(self) -> None: + assert self.serial is not None + try: + data = self.serial.read(4096) + except SerialException: + logging.exception("Error on serial read") + self.close() + else: + self.node.feed_data(data) + + def send(self, can_id: int, payload: bytes = b"") -> None: + assert self.serial is not None + try: + self.serial.write(payload) + except SerialException: + logging.exception("Error on serial write") + self.close() + + async def _lookup_proc_name(self, process_id: str) -> str: + has_sysctl = shutil.which("systemctl") is not None + if has_sysctl: + cmd = shlex.split(f"systemctl status {process_id}") + proc = await asyncio.create_subprocess_exec( + *cmd, + stdout=asyncio.subprocess.PIPE, + stderr=asyncio.subprocess.PIPE + ) + stdout, _ = await proc.communicate() + resp = stdout.strip().decode(errors="ignore") + if resp: + unit = resp.split(maxsplit=2) + if len(unit) == 3: + return f"Systemd Unit Name: {unit[1]}" + cmdline_file = pathlib.Path(f"/proc/{process_id}/cmdline") + if cmdline_file.exists(): + res = cmdline_file.read_text().replace("\x00", " ").strip() + return f"Command Line: {res}" + exe_file = pathlib.Path(f"/proc/{process_id}/exe") + if exe_file.exists(): + return f"Executable: {exe_file.resolve()})" + return "Name Unknown" + + async def validate_device(self, dev_strpath: str) -> None: + dev_path = pathlib.Path(dev_strpath) + if not dev_path.exists(): + raise FlashError(f"No Serial Device found at {dev_path}") + try: + dev_st = dev_path.stat() + except PermissionError as e: + raise FlashError(f"No permission to access device {dev_path}") from e + dev_id = (dev_st.st_dev, dev_st.st_ino) + for fd_dir in pathlib.Path("/proc").glob("*/fd"): + pid = fd_dir.parent.name + if not pid.isdigit(): + continue + with contextlib.suppress(OSError): + for item in fd_dir.iterdir(): + try: + item_st = item.stat() + except OSError: + continue + item_id = (item_st.st_dev, item_st.st_ino) + if item_id == dev_id: + proc_name = await self._lookup_proc_name(pid) + output_line( + f"Serial device {dev_path} in use by another program.\n" + f"Process ID: {pid}\n" + f"Process {proc_name}" + ) + raise FlashError(f"Serial device {dev_path} in use") + + async def _request_usb_bootloader(self, device: pathlib.Path) -> pathlib.Path: + output_line(f"Requesting USB bootloader for {device}...") + usb_dev_path = get_usb_path(device) + if usb_dev_path is None: + output_line(f"Device path {device} is not a usb device") + return device + stable_path = get_stable_usb_symlink(device) + usb_info = get_usb_info(usb_dev_path) + start_usb_id = usb_info["usb_id"] + fd: Optional[int] = None + with contextlib.suppress(OSError): + fd = os.open(str(device), os.O_RDWR | os.O_NOCTTY | os.O_NONBLOCK) + dtr_data = struct.pack('I', termios.TIOCM_DTR) + fcntl.ioctl(fd, termios.TIOCMBIS, dtr_data) + t = termios.tcgetattr(fd) + t[4] = t[5] = termios.B1200 + termios.tcsetattr(fd, termios.TCSANOW, t) + fcntl.ioctl(fd, termios.TIOCMBIC, dtr_data) + if fd is not None: + os.close(fd) + output("Waiting for USB Reconnect.") + for _ in range(8): + await asyncio.sleep(.5) + output(".") + usb_info = get_usb_info(usb_dev_path) + mfr = usb_info.get("manufacturer") + product = usb_info.get("product") + usb_id = usb_info.get("usb_id", "") + if usb_id and usb_id != start_usb_id: + await asyncio.sleep(.5) + output_line("done") + output_line(f"Detected new USB Device: {usb_id} {mfr} {product}") + if mfr == "katapult" or usb_id == KATAPULT_USB_ID: + # prefer path resolved from sysfs usb path + path_match = list( + usb_dev_path.glob(f"{usb_dev_path.name}:*/tty/tty*") + ) + if len(path_match) == 1: + det_path = pathlib.Path(f"/dev/{path_match[0].name}") + if det_path.exists(): + stable_path = det_path + output_line(f"Katapult detected on {stable_path}") + else: + # Device is not Katapult, force exit + self._args.request_bootloader = True + output_line("Device is not Katapult, exiting...") + break + else: + output_line("timed out") + return stable_path + + async def _request_serial_bootloader(self, device: str, baud: int) -> None: + output_line(f"Requesting serial bootloader for device {device}...") + self.serial = self._open_device(device, baud) + self.send(0, SERIAL_BL_REQ) + await asyncio.sleep(1.) + if self.serial is not None: + self.close() + + def _open_device(self, device: str, baud: int) -> Serial: + try: + serial_dev = Serial( # type: ignore + baudrate=baud, timeout=0, exclusive=True + ) + serial_dev.port = device + serial_dev.open() + except (OSError, IOError, SerialException) as e: + raise FlashError("Unable to open serial port: %s" % (e,)) + return serial_dev + + def _has_double_buffering(self, product: str) -> bool: + if product.startswith("stm32"): + variant = product[5:7] + return variant not in ("f2", "f4", "h7") + return False + + async def run(self) -> None: + self._check_firmware() + device = self._device + await self.validate_device(device) + dev_path = pathlib.Path(device) + usb_dev_path = get_usb_path(dev_path) + dev_info: Dict[str, Any] = {} + if usb_dev_path is not None: + dev_info = get_usb_info(usb_dev_path) + usb_id = dev_info.get("usb_id") + usb_mfr = dev_info.get("manufacturer") + usb_prod: str = dev_info.get("product", "unknown") + if usb_mfr == "klipper" or usb_id == KLIPPER_USB_ID: + # Request usb bootloader, wait for katapult + output_line("Detected USB device running Klipper") + new_dpath = await self._request_usb_bootloader(dev_path) + device = str(new_dpath) + if self.is_bootloader_req: + return + elif usb_mfr == "katapult" or usb_id == KATAPULT_USB_ID: + output_line("Detected USB device running Katapult") + if self.is_bootloader_req: + return + elif self.is_bootloader_req: + # Request serial bootloader and exit + await self._request_serial_bootloader(device, self._baud) + return + else: + usb_prod = "" + self.serial = self._open_device(device, self._baud) + self._loop.add_reader(self.serial.fileno(), self._handle_response) + flasher = CanFlasher(self.server, self.node, self._fw_path) + try: + if self._has_double_buffering(usb_prod): + # Prime the USB Connection with a dummy command. This is + # necessary to get STM32 devices with usbfs double buffering + # to respond immediately to the connect command. + flasher.prime() + await flasher.connect_btl() + if not self.is_status_req: + await flasher.send_file() + await flasher.verify_file() + finally: + # always attempt to send the complete command. If + # there is an error it will exit the bootloader + # unless comms were broken + if self.is_flash_req: + await flasher.finish() + + def close(self): + if self.serial is None: + return + self._loop.remove_reader(self.serial.fileno()) + self.serial.close() + self.serial = None + +class FlashTool: + def __init__( + self, + server: Server, + device=None, + baud=250000, + interface="can0", + firmware="~/klipper/custom_out/klipper.bin", + uuid=None, + query=False, + verbose=False, + request_bootloader=False, + status=False + ): + self.server = server + self.args = argparse.Namespace( + device=device, + baud=baud, + interface=interface, + firmware=firmware, + uuid=uuid, + query=query, + verbose=verbose, + request_bootloader=request_bootloader, + status=status + ) + + async def run(self) -> int: + iscan = self.args.device is None + sock: CanSocket | SerialSocket | None = None + try: + if iscan: + sock = CanSocket(self.server, self.args) + else: + sock = SerialSocket(self.server, self.args) + await sock.run() + if sock.is_usb_can_bridge and not sock.is_bootloader_req: + self.args.device = str(sock.usb_serial_path) + sock.close() + sock = SerialSocket(self.server, self.args) + await sock.run() + except Exception: + logging.exception("Flash Tool Error") + return 1 + finally: + if sock is not None: + sock.close() + if sock is None: + return 1 + if sock.is_query: + output_line("CANBus UUID Query Complete") + elif sock.is_bootloader_req: + output_line("Bootloader Request Complete") + elif sock.is_status_req: + output_line("Status Request Complete") + else: + output_line("Programming Complete") + return 0 diff --git a/moonraker/components/power.py b/moonraker/components/power.py index 2bf45a9..b90d2b0 100644 --- a/moonraker/components/power.py +++ b/moonraker/components/power.py @@ -39,6 +39,7 @@ if TYPE_CHECKING: from .http_client import HttpClient from .klippy_connection import KlippyConnection from .shell_command import ShellCommandFactory as ShellCommand + from .firmware_manager import FirmwareUpdate class PrinterPower: def __init__(self, config: ConfigHelper) -> None: @@ -411,6 +412,12 @@ class PowerDevice: raise self.server.error( f"Unable to change power for {self.name} " "while printing") + updata: FirmwareUpdate + updata = self.server.lookup_component("firmware_manager") + if updata.is_updating(): + raise self.server.error( + f"Unable to change power for {self.name} " + "while updating") ret = self.set_power(req) if ret is not None: await ret diff --git a/moonraker/components/update_manager/app_deploy.py b/moonraker/components/update_manager/app_deploy.py index 663917d..bb93bbc 100644 --- a/moonraker/components/update_manager/app_deploy.py +++ b/moonraker/components/update_manager/app_deploy.py @@ -258,6 +258,9 @@ class AppDeploy(BaseDeploy): if svc == "klipper": kconn: Klippy = self.server.lookup_component("klippy_connection") svc = kconn.unit_name + firmware_manager = self.server.lookup_component("firmware_manager", None) + if firmware_manager is not None: + firmware_manager.need_check_update = True await machine.do_service_action("restart", svc) async def _read_system_dependencies(self) -> List[str]: diff --git a/moonraker/components/zeroconf.py b/moonraker/components/zeroconf.py index 3844f55..5266a80 100644 --- a/moonraker/components/zeroconf.py +++ b/moonraker/components/zeroconf.py @@ -101,6 +101,7 @@ class ZeroconfRegistrar: else: # Use the UUID. First 8 hex digits should be unique enough instance_name = f"Moonraker-{instance_uuid[:8]}" + instance_name = "CreatBot" hi = self.server.get_host_info() host = self.mdns_name zc_service_props = { @@ -167,9 +168,9 @@ class ZeroconfRegistrar: SSDP_ADDR = ("239.255.255.250", 1900) -SSDP_SERVER_ID = "Moonraker SSDP/UPNP Server" +SSDP_SERVER_ID = "CreatBot SSDP/UPNP Server" SSDP_MAX_AGE = 1800 -SSDP_DEVICE_TYPE = "urn:arksine.github.io:device:Moonraker:1" +SSDP_DEVICE_TYPE = "urn:creatbot-com:device:3dprinter:2" SSDP_DEVICE_XML = """ @@ -180,12 +181,12 @@ SSDP_DEVICE_XML = """ {device_type} {friendly_name} - Arksine - https://github.com/Arksine/moonraker + CreatBot + https://www.creatbot.com API Server for Klipper - Moonraker + CreatBot {model_number} - https://github.com/Arksine/moonraker + https://github.com/CreatBotOfficail {serial_number} uuid:{device_uuid} {presentation_url} @@ -197,7 +198,10 @@ class SSDPServer(asyncio.protocols.DatagramProtocol): def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.unique_id = uuid.UUID(self.server.get_app_args()["instance_uuid"]) - self.name: str = "Moonraker" + self.name: str = "CreatBot" + machine: Machine = self.server.lookup_component("machine") + self.serial_number = machine.get_system_info().get("cpu_info", {}).get("serial_number", "N/A") + self.serial_number = self.serial_number[1:].upper() self.base_url: str = "" self.response_headers: List[str] = [] self.registered: bool = False @@ -280,10 +284,18 @@ class SSDPServer(asyncio.protocols.DatagramProtocol): if len(name) > 64: name = name[:64] self.name = name + model = self.name + device_name = self.name + if "(" in self.name: + model = self.name.split("(", 1)[-1].rsplit("-", 1)[0].rstrip(")") + elif "-" in self.name: + model = self.name.rsplit("-", 1)[0] + if "(" in self.name and ")" in self.name: + device_name = self.name.split("(", 1)[1].split(")", 1)[0] app: MoonrakerApp = self.server.lookup_component("application") - self.base_url = f"http://{host_name_or_ip}:{port}{app.route_prefix}" + self.base_url = f"http://{host_name_or_ip}" self.response_headers = [ - f"USN: uuid:{self.unique_id}::upnp:rootdevice", + f"USN: uuid:{self.serial_number}::upnp:rootdevice::urn:creatbot-com:device:3dprinter:2", f"LOCATION: {self.base_url}/server/zeroconf/ssdp", "ST: upnp:rootdevice", "EXT:", @@ -291,6 +303,8 @@ class SSDPServer(asyncio.protocols.DatagramProtocol): f"CACHE-CONTROL: max-age={SSDP_MAX_AGE}", f"BOOTID.UPNP.ORG: {self.boot_id}", f"CONFIGID.UPNP.ORG: {self.config_id}", + f"DEVICE-MODEL: {model}", + f"DEVICE-NAME: {device_name}", ] self.registered = True advertisements = self._build_notifications("ssdp:alive") @@ -302,14 +316,20 @@ class SSDPServer(asyncio.protocols.DatagramProtocol): async def _handle_xml_request(self, web_request: WebRequest) -> str: if not self.registered: - raise self.server.error("Moonraker SSDP Device not registered", 404) + raise self.server.error("CreatBot SSDP Device not registered", 404) app_args = self.server.get_app_args() + if "(" in self.name: + model = self.name.split("(", 1)[-1].rsplit("-", 1)[0].rstrip(")") + elif "-" in self.name: + model = self.name.rsplit("-", 1)[0] + else: + model = self.name return SSDP_DEVICE_XML.format( device_type=SSDP_DEVICE_TYPE, config_id=str(self.config_id), friendly_name=self.name, - model_number=app_args["software_version"], - serial_number=self.unique_id.hex, + model_number=model, + serial_number=self.serial_number, device_uuid=str(self.unique_id), presentation_url=self.base_url ) @@ -360,7 +380,7 @@ class SSDPServer(asyncio.protocols.DatagramProtocol): ): # Not a discovery request return - if headers.get("ST") not in ["upnp:rootdevice", "ssdp:all"]: + if headers.get("ST") not in ["upnp:rootdevice", "ssdp:all", "urn:creatbot-com:device:3dprinter:2",]: # Service Type doesn't apply return if self.response_handle is not None: @@ -390,7 +410,7 @@ class SSDPServer(asyncio.protocols.DatagramProtocol): notify_types = [ ("upnp:rootdevice", f"uuid:{self.unique_id}::upnp:rootdevice"), (f"uuid:{self.unique_id}", f"uuid:{self.unique_id}"), - (SSDP_DEVICE_TYPE, f"uuid:{self.unique_id}::{SSDP_DEVICE_TYPE}") + (SSDP_DEVICE_TYPE, f"uuid:{self.serial_number}::{SSDP_DEVICE_TYPE}"), ] for (nt, usn) in notify_types: notifications.append(