CreatBotMoonraker/moonraker/components/firmware_manager.py
zkk 08648da7a4 实现下位机版本检测升级功能以及优化断电续打
# Conflicts:
#	config/moonraker.conf   resolved by f5461563b8a7a5a25470a8763b8513779dbff253 version
#	scripts/sync_dependencies.py   resolved by f5461563b8a7a5a25470a8763b8513779dbff253 version
2025-05-10 16:36:55 +08:00

207 lines
8.4 KiB
Python

import os
import logging
import asyncio
from .flash_tool import FlashTool
from ..confighelper import ConfigHelper
from ..common import KlippyState
from .klippy_apis import KlippyAPI
from typing import (
TYPE_CHECKING,
Any,
Dict,
)
if TYPE_CHECKING:
from .machine import Machine
from .klippy_connection import KlippyConnection as Klippy
HOME = os.path.expanduser("~")
KLIPPER_DIR = os.path.join(HOME, "klipper/firmware")
MAIN_DEV = "/dev/ttyACM0"
SYSTEM_PYTHON = "python3"
class FirmwareUpdate:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
self.mcu_info: Dict[str, Dict[str, Any]] = {}
self._service_info: Dict[str, Any] = {}
self.min_version: str = ""
self.klipper_version: str = ""
self.current_progress = 0
self.updating = False
self.need_check_update = True
self.server.register_event_handler(
"server:klippy_started", self._on_klippy_startup)
@property
def klippy_apis(self) -> KlippyAPI:
return self.server.lookup_component("klippy_apis")
def is_updating(self) -> bool:
return self.updating
async def _on_klippy_startup(self, state: KlippyState) -> None:
if not self.need_check_update or not self._is_firmware_dir_exists():
return
try:
await self.build_mcu_info()
logging.info(f"{self.mcu_info}")
except Exception as e:
logging.exception(f"An error occurred during the building mcu info process: {e}")
for mcu_data in self.mcu_info.values():
if mcu_data.get('need_update', False):
await self.start_update()
break
async def _do_klipper_action(self, action: str) -> None:
try:
machine: Machine = self.server.lookup_component("machine")
await machine.do_service_action(action, "klipper")
logging.info(f"Klipper service {action}.")
except self.server.error:
pass
def _is_firmware_dir_exists(self):
return os.path.exists(KLIPPER_DIR)
async def start_update(self):
try:
self.updating = True
await self._do_klipper_action("stop")
await self.upgrade_needed_tool_mcus()
await self.upgrade_mcu()
await self._do_klipper_action("start")
self.updating = False
except Exception as e:
logging.exception(f"An error occurred during the update process: {e}")
self.updating = False
async def build_mcu_info(self) -> None:
printer_info: Dict[str, Any] = {}
cfg_status: Dict[str, Any] = {}
try:
printer_info = await self.klippy_apis.get_klippy_info()
cfg_status = await self.klippy_apis.query_objects({'configfile': None})
except self.server.error:
logging.exception("PanelDue initialization request failed")
config = cfg_status.get('configfile', {}).get('config', {})
self.klipper_version = printer_info.get("software_version", "").split('-')[0]
try:
self._build_basic_mcu_info(config)
await self._update_mcu_versions()
self._check_mcu_update_needed()
except Exception as e:
logging.exception(f"An error occurred while building MCU info: {e}")
def _build_basic_mcu_info(self, config: Dict[str, Any]) -> None:
for mcu, value in config.items():
if mcu.startswith("mcu") and "canbus_uuid" in value:
self.mcu_info[mcu] = {"canbus_uuid": value["canbus_uuid"]}
async def _update_mcu_versions(self) -> None:
for mcu in self.mcu_info:
try:
response = await self.klippy_apis.query_objects({mcu: None})
mcu_data = response.get(mcu, {})
mcu_version: str = mcu_data.get('mcu_version', '')
if mcu == "mcu":
self.min_version = mcu_data.get('min_firmware_version', "")
if mcu_version:
self.need_check_update = False
short_version: str = mcu_version.split('-')[0]
if short_version.lower().startswith('v'):
short_version = short_version[1:]
self.mcu_info[mcu]['mcu_version'] = short_version
except Exception as e:
logging.error(f"Error querying {mcu}: {e}")
def _compare_versions(self, version1, version2):
v1_parts = [int(part) for part in version1.split('.') if part]
v2_parts = [int(part) for part in version2.split('.') if part]
max_length = max(len(v1_parts), len(v2_parts))
v1_parts.extend([0] * (max_length - len(v1_parts)))
v2_parts.extend([0] * (max_length - len(v2_parts)))
for i in range(max_length):
if v1_parts[i] < v2_parts[i]:
return -1
elif v1_parts[i] > v2_parts[i]:
return 1
return 0
def _check_mcu_update_needed(self) -> None:
if self.klipper_version.lower().startswith('v'):
self.klipper_version = self.klipper_version[1:]
logging.info(f"min version: {self.min_version}")
logging.info(f"klipper version: {self.klipper_version}")
for mcu in self.mcu_info:
mcu_version = self.mcu_info[mcu].get('mcu_version', "")
mcu_vs_min = self._compare_versions(mcu_version, self.min_version)
mcu_vs_klipper = self._compare_versions(mcu_version, self.klipper_version)
if mcu_vs_min < 0 or mcu_vs_klipper > 0:
self.mcu_info[mcu]['need_update'] = True
else:
self.mcu_info[mcu]['need_update'] = False
async def upgrade_needed_tool_mcus(self):
firmware_mapping = {
"mcu L_tool": os.path.join(KLIPPER_DIR, "F072_L.bin"),
"mcu R_tool": os.path.join(KLIPPER_DIR, "F072_R.bin"),
"mcu tool": os.path.join(KLIPPER_DIR, "F072_L.bin")
}
for mcu_name, mcu_data in self.mcu_info.items():
if mcu_data.get('need_update', False) and mcu_name != "mcu":
firmware_path = firmware_mapping.get(mcu_name)
if not firmware_path:
logging.warning(f"No firmware specified for {mcu_name}, skipping upgrade.")
continue
try:
flash_tool = FlashTool(
self.server,
uuid = mcu_data['canbus_uuid'],
firmware = firmware_path
)
exit_code = await flash_tool.run()
await asyncio.sleep(1)
if exit_code == 0:
logging.info(f"Upgrade operation for {mcu_name} succeeded, exit code: {exit_code}")
else:
logging.error(f"Upgrade operation for {mcu_name} failed, exit code: {exit_code}")
except Exception as e:
logging.exception(f"An exception occurred during the upgrade process for {mcu_name}: {e}")
async def upgrade_mcu(self):
for mcu_name, mcu_data in self.mcu_info.items():
if mcu_name == "mcu" and mcu_data.get('need_update', False):
try:
flash_tool = FlashTool(
self.server,
uuid=mcu_data['canbus_uuid'],
request_bootloader=True
)
exit_code = await flash_tool.run()
if exit_code != 0:
logging.error(f"Failed to request bootloader for {mcu_name}, exit code: {exit_code}")
continue
await asyncio.sleep(2)
flash_tool = FlashTool(
self.server,
device=MAIN_DEV,
firmware=os.path.join(KLIPPER_DIR, "F446.bin")
)
exit_code = await flash_tool.run()
if exit_code == 0:
logging.info(f"Upgrade operation for {mcu_name} succeeded, exit code: {exit_code}")
else:
logging.error(f"Upgrade operation for {mcu_name} failed, exit code: {exit_code}")
except Exception as e:
logging.exception(f"An exception occurred during the upgrade process for {mcu_name}: {e}")
def load_component(config: ConfigHelper) -> FirmwareUpdate:
return FirmwareUpdate(config)