From 1026d59cad5f72c8f7fee0398497a67981b2e2c9 Mon Sep 17 00:00:00 2001 From: Arksine Date: Sat, 15 May 2021 11:08:52 -0400 Subject: [PATCH] paneldue: add annotations Signed-off-by: Eric Callahan --- moonraker/components/paneldue.py | 263 ++++++++++++++++++------------- 1 file changed, 154 insertions(+), 109 deletions(-) diff --git a/moonraker/components/paneldue.py b/moonraker/components/paneldue.py index 9e99236..462ee8e 100644 --- a/moonraker/components/paneldue.py +++ b/moonraker/components/paneldue.py @@ -3,18 +3,39 @@ # Copyright (C) 2020 Eric Callahan # # This file may be distributed under the terms of the GNU GPLv3 license. + +from __future__ import annotations import serial import os import time import json import errno import logging -import asyncio from collections import deque from utils import ServerError from tornado import gen from tornado.ioloop import IOLoop +# Annotation imports +from typing import ( + TYPE_CHECKING, + Deque, + Any, + Tuple, + Optional, + Dict, + List, + Callable, + Coroutine, +) +if TYPE_CHECKING: + from confighelper import ConfigHelper + from . import klippy_apis + from . import file_manager + APIComp = klippy_apis.KlippyAPI + FMComp = file_manager.FileManager + FlexCallback = Callable[..., Optional[Coroutine]] + MIN_EST_TIME = 10. INITIALIZE_TIMEOUT = 10. @@ -25,26 +46,31 @@ class PanelDueError(ServerError): RESTART_GCODES = ["RESTART", "FIRMWARE_RESTART"] class SerialConnection: - def __init__(self, config, paneldue): + def __init__(self, + config: ConfigHelper, + paneldue: PanelDue + ) -> None: self.ioloop = IOLoop.current() self.paneldue = paneldue - self.port = config.get('serial') + self.port: str = config.get('serial') self.baud = config.getint('baud', 57600) - self.partial_input = b"" - self.ser = self.fd = None - self.connected = False - self.send_busy = False - self.send_buffer = b"" - self.attempting_connect = True + self.partial_input: bytes = b"" + self.ser: Optional[serial.Serial] = None + self.fd: Optional[int] = None + self.connected: bool = False + self.send_busy: bool = False + self.send_buffer: bytes = b"" + self.attempting_connect: bool = True self.ioloop.spawn_callback(self._connect) - def disconnect(self, reconnect=False): + def disconnect(self, reconnect: bool = False) -> None: if self.connected: if self.fd is not None: self.ioloop.remove_handler(self.fd) self.fd = None self.connected = False - self.ser.close() + if self.ser is not None: + self.ser.close() self.ser = None self.partial_input = b"" self.send_buffer = b"" @@ -52,9 +78,9 @@ class SerialConnection: logging.info("PanelDue Disconnected") if reconnect and not self.attempting_connect: self.attempting_connect = True - self.ioloop.call_later(1., self._connect) + self.ioloop.call_later(1., self._connect) # type: ignore - async def _connect(self): + async def _connect(self) -> None: start_time = connect_time = time.time() while not self.connected: if connect_time > start_time + 30.: @@ -73,14 +99,15 @@ class SerialConnection: connect_time += time.time() continue self.fd = self.ser.fileno() - os.set_blocking(self.fd, False) - self.ioloop.add_handler( - self.fd, self._handle_incoming, IOLoop.READ | IOLoop.ERROR) + fd = self.fd = self.ser.fileno() + os.set_blocking(fd, False) + self.ioloop.add_handler(fd, self._handle_incoming, + IOLoop.READ | IOLoop.ERROR) self.connected = True logging.info("PanelDue Connected") self.attempting_connect = False - def _handle_incoming(self, fd, events): + def _handle_incoming(self, fd: int, events: int) -> None: if events & IOLoop.ERROR: logging.info("PanelDue Connection Error") self.disconnect(reconnect=True) @@ -104,23 +131,24 @@ class SerialConnection: self.partial_input = lines.pop() for line in lines: try: - line = line.strip().decode('utf-8', 'ignore') - self.paneldue.process_line(line) + decoded_line = line.strip().decode('utf-8', 'ignore') + self.paneldue.process_line(decoded_line) except ServerError: logging.exception( - f"GCode Processing Error: {line}") + f"GCode Processing Error: {decoded_line}") self.paneldue.handle_gcode_response( - f"!! GCode Processing Error: {line}") + f"!! GCode Processing Error: {decoded_line}") except Exception: logging.exception("Error during gcode processing") - def send(self, data): + def send(self, data: bytes) -> None: self.send_buffer += data if not self.send_busy: self.send_busy = True self.ioloop.spawn_callback(self._do_send) - async def _do_send(self): + async def _do_send(self) -> None: + assert self.fd is not None while self.send_buffer: if not self.connected: break @@ -142,41 +170,44 @@ class SerialConnection: self.send_busy = False class PanelDue: - def __init__(self, config): + def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() self.ioloop = IOLoop.current() - self.file_manager = self.server.lookup_component('file_manager') - self.klippy_apis = self.server.lookup_component('klippy_apis') - self.kinematics = "none" + self.file_manager: FMComp = \ + self.server.lookup_component('file_manager') + self.klippy_apis: APIComp = \ + self.server.lookup_component('klippy_apis') + self.kinematics: str = "none" self.machine_name = config.get('machine_name', "Klipper") - self.firmware_name = "Repetier | Klipper" - self.last_message = None - self.last_gcode_response = None - self.current_file = "" - self.file_metadata = {} + self.firmware_name: str = "Repetier | Klipper" + self.last_message: Optional[str] = None + self.last_gcode_response: Optional[str] = None + self.current_file: str = "" + self.file_metadata: Dict[str, Any] = {} self.enable_checksum = config.getboolean('enable_checksum', True) - self.debug_queue = deque(maxlen=100) + self.debug_queue: Deque[str] = deque(maxlen=100) # Initialize tracked state. - self.printer_state = { + self.printer_state: Dict[str, Dict[str, Any]] = { 'gcode_move': {}, 'toolhead': {}, 'virtual_sdcard': {}, 'fan': {}, 'display_status': {}, 'print_stats': {}, 'idle_timeout': {}, 'gcode_macro PANELDUE_BEEP': {}} - self.extruder_count = 0 - self.heaters = [] - self.is_ready = False - self.is_shutdown = False - self.initialized = False - self.cq_busy = self.gq_busy = False - self.command_queue = [] - self.gc_queue = [] - self.last_printer_state = 'O' - self.last_update_time = 0. + self.extruder_count: int = 0 + self.heaters: List[str] = [] + self.is_ready: bool = False + self.is_shutdown: bool = False + self.initialized: bool = False + self.cq_busy: bool = False + self.gq_busy: bool = False + self.command_queue: List[Tuple[FlexCallback, Any, Any]] = [] + self.gc_queue: List[str] = [] + self.last_printer_state: str = 'O' + self.last_update_time: float = 0. # Set up macros - self.confirmed_gcode = "" - self.mbox_sequence = 0 - self.available_macros = {} + self.confirmed_gcode: str = "" + self.mbox_sequence: int = 0 + self.available_macros: Dict[str, str] = {} self.confirmed_macros = { "RESTART": "RESTART", "FIRMWARE_RESTART": "FIRMWARE_RESTART"} @@ -184,14 +215,14 @@ class PanelDue: if macros is not None: # The macro's configuration name is the key, whereas the full # command is the value - macros = [m for m in macros.split('\n') if m.strip()] - self.available_macros = {m.split()[0]: m for m in macros} + macro_list = [m for m in macros.split('\n') if m.strip()] + self.available_macros = {m.split()[0]: m for m in macro_list} conf_macros = config.get('confirmed_macros', None) if conf_macros is not None: # The macro's configuration name is the key, whereas the full # command is the value - conf_macros = [m for m in conf_macros.split('\n') if m.strip()] - self.confirmed_macros = {m.split()[0]: m for m in conf_macros} + macro_list = [m for m in conf_macros.split('\n') if m.strip()] + self.confirmed_macros = {m.split()[0]: m for m in macro_list} self.available_macros.update(self.confirmed_macros) ntkeys = config.get('non_trivial_keys', "Klipper state") @@ -216,7 +247,7 @@ class PanelDue: # These commands are directly executued on the server and do not to # make a request to Klippy - self.direct_gcodes = { + self.direct_gcodes: Dict[str, FlexCallback] = { 'M20': self._run_paneldue_M20, 'M30': self._run_paneldue_M30, 'M36': self._run_paneldue_M36, @@ -225,7 +256,7 @@ class PanelDue: # These gcodes require special parsing or handling prior to being # sent via Klippy's "gcode/script" api command. - self.special_gcodes = { + self.special_gcodes: Dict[str, Callable[[List[str]], str]] = { 'M0': lambda args: "CANCEL_PRINT", 'M23': self._prepare_M23, 'M24': lambda args: "RESUME", @@ -239,7 +270,7 @@ class PanelDue: 'M999': lambda args: "FIRMWARE_RESTART" } - async def _process_klippy_ready(self): + async def _process_klippy_ready(self) -> None: # Request "info" and "configfile" status retries = 10 printer_info = cfg_status = {} @@ -259,8 +290,9 @@ class PanelDue: self.firmware_name = "Repetier | Klipper " + \ printer_info['software_version'] - config = cfg_status.get('configfile', {}).get('config', {}) - printer_cfg = config.get('printer', {}) + config: Dict[str, Any] = cfg_status.get( + 'configfile', {}).get('config', {}) + printer_cfg: Dict[str, Any] = config.get('printer', {}) self.kinematics = printer_cfg.get('kinematics', "none") logging.info( @@ -288,6 +320,7 @@ class PanelDue: self.heaters.append(cfg) sub_args[cfg] = None try: + status: Dict[str, Any] status = await self.klippy_apis.subscribe_objects(sub_args) except self.server.error: logging.exception("Unable to complete subscription request") @@ -296,28 +329,28 @@ class PanelDue: self.is_shutdown = False self.is_ready = True - def _process_klippy_shutdown(self): + def _process_klippy_shutdown(self) -> None: self.is_shutdown = True - def _process_klippy_disconnect(self): + def _process_klippy_disconnect(self) -> None: # Tell the PD that the printer is "off" self.write_response({'status': 'O'}) self.last_printer_state = 'O' self.is_shutdown = self.is_shutdown = False - def handle_status_update(self, status): + def handle_status_update(self, status: Dict[str, Any]) -> None: for obj, items in status.items(): if obj in self.printer_state: self.printer_state[obj].update(items) else: self.printer_state[obj] = items - def paneldue_beep(self, frequency, duration): + def paneldue_beep(self, frequency: int, duration: float) -> None: duration = int(duration * 1000.) self.write_response( {'beep_freq': frequency, 'beep_length': duration}) - def process_line(self, line): + def process_line(self, line: str) -> None: self.debug_queue.append(line) # If we find M112 in the line then skip verification if "M112" in line.upper(): @@ -328,7 +361,7 @@ class PanelDue: # Get line number line_index = line.find(' ') try: - line_no = int(line[1:line_index]) + line_no: Optional[int] = int(line[1:line_index]) except Exception: line_index = -1 line_no = None @@ -370,7 +403,7 @@ class PanelDue: # Check for commands that query state and require immediate response if cmd in self.direct_gcodes: - params = {} + params: Dict[str, Any] = {} for p in parts[1:]: if p[0] not in "PSR": params["arg_p"] = p.strip(" \"\t\n") @@ -391,20 +424,20 @@ class PanelDue: # Prepare GCodes that require special handling if cmd in self.special_gcodes: - func = self.special_gcodes[cmd] - script = func(parts[1:]) + sgc_func = self.special_gcodes[cmd] + script = sgc_func(parts[1:]) if not script: return self.queue_gcode(script) - def queue_gcode(self, script): + def queue_gcode(self, script: str) -> None: self.gc_queue.append(script) if not self.gq_busy: self.gq_busy = True self.ioloop.spawn_callback(self._process_gcode_queue) - async def _process_gcode_queue(self): + async def _process_gcode_queue(self) -> None: while self.gc_queue: script = self.gc_queue.pop(0) try: @@ -418,24 +451,24 @@ class PanelDue: logging.exception(msg) self.gq_busy = False - def queue_command(self, cmd, *args, **kwargs): + def queue_command(self, cmd: FlexCallback, *args, **kwargs) -> None: self.command_queue.append((cmd, args, kwargs)) if not self.cq_busy: self.cq_busy = True self.ioloop.spawn_callback(self._process_command_queue) - async def _process_command_queue(self): + async def _process_command_queue(self) -> None: while self.command_queue: cmd, args, kwargs = self.command_queue.pop(0) try: ret = cmd(*args, **kwargs) - if asyncio.iscoroutine(ret): + if ret is not None: await ret except Exception: logging.exception("Error processing command") self.cq_busy = False - def _clean_filename(self, filename): + def _clean_filename(self, filename: str) -> str: # Remove quotes and whitespace filename.strip(" \"\t\n") # Remove drive number @@ -453,17 +486,17 @@ class PanelDue: filename = "/" + filename return filename - def _prepare_M23(self, args): + def _prepare_M23(self, args: List[str]) -> str: filename = self._clean_filename(args[0]) return f"M23 {filename}" - def _prepare_M32(self, args): + def _prepare_M32(self, args: List[str]) -> str: filename = self._clean_filename(args[0]) # Escape existing double quotes in the file name filename = filename.replace("\"", "\\\"") return f"SDCARD_PRINT_FILE FILENAME=\"{filename}\"" - def _prepare_M98(self, args): + def _prepare_M98(self, args: List[str]) -> str: macro = args[0][1:].strip(" \"\t\n") name_start = macro.rfind('/') + 1 macro = macro[name_start:] @@ -475,12 +508,12 @@ class PanelDue: cmd = "" return cmd - def _prepare_M290(self, args): + def _prepare_M290(self, args: List[str]) -> str: # args should in in the format Z0.02 offset = args[0][1:].strip() return f"SET_GCODE_OFFSET Z_ADJUST={offset} MOVE=1" - def _prepare_M292(self, args): + def _prepare_M292(self, args: List[str]) -> str: p_val = int(args[0][1]) if p_val == 0: cmd = self.confirmed_gcode @@ -488,13 +521,13 @@ class PanelDue: return cmd return "" - def _create_confirmation(self, name, gcode): + def _create_confirmation(self, name: str, gcode: str) -> None: self.mbox_sequence += 1 self.confirmed_gcode = gcode title = "Confirmation Dialog" msg = f"Please confirm your intent to run {name}." \ " Press OK to continue, or CANCEL to abort." - mbox = {} + mbox: Dict[str, Any] = {} mbox['msgBox.mode'] = 3 mbox['msgBox.msg'] = msg mbox['msgBox.seq'] = self.mbox_sequence @@ -504,7 +537,7 @@ class PanelDue: logging.debug(f"Creating PanelDue Confirmation: {mbox}") self.write_response(mbox) - def handle_gcode_response(self, response): + def handle_gcode_response(self, response: str) -> None: # Only queue up "non-trivial" gcode responses. At the # moment we'll handle state changes and errors if "Klipper state" in response \ @@ -516,11 +549,11 @@ class PanelDue: self.last_gcode_response = response return - def write_response(self, response): + def write_response(self, response: Dict[str, Any]) -> None: byte_resp = json.dumps(response) + "\r\n" self.ser_conn.send(byte_resp.encode()) - def _get_printer_status(self): + def _get_printer_status(self) -> str: # PanelDue States applicable to Klipper: # I = idle, P = printing from SD, S = stopped (shutdown), # C = starting up (not ready), A = paused, D = pausing, @@ -529,6 +562,7 @@ class PanelDue: return 'S' printer_state = self.printer_state + sd_state: str sd_state = printer_state['print_stats'].get('state', "standby") if sd_state == "printing": if self.last_printer_state == 'A': @@ -548,8 +582,11 @@ class PanelDue: return 'I' - def _run_paneldue_M408(self, arg_r=None, arg_s=1): - response = {} + def _run_paneldue_M408(self, + arg_r: Optional[int] = None, + arg_s: int = 1 + ) -> None: + response: Dict[str, Any] = {} sequence = arg_r response_type = arg_s @@ -587,6 +624,9 @@ class PanelDue: 'homing_origin', [0., 0., 0., 0.])[2], 3) # Current position + pos: List[float] + homed_pos: str + sfactor: float pos = p_state['toolhead'].get('position', [0., 0., 0., 0.]) response['pos'] = [round(p, 2) for p in pos[:3]] homed_pos = p_state['toolhead'].get('homed_axes', "") @@ -597,38 +637,41 @@ class PanelDue: # Print Progress Tracking sd_status = p_state['virtual_sdcard'] print_stats = p_state['print_stats'] - fname = print_stats.get('filename', "") - sd_print_state = print_stats.get('state') + fname: str = print_stats.get('filename', "") + sd_print_state: Optional[str] = print_stats.get('state') if sd_print_state in ['printing', 'paused']: # We know a file has been loaded, initialize metadata if self.current_file != fname: self.current_file = fname self.file_metadata = self.file_manager.get_file_metadata(fname) - progress = sd_status.get('progress', 0) + progress: float = sd_status.get('progress', 0) # progress and print tracking if progress: response['fraction_printed'] = round(progress, 3) - est_time = self.file_metadata.get('estimated_time', 0) + est_time: float = self.file_metadata.get('estimated_time', 0) if est_time > MIN_EST_TIME: # file read estimate times_left = [int(est_time - est_time * progress)] # filament estimate + est_total_fil: Optional[float] est_total_fil = self.file_metadata.get('filament_total') if est_total_fil: - cur_filament = print_stats.get('filament_used', 0.) + cur_filament: float = print_stats.get( + 'filament_used', 0.) fpct = min(1., cur_filament / est_total_fil) times_left.append(int(est_time - est_time * fpct)) # object height estimate + obj_height: Optional[float] obj_height = self.file_metadata.get('object_height') if obj_height: - cur_height = p_state['gcode_move'].get( + cur_height: float = p_state['gcode_move'].get( 'gcode_position', [0., 0., 0., 0.])[2] hpct = min(1., cur_height / obj_height) times_left.append(int(est_time - est_time * hpct)) else: # The estimated time is not in the metadata, however we # can still provide an estimate based on file progress - duration = print_stats.get('print_duration', 0.) + duration: float = print_stats.get('print_duration', 0.) times_left = [int(duration / progress - duration)] response['timesLeft'] = times_left else: @@ -636,11 +679,12 @@ class PanelDue: self.current_file = "" self.file_metadata = {} - fan_speed = p_state['fan'].get('speed') + fan_speed: Optional[float] = p_state['fan'].get('speed') if fan_speed is not None: response['fanPercent'] = [round(fan_speed * 100, 1)] if self.extruder_count > 0: + extruder_name: Optional[str] extruder_name = p_state['toolhead'].get('extruder') if extruder_name is not None: tool = 0 @@ -649,12 +693,12 @@ class PanelDue: response['tool'] = tool # Report Heater Status - efactor = round(p_state['gcode_move'].get( + efactor: float = round(p_state['gcode_move'].get( 'extrude_factor', 1.) * 100., 2) for name in self.heaters: - temp = round(p_state[name].get('temperature', 0.0), 1) - target = round(p_state[name].get('target', 0.0), 1) + temp: float = round(p_state[name].get('temperature', 0.0), 1) + target: float = round(p_state[name].get('target', 0.0), 1) response.setdefault('heaters', []).append(temp) response.setdefault('active', []).append(target) response.setdefault('standby', []).append(target) @@ -664,7 +708,7 @@ class PanelDue: response.setdefault('extr', []).append(round(pos[3], 2)) # Display message (via M117) - msg = p_state['display_status'].get('message') + msg: str = p_state['display_status'].get('message', "") if msg and msg != self.last_message: response['message'] = msg # reset the message so it only shows once. The paneldue @@ -673,7 +717,7 @@ class PanelDue: self.last_message = msg self.write_response(response) - def _run_paneldue_M20(self, arg_p, arg_s=0): + def _run_paneldue_M20(self, arg_p: str, arg_s: int = 0) -> None: response_type = arg_s if response_type != 2: logging.info( @@ -689,7 +733,7 @@ class PanelDue: # ie. "0:/" if path.startswith("0:/"): path = path[2:] - response = {'dir': path} + response: Dict[str, Any] = {'dir': path} response['files'] = [] if path == "/macros": @@ -713,7 +757,7 @@ class PanelDue: response['files'] = flist self.write_response(response) - async def _run_paneldue_M30(self, arg_p=None): + async def _run_paneldue_M30(self, arg_p: str = "") -> None: # Delete a file. Clean up the file name and make sure # it is relative to the "gcodes" root. path = arg_p @@ -727,9 +771,9 @@ class PanelDue: path = "gcodes/" + path await self.file_manager.delete_file(path) - def _run_paneldue_M36(self, arg_p=None): - response = {} - filename = arg_p + def _run_paneldue_M36(self, arg_p: Optional[str] = None) -> None: + response: Dict[str, Any] = {} + filename: Optional[str] = arg_p sd_status = self.printer_state.get('virtual_sdcard', {}) print_stats = self.printer_state.get('print_stats', {}) if filename is None: @@ -756,37 +800,38 @@ class PanelDue: if not filename.startswith("gcodes/"): filename = "gcodes/" + filename - metadata = self.file_manager.get_file_metadata(filename) + metadata: Dict[str, Any] = \ + self.file_manager.get_file_metadata(filename) if metadata: response['err'] = 0 response['size'] = metadata['size'] # workaround for PanelDue replacing the first "T" found response['lastModified'] = "T" + time.ctime(metadata['modified']) - slicer = metadata.get('slicer') + slicer: Optional[str] = metadata.get('slicer') if slicer is not None: response['generatedBy'] = slicer - height = metadata.get('object_height') + height: Optional[float] = metadata.get('object_height') if height is not None: response['height'] = round(height, 2) - layer_height = metadata.get('layer_height') + layer_height: Optional[float] = metadata.get('layer_height') if layer_height is not None: response['layerHeight'] = round(layer_height, 2) - filament = metadata.get('filament_total') + filament: Optional[float] = metadata.get('filament_total') if filament is not None: response['filament'] = [round(filament, 1)] - est_time = metadata.get('estimated_time') + est_time: Optional[float] = metadata.get('estimated_time') if est_time is not None: response['printTime'] = int(est_time + .5) else: response['err'] = 1 self.write_response(response) - def close(self): + def close(self) -> None: self.ser_conn.disconnect() msg = "\nPanelDue GCode Dump:" for i, gc in enumerate(self.debug_queue): msg += f"\nSequence {i}: {gc}" logging.debug(msg) -def load_component(config): +def load_component(config: ConfigHelper) -> PanelDue: return PanelDue(config)