实现恢复出厂设置功能

This commit is contained in:
张开科 2024-12-23 16:25:37 +08:00
parent e3a38f8561
commit 0c2a2618be
6 changed files with 353 additions and 0 deletions

View File

@ -0,0 +1,182 @@
import logging
import os
import subprocess
from ks_includes.KlippyGcodes import KlippyGcodes
class KlippyFactory:
@staticmethod
def clean_mainsail_web_config(connect):
connect.get_database_list(KlippyFactory.clean_database_callback, connect, "mainsail")
logging.info("clean mainsail web config")
@staticmethod
def clean_maintenance(connect):
connect.get_database_list(KlippyFactory.clean_database_callback, connect, "maintenance")
logging.info("clean maintenance config")
@staticmethod
def clean_gcode_metadata(connect):
connect.get_database_list(KlippyFactory.clean_database_callback, connect, "gcode_metadata")
logging.info("clean gcode metadata")
@staticmethod
def clean_update_manager(connect):
connect.get_database_list(KlippyFactory.clean_database_callback, connect, "update_manager")
logging.info("clean update manager")
@staticmethod
def clean_database_callback(result, method, params, connect, option):
if "server.database.list" == method:
connect.get_database_item(option, None, KlippyFactory.clean_database_callback, connect, option)
elif "server.database.get_item" == method:
if "result" in result and option == result["result"]["namespace"]:
for key in result["result"]["value"]:
connect.del_database_item(option, key, None)
@staticmethod
def clean_job_history(connect):
connect.reset_job_history_totals()
connect.del_all_job_history()
logging.info("clean_job_history")
@staticmethod
def reset_advanced_setting_factory(connect):
option_list = {
"adaptive_meshing": False,
"power_loss_recovery": True,
"auto_change_nozzle": False,
}
for key, val in option_list.items():
script = KlippyGcodes.set_save_variables(key, val)
connect.gcode_script(script)
logging.info("reset advanced setting")
@staticmethod
def clean_screen_config(config, guide=False):
config.del_all(guide)
logging.info("clean screen config")
@staticmethod
def hostname_factory():
logging.info("clean screen config")
@staticmethod
def clean_wlan():
command = "nmcli connection show | grep wifi | awk '{print $1}' | xargs -I {} nmcli connection delete {}"
subprocess.run(command, shell=True, stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL)
logging.info("clean wlan config")
@staticmethod
def clean_gocde_file():
exclude_dir = [
".PresetModel",
]
command = "lsblk | grep '/home/klipper/printer_data/gcodes'"
res = subprocess.run(command, shell=True, text=True, capture_output=True)
if res.returncode == 0:
output = res.stdout.strip()
if output:
lines = output.split("\n")
for line in lines:
result = line.strip().split("/")[-1]
exclude_dir.append(f"{result}")
homedir = os.path.expanduser("~")
configdir = os.path.join(homedir, "printer_data", "gcodes")
KlippyFactory.clear_directory(configdir, exclude_dirs=exclude_dir)
logging.info("clean gocde file")
@staticmethod
def clean_log_file():
homedir = os.path.expanduser("~")
logdir = os.path.join(homedir, "printer_data", "logs")
KlippyFactory.clear_directory(logdir)
logging.info("clean log directory")
@staticmethod
def clean_config_backup_file():
exclude_files = [
"base.cfg",
"printer.cfg",
"config_variables.cfg",
"crowsnest.conf",
"KlipperScreen.conf",
"mainsail.cfg",
"moonraker.conf",
]
homedir = os.path.expanduser("~")
configdir = os.path.join(homedir, "printer_data", "config")
KlippyFactory.clear_directory(configdir, exclude_files)
logging.info("clean config backup file")
@staticmethod
def clear_directory(directory, exclude_files=None, exclude_dirs=None):
exclude_files = exclude_files or []
exclude_dirs = exclude_dirs or []
if os.path.exists(directory):
for filename in os.listdir(directory):
file_path = os.path.join(directory, filename)
if filename in exclude_files:
continue
if filename in exclude_dirs and os.path.isdir(file_path):
continue
try:
if os.path.isfile(file_path):
os.remove(file_path)
elif os.path.isdir(file_path):
for sub_filename in os.listdir(file_path):
sub_file_path = os.path.join(file_path, sub_filename)
if os.path.isfile(sub_file_path):
os.remove(sub_file_path)
elif os.path.isdir(sub_file_path):
os.rmdir(sub_file_path)
os.rmdir(file_path)
except Exception as e:
logging.error(f"Error deleting {file_path}: {e}")
else:
logging.info(f"The directory {directory} does not exist.")
@staticmethod
def restart_application():
os.system("systemctl restart klipper.service")
os.system("systemctl restart moonraker.service")
os.system("systemctl restart KlipperScreen.service")
@staticmethod
def user_factory_reset(connect, config, clean_gcode=False):
KlippyFactory.clean_screen_config(config)
KlippyFactory.clean_mainsail_web_config(connect)
KlippyFactory.clean_maintenance(connect)
KlippyFactory.clean_gcode_metadata(connect)
KlippyFactory.clean_update_manager(connect)
KlippyFactory.clean_job_history(connect)
KlippyFactory.reset_advanced_setting_factory(connect)
KlippyFactory.clean_wlan()
KlippyFactory.clean_log_file()
if clean_gcode:
KlippyFactory.clean_gocde_file()
logging.info("User factory reset completed")
KlippyFactory.restart_application()
@staticmethod
def production_factory_reset(connect, config):
KlippyFactory.clean_screen_config(config, True)
KlippyFactory.clean_mainsail_web_config(connect)
KlippyFactory.clean_maintenance(connect)
KlippyFactory.clean_gcode_metadata(connect)
KlippyFactory.clean_update_manager(connect)
KlippyFactory.clean_job_history(connect)
KlippyFactory.reset_advanced_setting_factory(connect)
KlippyFactory.clean_wlan()
KlippyFactory.clean_gocde_file()
KlippyFactory.clean_log_file()
KlippyFactory.clean_config_backup_file()
logging.info("Production factory reset completed")
KlippyFactory.restart_application()

View File

@ -212,6 +212,58 @@ class MoonrakerApi:
*args
)
def get_database_list(self, callback=None, *args):
return self._ws.send_method(
"server.database.list",
{},
callback,
*args
)
def add_database_item(self, namespace, key=None, val=None, callback=None, *args):
return self._ws.send_method(
"server.database.post_item",
{"namespace": namespace,
"key": key,
"value": val},
callback,
*args
)
def get_database_item(self, namespace, key=None, callback=None, *args):
return self._ws.send_method(
"server.database.get_item",
{"namespace": namespace,
"key": key},
callback,
*args
)
def del_database_item(self, namespace, key=None, callback=None, *args):
return self._ws.send_method(
"server.database.delete_item",
{"namespace": namespace,
"key": key},
callback,
*args
)
def reset_job_history_totals(self, callback=None, *args):
return self._ws.send_method(
"server.history.reset_totals",
{},
callback,
*args
)
def del_all_job_history(self, callback=None, *args):
return self._ws.send_method(
"server.history.delete_job",
{"all": "true"},
callback,
*args
)
def object_subscription(self, updates):
logging.debug("Sending printer.objects.subscribe")
return self._ws.send_method(

View File

@ -578,6 +578,49 @@ class KlipperScreenConfig:
def set(self, section, name, value):
self.config.set(section, name, value)
def del_all(self, guide=False):
if self.config_path == self.default_config_path:
user_def = ""
saved_def = None
else:
user_def, saved_def = self.separate_saved_config(self.config_path)
save_output = ""
if guide:
save_config = configparser.ConfigParser()
save_config.add_section("main")
save_config.set("main", "onboarding", str(True))
save_output = self._build_config_string(save_config).split("\n")
print(f"{save_output}")
for i in range(len(save_output)):
save_output[i] = f"{self.do_not_edit_prefix} {save_output[i]}"
contents = (
f"{user_def}\n"
f"{self.do_not_edit_line}\n"
f"{self.do_not_edit_prefix}\n" + "\n".join(save_output) + f"\n"
f"{self.do_not_edit_prefix}\n"
)
if self.config_path != self.default_config_path:
filepath = self.config_path
else:
if os.path.exists(printer_data_config):
filepath = os.path.join(printer_data_config, self.configfile_name)
else:
try:
if not os.path.exists(xdg_config):
pathlib.Path(xdg_config).mkdir(parents=True, exist_ok=True)
filepath = os.path.join(xdg_config, self.configfile_name)
except Exception as e:
logging.error(e)
filepath = klipperscreendir
logging.info(f"Creating a new config file in {filepath}")
try:
with open(filepath, "w") as file:
file.write(contents)
except Exception as e:
logging.error(f"Error writing configuration file in {filepath}:\n{e}")
def log_config(self, config):
lines = [
" "

View File

@ -4,6 +4,8 @@ import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk
from ks_includes.KlippyFactory import KlippyFactory
from ks_includes.KlippyGcodes import KlippyGcodes
from ks_includes.screen_panel import ScreenPanel
@ -45,6 +47,16 @@ class Panel(ScreenPanel):
"callback": self.set_auto_change_nozzle,
}
},
{
"factory_settings": {
"section": "main",
"name": _("Restore factory settings"),
"type": "button",
"tooltip": _("This operation will clear the user data"),
"value": "True",
"callback": self.reset_factory_settings,
}
},
]
options = self.advanced_options
self.labels["advanced_menu"] = self._gtk.ScrolledWindow()
@ -56,6 +68,48 @@ class Panel(ScreenPanel):
self.menu_list.update(res)
self.content.add(self.labels["advanced_menu"])
def reset_factory_settings(self, *args):
text = _("Confirm factory reset?\n") + "\n\n" + _("The system will reboot!")
label = Gtk.Label(wrap=True, vexpand=True)
label.set_markup(text)
label.set_margin_top(100)
clear_files_checkbox = Gtk.CheckButton(label=" " + _("Clear Internal G-code Files"))
clear_files_checkbox.set_halign(Gtk.Align.CENTER)
clear_files_checkbox.set_valign(Gtk.Align.CENTER)
buttons = [
{
"name": _("Accept"),
"response": Gtk.ResponseType.OK,
"style": "dialog-error",
},
{
"name": _("Cancel"),
"response": Gtk.ResponseType.CANCEL,
"style": "dialog-info",
},
]
grid = Gtk.Grid(row_homogeneous=True, column_homogeneous=True)
grid.set_row_spacing(20)
grid.set_column_spacing(0)
grid.attach(label, 0, 0, 1, 1)
grid.attach(clear_files_checkbox, 0, 1, 1, 1)
self._gtk.Dialog(
_("factory settings"),
buttons,
grid,
self.confirm_reset_factory_settings,
clear_files_checkbox,
)
def confirm_reset_factory_settings(self, dialog, response_id, clear_files_checkbox):
self._gtk.remove_dialog(dialog)
if response_id == Gtk.ResponseType.OK:
KlippyFactory.user_factory_reset(self._screen._ws.klippy, self._config, clear_files_checkbox.get_active())
def set_adaptive_leveling(self, *args):
self.set_configuration_feature("adaptive_meshing", *args)

View File

@ -188,6 +188,17 @@ scale-mark {
color: @yellow-opa-100;
}
checkbutton,
checkbutton check{
min-height: 1.1em;
min-width: 1.1em;
padding: 5px;
}
checkbutton:checked check{
background: @blue-opa-100;
}
scrollbar,
scrollbar button,
scrollbar trough {

View File

@ -229,6 +229,17 @@ scale-mark {
color: @bg;
}
checkbutton,
checkbutton check{
min-height: 1.1em;
min-width: 1.1em;
padding: 5px;
}
checkbutton:checked check{
background: @active;
}
scrollbar,
scrollbar button,
scrollbar trough {