reduce the use of contextlib

This commit is contained in:
alfrix 2023-07-17 11:22:59 -03:00 committed by Alfredo Monclus
parent 1d2a89b4e7
commit a149796b19
10 changed files with 86 additions and 103 deletions

View File

@ -1,9 +1,7 @@
# -*- coding: utf-8 -*-
import contextlib
import logging
import os
import pathlib
import gi
gi.require_version("Gtk", "3.0")

View File

@ -1,5 +1,4 @@
import logging
import contextlib
import gi
gi.require_version("Gtk", "3.0")
@ -115,14 +114,14 @@ class Printer:
# webhooks states: startup, ready, shutdown, error
# print_stats: standby, printing, paused, error, complete
# idle_timeout: Idle, Printing, Ready
if self.data['webhooks']['state'] == "ready":
with contextlib.suppress(KeyError):
if self.data['print_stats']['state'] == 'paused':
return "paused"
if self.data['print_stats']['state'] == 'printing':
return "printing"
if self.data['idle_timeout']['state'].lower() == "printing":
return "busy"
if self.data['webhooks']['state'] == "ready" and (
'print_stats' in self.data and 'state' in self.data['print_stats']):
if self.data['print_stats']['state'] == 'paused':
return "paused"
if self.data['print_stats']['state'] == 'printing':
return "printing"
if self.data['idle_timeout']['state'].lower() == "printing":
return "busy"
return self.data['webhooks']['state']
def process_status_update(self):

View File

@ -1,19 +1,16 @@
# Network in KlipperScreen is a connection in NetworkManager
# Interface in KlipperScreen is a device in NetworkManager
import contextlib
import logging
import uuid
from ks_includes import NetworkManager
import dbus
from dbus.mainloop.glib import DBusGMainLoop
import gi
gi.require_version('Gdk', '3.0')
from gi.repository import GLib
from contextlib import suppress
from ks_includes.wifi import WifiChannels
from ks_includes import NetworkManager
class WifiManager:
@ -21,7 +18,7 @@ class WifiManager:
def __init__(self, interface_name, *args, **kwargs):
super().__init__(*args, **kwargs)
DBusGMainLoop(set_as_default=True)
dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
self._callbacks = {
"connected": [],
"connecting_status": [],
@ -56,7 +53,7 @@ class WifiManager:
self.known_networks[ssid] = con
def _ap_added(self, nm, interface, signal, access_point):
with contextlib.suppress(NetworkManager.ObjectVanished):
with suppress(NetworkManager.ObjectVanished):
access_point.OnPropertiesChanged(self._ap_prop_changed)
ssid = self._add_ap(access_point)
for cb in self._callbacks['scan_results']:
@ -183,7 +180,7 @@ class WifiManager:
def connect(self, ssid):
if ssid in self.known_networks:
conn = self.known_networks[ssid]
with contextlib.suppress(NetworkManager.ObjectVanished):
with suppress(NetworkManager.ObjectVanished):
msg = f"Connecting to: {ssid}"
logging.info(msg)
self.callback("connecting_status", msg)
@ -208,7 +205,7 @@ class WifiManager:
aps = self.wifi_dev.GetAccessPoints()
ret = {}
for ap in aps:
with contextlib.suppress(NetworkManager.ObjectVanished):
with suppress(NetworkManager.ObjectVanished):
ret[ap.Ssid] = ap
return ret
@ -216,7 +213,7 @@ class WifiManager:
netinfo = {}
if ssid in self.known_networks:
con = self.known_networks[ssid]
with contextlib.suppress(NetworkManager.ObjectVanished):
with suppress(NetworkManager.ObjectVanished):
settings = con.GetSettings()
if settings and '802-11-wireless' in settings:
netinfo.update({
@ -227,7 +224,7 @@ class WifiManager:
aps = self.visible_networks
if path in aps:
ap = aps[path]
with contextlib.suppress(NetworkManager.ObjectVanished):
with suppress(NetworkManager.ObjectVanished):
netinfo.update({
"mac": ap.HwAddress,
"channel": WifiChannels.lookup(str(ap.Frequency))[1],

View File

@ -1,5 +1,4 @@
# -*- coding: utf-8 -*-
import contextlib
import logging
import gi
@ -9,7 +8,7 @@ from gi.repository import GLib, Gtk, Pango
from jinja2 import Environment
from datetime import datetime
from math import log
from contextlib import suppress
from ks_includes.screen_panel import ScreenPanel
@ -208,11 +207,11 @@ class BasePanel(ScreenPanel):
if action == "notify_update_response":
if self.update_dialog is None:
self.show_update_dialog()
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.labels['update_progress'].set_text(
f"{self.labels['update_progress'].get_text().strip()}\n"
f"{data['message']}\n")
with contextlib.suppress(KeyError):
with suppress(KeyError):
if data['complete']:
logging.info("Update complete")
if self.update_dialog is not None:
@ -242,7 +241,7 @@ class BasePanel(ScreenPanel):
name = f"{name[:1].upper()}: "
self.labels[device].set_label(f"{name}{int(temp)}°")
with contextlib.suppress(Exception):
with suppress(Exception):
if data["toolhead"]["extruder"] != self.current_extruder:
self.control['temp_box'].remove(self.labels[f"{self.current_extruder}_box"])
self.current_extruder = data["toolhead"]["extruder"]

View File

@ -1,11 +1,8 @@
import logging
import contextlib
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, Pango
from ks_includes.KlippyGcodes import KlippyGcodes
from ks_includes.screen_panel import ScreenPanel
from ks_includes.widgets.bedmap import BedMap
@ -193,9 +190,10 @@ class BedMeshPanel(ScreenPanel):
if action == "notify_busy":
self.process_busy(data)
return
if action == "notify_status_update":
with contextlib.suppress(KeyError):
self.activate_mesh(data['bed_mesh']['profile_name'])
if action != "notify_status_update":
return
if 'bed_mesh' in data and 'profile_name' in data['bed_mesh']:
self.activate_mesh(data['bed_mesh']['profile_name'])
def remove_create(self):
if self.show_create is False:

View File

@ -1,11 +1,10 @@
import contextlib
import mpv
import logging
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk
from contextlib import suppress
from ks_includes.screen_panel import ScreenPanel
@ -47,11 +46,11 @@ class CameraPanel(ScreenPanel):
# Create mpv after show or the 'window' property will be None
self.mpv = mpv.MPV(log_handler=self.log, vo='gpu,wlshm,xv,x11')
with contextlib.suppress(Exception):
with suppress(Exception):
self.mpv.profile = 'sw-fast'
# LOW LATENCY PLAYBACK
with contextlib.suppress(Exception):
with suppress(Exception):
self.mpv.profile = 'low-latency'
self.mpv.untimed = True
self.mpv.audio = 'no'

View File

@ -1,6 +1,4 @@
import contextlib
import logging
import gi
gi.require_version("Gtk", "3.0")
@ -92,32 +90,32 @@ class ExcludeObjectPanel(ScreenPanel):
def process_update(self, action, data):
if action == "notify_status_update":
with contextlib.suppress(KeyError):
# Update objects
self.objects = data["exclude_object"]["objects"]
logging.info(f'Objects: {data["exclude_object"]["objects"]}')
for obj in self.buttons:
self.object_list.remove(self.buttons[obj])
self.buttons = {}
for obj in self.objects:
logging.info(f"Adding {obj['name']}")
self.add_object(obj["name"])
self.content.show_all()
with contextlib.suppress(KeyError):
# Update current objects
if data["exclude_object"]["current_object"]:
self.current_object.set_label(f'{data["exclude_object"]["current_object"].replace("_", " ")}')
self.update_graph()
with contextlib.suppress(KeyError):
# Update excluded objects
logging.info(f'Excluded objects: {data["exclude_object"]["excluded_objects"]}')
self.excluded_objects = data["exclude_object"]["excluded_objects"]
for name in self.excluded_objects:
if name in self.buttons:
self.object_list.remove(self.buttons[name])
self.update_graph()
if len(self.excluded_objects) == len(self.objects):
self._screen._menu_go_back()
if "exclude_object" in data:
if "object" in data["exclude_object"]: # Update objects
self.objects = data["exclude_object"]["objects"]
logging.info(f'Objects: {data["exclude_object"]["objects"]}')
for obj in self.buttons:
self.object_list.remove(self.buttons[obj])
self.buttons = {}
for obj in self.objects:
logging.info(f"Adding {obj['name']}")
self.add_object(obj["name"])
self.content.show_all()
if "current_object" in data["exclude_object"]: # Update objects
# Update current objects
if data["exclude_object"]["current_object"]:
self.current_object.set_label(f'{data["exclude_object"]["current_object"].replace("_", " ")}')
self.update_graph()
if "excluded_objects" in data["exclude_object"]: # Update objects
# Update excluded objects
logging.info(f'Excluded objects: {data["exclude_object"]["excluded_objects"]}')
self.excluded_objects = data["exclude_object"]["excluded_objects"]
for name in self.excluded_objects:
if name in self.buttons:
self.object_list.remove(self.buttons[name])
self.update_graph()
if len(self.excluded_objects) == len(self.objects):
self._screen._menu_go_back()
elif action == "notify_gcode_response" and "Excluding object" in data:
self._screen.show_popup_message(data, level=1)
self.update_graph()

View File

@ -1,7 +1,5 @@
import logging
import re
import contextlib
import gi
gi.require_version("Gtk", "3.0")
@ -136,13 +134,12 @@ class FineTunePanel(ScreenPanel):
self.labels['zoffset'].set_label(' 0.00mm')
self._screen._ws.klippy.gcode_script("SET_GCODE_OFFSET Z=0 MOVE=1")
elif direction in ["+", "-"]:
with contextlib.suppress(KeyError):
z_offset = float(self._printer.data["gcode_move"]["homing_origin"][2])
if direction == "+":
z_offset += float(self.bs_delta)
else:
z_offset -= float(self.bs_delta)
self.labels['zoffset'].set_label(f' {z_offset:.3f}mm')
z_offset = float(self._printer.data["gcode_move"]["homing_origin"][2])
if direction == "+":
z_offset += float(self.bs_delta)
else:
z_offset -= float(self.bs_delta)
self.labels['zoffset'].set_label(f' {z_offset:.3f}mm')
self._screen._ws.klippy.gcode_script(f"SET_GCODE_OFFSET Z_ADJUST={direction}{self.bs_delta} MOVE=1")
def change_bs_delta(self, widget, bs):

View File

@ -1,15 +1,15 @@
# -*- coding: utf-8 -*-
import logging
import os
import contextlib
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import GLib, Gtk, Pango
from ks_includes.screen_panel import ScreenPanel
from contextlib import suppress
from math import pi, sqrt
from statistics import median
from time import time
from ks_includes.screen_panel import ScreenPanel
def create_panel(*args):
@ -531,41 +531,41 @@ class JobStatusPanel(ScreenPanel):
f"{data['display_status']['message'] if data['display_status']['message'] is not None else ''}"
)
with contextlib.suppress(KeyError):
with suppress(KeyError):
if data["toolhead"]["extruder"] != self.current_extruder:
self.labels['temp_grid'].remove_column(0)
self.labels['temp_grid'].insert_column(0)
self.current_extruder = data["toolhead"]["extruder"]
self.labels['temp_grid'].attach(self.buttons['extruder'][self.current_extruder], 0, 0, 1, 1)
self._screen.show_all()
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.labels['max_accel'].set_label(f"{data['toolhead']['max_accel']:.0f} {self.mms2}")
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.labels['advance'].set_label(f"{data['extruder']['pressure_advance']:.2f}")
if "gcode_move" in data:
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.pos_z = round(float(data['gcode_move']['gcode_position'][2]), 2)
self.buttons['z'].set_label(f"Z: {self.pos_z:6.2f}{f'/{self.oheight}' if self.oheight > 0 else ''}")
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.extrusion = round(float(data["gcode_move"]["extrude_factor"]) * 100)
self.labels['extrude_factor'].set_label(f"{self.extrusion:3}%")
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.speed = round(float(data["gcode_move"]["speed_factor"]) * 100)
self.speed_factor = float(data["gcode_move"]["speed_factor"])
self.labels['speed_factor'].set_label(f"{self.speed:3}%")
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.req_speed = round(float(data["gcode_move"]["speed"]) / 60 * self.speed_factor)
self.labels['req_speed'].set_label(
f"{self.speed}% {self.vel:3.0f}/{self.req_speed:3.0f} "
f"{f'{self.mms}' if self.vel < 1000 and self.req_speed < 1000 and self._screen.width > 500 else ''}"
)
self.buttons['speed'].set_label(self.labels['req_speed'].get_label())
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.zoffset = float(data["gcode_move"]["homing_origin"][2])
self.labels['zoffset'].set_label(f"{self.zoffset:.3f} {self.mm}")
if "motion_report" in data:
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.labels['pos_x'].set_label(f"X: {data['motion_report']['live_position'][0]:6.2f}")
self.labels['pos_y'].set_label(f"Y: {data['motion_report']['live_position'][1]:6.2f}")
self.labels['pos_z'].set_label(f"Z: {data['motion_report']['live_position'][2]:6.2f}")
@ -577,14 +577,14 @@ class JobStatusPanel(ScreenPanel):
evelocity = (pos[3] - self.prev_pos[0][3]) / interval
self.flowstore.append(self.fila_section * evelocity)
self.prev_pos = [pos, now]
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.vel = float(data["motion_report"]["live_velocity"])
self.labels['req_speed'].set_label(
f"{self.speed}% {self.vel:3.0f}/{self.req_speed:3.0f} "
f"{f'{self.mms}' if self.vel < 1000 and self.req_speed < 1000 and self._screen.width > 500 else ''}"
)
self.buttons['speed'].set_label(self.labels['req_speed'].get_label())
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.flowstore.append(self.fila_section * float(data["motion_report"]["live_extruder_velocity"]))
fan_label = ""
for fan in self.fans:
@ -595,14 +595,14 @@ class JobStatusPanel(ScreenPanel):
if "virtual_sdcard" in data and "progress" in data["virtual_sdcard"]:
self.update_progress(data["virtual_sdcard"]["progress"])
if "print_stats" in data:
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.set_state(
data["print_stats"]["state"],
msg=f'{data["print_stats"]["message"] if "message" in data["print_stats"] else ""}'
)
with contextlib.suppress(KeyError):
with suppress(KeyError):
self.update_filename(data['print_stats']["filename"])
with contextlib.suppress(KeyError):
with suppress(KeyError):
if 'print_duration' in data["print_stats"]:
if 'filament_used' in data["print_stats"]:
self.labels['filament_used'].set_label(
@ -614,10 +614,10 @@ class JobStatusPanel(ScreenPanel):
f"{data['print_stats']['filament_used'] if 'filament_used' in 'print_stats' else 0}"
)
if 'info' in data["print_stats"]:
with contextlib.suppress(KeyError):
with suppress(KeyError):
if data["print_stats"]['info']['total_layer'] is not None:
self.labels['total_layers'].set_label(f"{data['print_stats']['info']['total_layer']}")
with contextlib.suppress(KeyError):
with suppress(KeyError):
if data["print_stats"]['info']['current_layer'] is not None:
self.labels['layer'].set_label(
f"{data['print_stats']['info']['current_layer']} / "
@ -648,18 +648,18 @@ class JobStatusPanel(ScreenPanel):
slicer_time = filament_time = file_time = None
timeleft_type = self._config.get_config()['main'].get('print_estimate_method', 'auto')
with contextlib.suppress(KeyError):
with suppress(KeyError):
if self.file_metadata['estimated_time'] > 0:
# speed_factor compensation based on empirical testing
spdcomp = sqrt(self.speed_factor)
slicer_time = ((self.file_metadata['estimated_time']) / spdcomp) + non_printing
self.labels["slicer_time"].set_label(self.format_time(slicer_time))
with contextlib.suppress(Exception):
with suppress(Exception):
if self.file_metadata['filament_total'] > fila_used:
filament_time = (total_duration / (fila_used / self.file_metadata['filament_total'])) + non_printing
self.labels["filament_time"].set_label(self.format_time(filament_time))
with contextlib.suppress(ZeroDivisionError):
with suppress(ZeroDivisionError):
file_time = (total_duration / self.progress) + non_printing
self.labels["file_time"].set_label(self.format_time(file_time))

View File

@ -1,11 +1,9 @@
import logging
import contextlib
import gi
gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, GLib
from contextlib import suppress
from ks_includes.screen_panel import ScreenPanel
from ks_includes.widgets.heatergraph import HeaterGraph
from ks_includes.widgets.keypad import Keypad
@ -230,7 +228,7 @@ class TemperaturePanel(ScreenPanel):
target = None
max_temp = float(self._printer.get_config_section(heater)['max_temp'])
name = heater.split()[1] if len(heater.split()) > 1 else heater
with contextlib.suppress(KeyError):
with suppress(KeyError):
for i in self.preheat_options[setting]:
logging.info(f"{self.preheat_options[setting]}")
if i == name:
@ -247,19 +245,19 @@ class TemperaturePanel(ScreenPanel):
self._screen._ws.klippy.set_tool_temp(self._printer.get_tool_number(heater), target)
elif heater.startswith('heater_bed'):
if target is None:
with contextlib.suppress(KeyError):
with suppress(KeyError):
target = self.preheat_options[setting]["bed"]
if self.validate(heater, target, max_temp):
self._screen._ws.klippy.set_bed_temp(target)
elif heater.startswith('heater_generic '):
if target is None:
with contextlib.suppress(KeyError):
with suppress(KeyError):
target = self.preheat_options[setting]["heater_generic"]
if self.validate(heater, target, max_temp):
self._screen._ws.klippy.set_heater_temp(name, target)
elif heater.startswith('temperature_fan '):
if target is None:
with contextlib.suppress(KeyError):
with suppress(KeyError):
target = self.preheat_options[setting]["temperature_fan"]
if self.validate(heater, target, max_temp):
self._screen._ws.klippy.set_temp_fan_temp(name, target)
@ -279,7 +277,7 @@ class TemperaturePanel(ScreenPanel):
return False
def preheat_gcode(self, setting):
with contextlib.suppress(KeyError):
with suppress(KeyError):
self._screen._ws.klippy.gcode_script(self.preheat_options[setting]['gcode'])
return False