#!/usr/bin/python import argparse import gi import gettext import time import threading import json import requests import websocket import importlib import logging import os import re import subprocess gi.require_version("Gtk", "3.0") from gi.repository import Gtk, Gdk, GLib, Pango from jinja2 import Environment, Template from ks_includes.KlippyWebsocket import KlippyWebsocket from ks_includes.KlippyRest import KlippyRest from ks_includes.files import KlippyFiles from ks_includes.KlippyGtk import KlippyGtk from ks_includes.printer import Printer from ks_includes.config import KlipperScreenConfig # Create logging logger = logging.getLogger('KlipperScreen') logger.setLevel(logging.DEBUG) formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s') fh = logging.FileHandler('/tmp/KlipperScreen.log') fh.setLevel(logging.DEBUG) fh.setFormatter(formatter) ch = logging.StreamHandler() ch.setLevel(logging.ERROR) ch.setFormatter(formatter) logger.addHandler(fh) logger.addHandler(ch) klipperscreendir = os.getcwd() config = klipperscreendir + "/KlipperScreen.config" logger.info("Config file: " + config) class KlipperScreen(Gtk.Window): """ Class for creating a screen for Klipper via HDMI """ _cur_panels = [] bed_temp_label = None currentPanel = None files = None filename = "" last_update = {} load_panel = {} number_tools = 1 panels = {} popup_message = None printer = None rtl_languages = ['he_il'] subscriptions = [] shutdown = True def __init__(self): self.version = get_software_version() logger.info("KlipperScreen version: %s" % self.version) parser = argparse.ArgumentParser(description="KlipperScreen - A GUI for Klipper") parser.add_argument( "-c","--configfile", default="~/KlipperScreen.conf", metavar='', help="Location of KlipperScreen configuration file" ) args = parser.parse_args() configfile = os.path.normpath(os.path.expanduser(args.configfile)) self.lang = gettext.translation('KlipperScreen', localedir='ks_includes/locales', fallback=True) self._config = KlipperScreenConfig(configfile, self.lang) self.printer = Printer({ "software_version": "Unknown" }, { 'configfile': { 'config': {} }, 'print_stats': { 'state': 'disconnected' }, 'virtual_sdcard': { 'is_active': False } }) self.printer.set_callbacks({ "disconnected": self.state_disconnected, "error": self.state_error, "printing": self.state_printing, "ready": self.state_ready, "startup": self.state_startup, "shutdown": self.state_shutdown }) logger.debug("OS Language: %s" % os.getenv('LANG')) self.lang_ltr = True for lang in self.rtl_languages: if os.getenv('LANG').lower().startswith(lang): self.lang_ltr = False logger.debug("Enabling RTL mode") break _ = self.lang.gettext self.apiclient = KlippyRest(self._config.get_main_config_option("moonraker_host"), self._config.get_main_config_option("moonraker_port"), self._config.get_main_config_option("moonraker_api_key", False)) powerdevs = self.apiclient.send_request("machine/device_power/devices") if powerdevs != False: self.printer.configure_power_devices(powerdevs['result']) Gtk.Window.__init__(self) self.width = self._config.get_main_config().getint("width", Gdk.Screen.get_width(Gdk.Screen.get_default())) self.height = self._config.get_main_config().getint("height", Gdk.Screen.get_height(Gdk.Screen.get_default())) self.set_default_size(self.width, self.height) self.set_resizable(False) logger.info("Screen resolution: %sx%s" % (self.width, self.height)) self.gtk = KlippyGtk(self.width, self.height) self.init_style() #self._load_panels() self.printer_initializing(_("Initializing")) self._ws = KlippyWebsocket(self, { "on_connect": self.init_printer, "on_message": self._websocket_callback, "on_close": self.printer_initializing }, self._config.get_main_config_option("moonraker_host"), self._config.get_main_config_option("moonraker_port") ) self._ws.initial_connect() # Disable DPMS os.system("/usr/bin/xset -display :0 s off") os.system("/usr/bin/xset -display :0 -dpms") os.system("/usr/bin/xset -display :0 s noblank") return def ws_subscribe(self): requested_updates = { "objects": { "bed_mesh": ["profile_name","mesh_max","mesh_min","probed_matrix"], "configfile": ["config"], "fan": ["speed"], "gcode_move": ["extrude_factor","gcode_position","homing_origin","speed_factor"], "heater_bed": ["target","temperature"], "idle_timeout": ["state"], "pause_resume": ["is_paused"], "print_stats": ["print_duration","total_duration","filament_used","filename","state","message"], "toolhead": ["homed_axes","estimated_print_time","print_time","position","extruder"], "virtual_sdcard": ["file_position","is_active","progress"], "webhooks": ["state","state_message"] } } for extruder in self.printer.get_tools(): requested_updates['objects'][extruder] = ["target","temperature","pressure_advance","smooth_time"] self._ws.klippy.object_subscription(requested_updates) def _load_panel(self, panel, *args): if not panel in self.load_panel: logger.debug("Loading panel: %s" % panel) panel_path = os.path.join(os.path.dirname(__file__), 'panels', "%s.py" % panel) logger.info("Panel path: %s" % panel_path) if not os.path.exists(panel_path): msg = f"Panel {panel} does not exist" logger.info(msg) raise Exception(msg) module = importlib.import_module("panels.%s" % panel) if not hasattr(module, "create_panel"): msg = f"Cannot locate create_panel function for {panel}" logger.info(msg) raise Exception(msg) self.load_panel[panel] = getattr(module, "create_panel") try: return self.load_panel[panel](*args) except Exception: msg = f"Unable to create panel {panel}" logger.exception(msg) raise Exception(msg) def show_panel(self, panel_name, type, title, remove=None, pop=True, **kwargs): if panel_name not in self.panels: self.panels[panel_name] = self._load_panel(type, self, title) try: if kwargs != {}: self.panels[panel_name].initialize(panel_name, **kwargs) else: self.panels[panel_name].initialize(panel_name) except: del self.panels[panel_name] self.show_error_modal("Unable to load panel %s" % type) return if hasattr(self.panels[panel_name],"process_update"): self.panels[panel_name].process_update("notify_status_update", self.printer.get_data()) if hasattr(self.panels[panel_name],"activate"): self.panels[panel_name].activate() if remove == 2: self._remove_all_panels() elif remove == 1: self._remove_current_panel(pop) self.add(self.panels[panel_name].get()) self.show_all() self._cur_panels.append(panel_name) logger.debug("Current panel hierarchy: %s", str(self._cur_panels)) def show_popup_message(self, message): if self.popup_message != None: self.close_popup_message() box = Gtk.Box() box.get_style_context().add_class("message_popup") box.set_size_request(self.width, self.gtk.get_header_size()) label = Gtk.Label() if "must home axis first" in message.lower(): message = "Must home all axis first." label.set_text(message) close = Gtk.Button.new_with_label("X") close.set_can_focus(False) close.props.relief = Gtk.ReliefStyle.NONE close.connect("clicked", self.close_popup_message) box.pack_start(label, True, True, 0) box.pack_end(close, False, False, 0) box.set_halign(Gtk.Align.CENTER) cur_panel = self.panels[self._cur_panels[-1]] for i in ['back','estop','home']: if i in cur_panel.control: cur_panel.control[i].set_sensitive(False) cur_panel.get().put(box, 0,0) self.show_all() self.popup_message = box GLib.timeout_add(10000, self.close_popup_message) return False def close_popup_message(self, widget=None): if self.popup_message == None: return cur_panel = self.panels[self._cur_panels[-1]] for i in ['back','estop','home']: if i in cur_panel.control: cur_panel.control[i].set_sensitive(True) cur_panel.get().remove(self.popup_message) self.popup_message = None self.show_all() def show_error_modal(self, err): _ = self.lang.gettext logger.exception("Showing error modal: %s", err) buttons = [ {"name":_("Go Back"),"response": Gtk.ResponseType.CANCEL} ] label = Gtk.Label() label.set_markup(("%s \n\n" % err) + _("Check /tmp/KlipperScreen.log for more information.\nPlease submit an issue on GitHub for help.")) label.set_hexpand(True) label.set_halign(Gtk.Align.CENTER) label.set_line_wrap(True) label.set_line_wrap_mode(Pango.WrapMode.WORD_CHAR) dialog = self.gtk.Dialog(self, buttons, label, self.error_modal_response) def error_modal_response(self, widget, response_id): widget.destroy() def init_style(self): style_provider = Gtk.CssProvider() #style_provider.load_from_path(klipperscreendir + "/style.css") css = open(klipperscreendir + "/styles/style.css") css_data = css.read() css.close() css_data = css_data.replace("KS_FONT_SIZE",str(self.gtk.get_font_size())) style_provider = Gtk.CssProvider() style_provider.load_from_data(css_data.encode()) Gtk.StyleContext.add_provider_for_screen( Gdk.Screen.get_default(), style_provider, Gtk.STYLE_PROVIDER_PRIORITY_APPLICATION ) def is_printing(self): return "job_status" in self._cur_panels def _go_to_submenu(self, widget, name): logger.info("#### Go to submenu " + str(name)) #self._remove_current_panel(False) # Find current menu item panels = list(self._cur_panels) if "job_status" not in self._cur_panels: menu = "__main" else: menu = "__print" logger.info("#### Menu " + str(menu)) disname = self._config.get_menu_name(menu, name) menuitems = self._config.get_menu_items(menu, name) if len(menuitems) == 0: logger.info("No items in menu, returning.") return self.show_panel(self._cur_panels[-1] + '_' + name, "menu", disname, 1, False, display_name=disname, items=menuitems) def _remove_all_panels(self): while len(self._cur_panels) > 0: self._remove_current_panel(True, False) self.show_all() def _remove_current_panel(self, pop=True, show=True): if len(self._cur_panels) > 0: self.remove(self.panels[self._cur_panels[-1]].get()) if pop == True: self._cur_panels.pop() if len(self._cur_panels) > 0: self.add(self.panels[self._cur_panels[-1]].get()) if show == True: self.show_all() def _menu_go_back (self, widget=None): logger.info("#### Menu go back") self._remove_current_panel() def _menu_go_home(self): logger.info("#### Menu go home") while len(self._cur_panels) > 1: self._remove_current_panel() def add_subscription (self, panel_name): add = True for sub in self.subscriptions: if sub == panel_name: return self.subscriptions.append(panel_name) def remove_subscription (self, panel_name): for i in range(len(self.subscriptions)): if self.subscriptions[i] == panel_name: self.subscriptions.pop(i) return def state_disconnected(self): _ = self.lang.gettext logger.debug("### Going to disconnected") self.printer_initializing(_("Klipper has disconnected")) def state_error(self): _ = self.lang.gettext msg = self.printer.get_stat("webhooks","state_message") if "FIRMWARE_RESTART" in msg: self.printer_initializing( _("Klipper has encountered an error.\nIssue a FIRMWARE_RESTART to attempt fixing the issue.") ) elif "micro-controller" in msg: self.printer_initializing( _("Klipper has encountered an error with the micro-controller.\nPlease recompile and flash.") ) else: self.printer_initializing( _("Klipper has encountered an error.") ) def state_printing(self): self.printer_printing() def state_ready(self): # Do not return to main menu if completing a job, timeouts/user input will return if "job_status" in self._cur_panels or "main_menu" in self._cur_panels: return self.printer_ready() def state_startup(self): _ = self.lang.gettext self.printer_initializing(_("Klipper is attempting to start")) def state_shutdown(self): _ = self.lang.gettext self.printer_initializing(_("Klipper has shutdown")) def _websocket_callback(self, action, data): _ = self.lang.gettext if action == "notify_klippy_disconnected": self.printer.change_state("disconnected") return elif action == "notify_klippy_ready": self.printer.change_state("ready") elif action == "notify_status_update" and self.printer.get_state() != "shutdown": self.printer.process_update(data) elif action == "notify_filelist_changed": logger.debug("Filelist changed: %s", json.dumps(data,indent=2)) #self.files.add_file() elif action == "notify_metadata_update": self.files.request_metadata(data['filename']) elif action == "notify_power_changed": logger.debug("Power status changed: %s", data) self.printer.process_power_update(data) elif self.printer.get_state() not in ["error","shutdown"] and action == "notify_gcode_response": if "Klipper state: Shutdown" in data: self.printer.change_state("shutdown") if not (data.startswith("B:") and re.search(r'B:[0-9\.]+\s/[0-9\.]+\sT[0-9]+:[0-9\.]+', data)): if data.startswith("!! "): self.show_popup_message(data[3:]) logger.debug(json.dumps([action, data], indent=2)) for sub in self.subscriptions: self.panels[sub].process_update(action, data) def _confirm_send_action(self, widget, text, method, params): _ = self.lang.gettext buttons = [ {"name":_("Continue"), "response": Gtk.ResponseType.OK}, {"name":_("Cancel"),"response": Gtk.ResponseType.CANCEL} ] try: env = Environment(extensions=["jinja2.ext.i18n"]) env.install_gettext_translations(self.lang) j2_temp = env.from_string(text) text = j2_temp.render() except: logger.debug("Error parsing jinja for confirm_send_action") label = Gtk.Label() label.set_markup(text) label.set_hexpand(True) label.set_halign(Gtk.Align.CENTER) label.set_line_wrap(True) label.set_line_wrap_mode(Pango.WrapMode.WORD_CHAR) dialog = self.gtk.Dialog(self, buttons, label, self._confirm_send_action_response, method, params) def _confirm_send_action_response(self, widget, response_id, method, params): if response_id == Gtk.ResponseType.OK: self._send_action(widget, method, params) widget.destroy() def _send_action(self, widget, method, params): self._ws.send_method(method, params) def printer_initializing(self, text=None): self.shutdown = True self.close_popup_message() self.show_panel('splash_screen',"splash_screen", "Splash Screen", 2) if text != None: self.panels['splash_screen'].update_text(text) self.panels['splash_screen'].show_restart_buttons() def init_printer(self): _ = self.lang.gettext status_objects = [ 'bed_mesh', 'idle_timeout', 'configfile', 'gcode_move', 'fan', 'toolhead', 'virtual_sdcard', 'print_stats', 'heater_bed', 'extruder', 'pause_resume', 'webhooks' ] printer_info = self.apiclient.get_printer_info() logger.debug("Sendingn request %s" % "printer/objects/query?" + "&".join(status_objects)) data = self.apiclient.send_request("printer/objects/query?" + "&".join(status_objects)) powerdevs = self.apiclient.send_request("machine/device_power/devices") data = data['result']['status'] # Reinitialize printer, in case the printer was shut down and anything has changed. self.printer.reinit(printer_info['result'], data) self.ws_subscribe() if powerdevs != False: self.printer.configure_power_devices(powerdevs['result']) if self.files == None: self.files = KlippyFiles(self) else: self.files.add_timeout() def printer_ready(self): self.close_popup_message() self.show_panel('main_panel', "main_menu", "Main Menu", 2, items=self._config.get_menu_items("__main"), extrudercount=self.printer.get_extruder_count()) self.ws_subscribe() if "job_status" in self.panels: self.remove_subscription("job_status") del self.panels["job_status"] def printer_printing(self): self.files.remove_timeout() self.close_popup_message() self.show_panel('job_status',"job_status", "Print Status", 2) def get_software_version(): prog = ('git', '-C', os.path.dirname(__file__), 'describe', '--always', '--tags', '--long', '--dirty') try: process = subprocess.Popen(prog, stdout=subprocess.PIPE, stderr=subprocess.PIPE) ver, err = process.communicate() retcode = process.wait() if retcode == 0: version = ver.strip() if isinstance(version, bytes): version = version.decode() return version else: logger.debug(f"Error getting git version: {err}") except OSError: logger.exception("Error runing git describe") return "?" def main(): win = KlipperScreen() win.connect("destroy", Gtk.main_quit) win.show_all() Gtk.main() if __name__ == "__main__": try: main() except: logger.exception("Fatal error in main loop")