From 222346415f7f6ef67863d51f5f43afe51dec1e67 Mon Sep 17 00:00:00 2001 From: zkk <1007518571@qq.com> Date: Sat, 12 Oct 2024 15:39:29 +0800 Subject: [PATCH 1/2] =?UTF-8?q?=E6=9B=B4=E6=96=B0WiFi=E9=A1=B5=E9=9D=A2=20?= =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E5=B7=B2=E7=9F=A5Bug?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- ks_includes/sdbus_nm.py | 86 ++++++++++++----------- panels/network.py | 151 ++++++++++++++++++++++++++++------------ 2 files changed, 153 insertions(+), 84 deletions(-) diff --git a/ks_includes/sdbus_nm.py b/ks_includes/sdbus_nm.py index 1e3ed5cb..ac1525de 100644 --- a/ks_includes/sdbus_nm.py +++ b/ks_includes/sdbus_nm.py @@ -4,6 +4,7 @@ import subprocess import logging +import sdbus from sdbus_block.networkmanager import ( NetworkManager, NetworkDeviceGeneric, @@ -17,7 +18,6 @@ from sdbus_block.networkmanager import ( enums, exceptions, ) -from sdbus import sd_bus_open_system, set_default_bus from gi.repository import GLib from uuid import uuid4 @@ -91,10 +91,10 @@ class SdbusNm: def __init__(self, popup_callback): self.ensure_nm_running() - self.system_bus = sd_bus_open_system() # We need system bus + self.system_bus = sdbus.sd_bus_open_system() # We need system bus if self.system_bus is None: return None - set_default_bus(self.system_bus) + sdbus.set_default_bus(self.system_bus) self.nm = NetworkManager() self.wlan_device = ( self.get_wireless_interfaces()[0] @@ -163,9 +163,6 @@ class SdbusNm: def is_known(self, ssid): return any(net["SSID"] == ssid for net in self.get_known_networks()) - def is_open(self, ssid): - return self.get_security_type(ssid) == "Open" - def get_ip_address(self): active_connection_path = self.nm.primary_connection if not active_connection_path or active_connection_path == "/": @@ -176,26 +173,29 @@ class SdbusNm: def get_networks(self): networks = [] - if self.wlan_device: - all_aps = [AccessPoint(result) for result in self.wlan_device.access_points] - networks.extend( - { - "SSID": ap.ssid.decode("utf-8"), - "known": self.is_known(ap.ssid.decode("utf-8")), - "security": get_encryption( - ap.rsn_flags or ap.wpa_flags or ap.flags - ), - "frequency": WifiChannels(ap.frequency)[0], - "channel": WifiChannels(ap.frequency)[1], - "signal_level": ap.strength, - "max_bitrate": ap.max_bitrate, - "BSSID": ap.hw_address, - } - for ap in all_aps - if ap.ssid - ) - return sorted(networks, key=lambda i: i["signal_level"], reverse=True) - return networks + try: + if self.wlan_device: + all_aps = [AccessPoint(result) for result in self.wlan_device.access_points] + networks.extend( + { + "SSID": ap.ssid.decode("utf-8"), + "known": self.is_known(ap.ssid.decode("utf-8")), + "security": get_encryption( + ap.rsn_flags or ap.wpa_flags or ap.flags + ), + "frequency": WifiChannels(ap.frequency)[0], + "channel": WifiChannels(ap.frequency)[1], + "signal_level": ap.strength, + "max_bitrate": ap.max_bitrate, + "BSSID": ap.hw_address, + } + for ap in all_aps + if ap.ssid + ) + return sorted(networks, key=lambda i: i["signal_level"], reverse=True) + return networks + except Exception as e: + return networks def get_bssid_from_ssid(self, ssid): return next(net["BSSID"] for net in self.get_networks() if ssid == net["SSID"]) @@ -222,8 +222,9 @@ class SdbusNm: None, ) - def add_network(self, ssid, psk): + def add_network(self, ssid, psk, eap_method, identity="", phase2=None): security_type = self.get_security_type(ssid) + logging.debug(f"Adding network of type: {security_type}") if security_type is None: return {"error": "network_not_found", "message": _("Network not found")} @@ -240,17 +241,17 @@ class SdbusNm: "802-11-wireless": { "mode": ("s", "infrastructure"), "ssid": ("ay", ssid.encode("utf-8")), + "security": ("s", "802-11-wireless-security"), }, "ipv4": {"method": ("s", "auto")}, "ipv6": {"method": ("s", "auto")}, } - if security_type != "Open": - properties["802-11-wireless"]["security"] = ( - "s", - "802-11-wireless-security", - ) - if "WPA-PSK" in security_type: + if security_type == "Open": + properties["802-11-wireless-security"] = { + "key-mgmt": ("s", "none"), + } + elif "WPA-PSK" in security_type: properties["802-11-wireless-security"] = { "key-mgmt": ("s", "wpa-psk"), "psk": ("s", psk), @@ -268,15 +269,16 @@ class SdbusNm: elif "OWE" in security_type: properties["802-11-wireless-security"] = { "key-mgmt": ("s", "owe"), - "psk": ("s", psk), } elif "802.1x" in security_type: properties["802-11-wireless-security"] = { - "key-mgmt": ("s", "ieee8021x"), - "wep-key-type": ("u", 2), - "wep-key0": ("s", psk), - "auth-alg": ("s", "shared"), + "key-mgmt": ("s", "wpa-eap"), + "eap": ("as", [eap_method]), + "identity": ("s", identity), + "password": ("s", psk), } + if phase2: + properties["802-11-wireless-security"]["phase2_auth"] = ("s", phase2) elif "WEP" in security_type: properties["802-11-wireless-security"] = { "key-mgmt": ("s", "none"), @@ -327,7 +329,10 @@ class SdbusNm: } def rescan(self): - return self.wlan_device.request_scan({}) + try: + return self.wlan_device.request_scan({}) + except Exception as e: + self.popup(f"Unexpected error: {e}") def get_connection_path_by_ssid(self, ssid): existing_networks = NetworkManagerSettings().list_connections() @@ -345,7 +350,8 @@ class SdbusNm: def connect(self, ssid): if target_connection := self.get_connection_path_by_ssid(ssid): - self.popup(f"{ssid}\n{_('Starting WiFi Association')}", 1) + message = ssid + "\n" + _("Starting WiFi Association") + self.popup(message, 1) try: active_connection = self.nm.activate_connection(target_connection) return target_connection diff --git a/panels/network.py b/panels/network.py index 56bdb899..680a329c 100644 --- a/panels/network.py +++ b/panels/network.py @@ -43,6 +43,10 @@ class Panel(ScreenPanel): self._screen.panels_reinit.append(self._screen._cur_panels[-1]) return self.update_timeout = None + self.conn_status = None + self.init_status = False + self.reload = False + self.last_ap_bssid = '' self.network_list = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, hexpand=True, vexpand=True) self.network_rows = {} self.networks = {} @@ -109,8 +113,6 @@ class Panel(ScreenPanel): self._screen.show_popup_message(msg, level) def load_networks(self): - for net in self.sdbus_nm.get_networks(): - self.add_network(net['BSSID']) GLib.timeout_add_seconds(10, self._gtk.Button_busy, self.reload_button, False) self.content.show_all() return False @@ -119,7 +121,13 @@ class Panel(ScreenPanel): if bssid in self.network_rows: return - net = next(net for net in self.sdbus_nm.get_networks() if bssid == net['BSSID']) + networks = self.sdbus_nm.get_networks() + if networks: + net = next((net for net in networks if bssid == net['BSSID']), None) + if net is None: + self.remove_network_from_list(bssid) + return + ssid = net['SSID'] connect = self._gtk.Button("load", None, "color3", self.bts) @@ -144,11 +152,9 @@ class Panel(ScreenPanel): buttons.add(delete) buttons.add(connect) - info = Gtk.Label(halign=Gtk.Align.START) labels = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, vexpand=True, halign=Gtk.Align.START, valign=Gtk.Align.CENTER) labels.add(name) - labels.add(info) icon = self._gtk.Image() self.network_rows[bssid] = Gtk.Box(spacing=5, hexpand=True, vexpand=False) @@ -161,7 +167,6 @@ class Panel(ScreenPanel): "connect": connect, "delete": delete, "icon": icon, - "info": info, "name": name, "row": self.network_rows[bssid], } @@ -195,7 +200,13 @@ class Panel(ScreenPanel): def add_new_network(self, widget, ssid): self._screen.remove_keyboard() - result = self.sdbus_nm.add_network(ssid, self.labels['network_psk'].get_text()) + psk = self.labels['network_psk'].get_text() + identity = self.labels['network_identity'].get_text() + eap_method = self.get_dropdown_value(self.labels['network_eap_method']) + phase2 = self.get_dropdown_value(self.labels['network_phase2']) + logging.debug(f"{phase2=}") + logging.debug(f"{eap_method=}") + result = self.sdbus_nm.add_network(ssid, psk, eap_method, identity, phase2) if "error" in result: self._screen.show_popup_message(result["message"]) if result["error"] == "psk_invalid": @@ -204,6 +215,12 @@ class Panel(ScreenPanel): self.connect_network(widget, ssid, showadd=False) self.close_add_network() + def get_dropdown_value(self, dropdown, default=None): + tree_iter = dropdown.get_active_iter() + model = dropdown.get_model() + result = model[tree_iter][0] + return result if result != "disabled" else None + def back(self): if self.show_add: self.close_add_network() @@ -218,7 +235,7 @@ class Panel(ScreenPanel): self.content.remove(child) self.content.add(self.labels['main_box']) self.content.show() - for i in ['add_network', 'network_psk']: + for i in ['add_network', 'network_psk', 'network_identity']: if i in self.labels: del self.labels[i] self.show_add = False @@ -226,7 +243,8 @@ class Panel(ScreenPanel): def connect_network(self, widget, ssid, showadd=True): self.deactivate() if showadd and not self.sdbus_nm.is_known(ssid): - if self.sdbus_nm.is_open(ssid): + sec_type = self.sdbus_nm.get_security_type(ssid) + if sec_type == "Open" or "OWE" in sec_type: logging.debug("Network is Open do not show psk") result = self.sdbus_nm.add_network(ssid, '') if "error" in result: @@ -240,6 +258,9 @@ class Panel(ScreenPanel): self.remove_network_from_list(bssid) self.sdbus_nm.connect(ssid) self.reload_networks() + if self.conn_status is None: + self.sdbus_nm.enable_monitoring(True) + self.conn_status = GLib.timeout_add(500, self.sdbus_nm.monitor_connection_status) def remove_network_from_list(self, bssid): if bssid not in self.network_rows: @@ -260,7 +281,25 @@ class Panel(ScreenPanel): if "add_network" in self.labels: del self.labels['add_network'] - label = Gtk.Label(label=_("PSK for") + f' {ssid}', hexpand=False) + eap_method = Gtk.ComboBoxText(hexpand=True) + for method in ("peap", "ttls", "pwd", "leap", "md5"): + eap_method.append(method, method.upper()) + self.labels['network_eap_method'] = eap_method + eap_method.set_active(0) + + phase2 = Gtk.ComboBoxText(hexpand=True) + for method in ("mschapv2", "gtc", "pap", "chap", "mschap", "disabled"): + phase2.append(method, method.upper()) + self.labels['network_phase2'] = phase2 + phase2.set_active(0) + + auth_selection_box = Gtk.Box(no_show_all=True) + auth_selection_box.add(self.labels['network_eap_method']) + auth_selection_box.add(self.labels['network_phase2']) + + self.labels['network_identity'] = Gtk.Entry(hexpand=True, no_show_all=True) + self.labels['network_identity'].connect("focus-in-event", self._screen.show_keyboard) + self.labels['network_psk'] = Gtk.Entry(hexpand=True) self.labels['network_psk'].connect("activate", self.add_new_network, ssid) self.labels['network_psk'].connect("focus-in-event", self._screen.show_keyboard) @@ -269,21 +308,47 @@ class Panel(ScreenPanel): save.set_hexpand(False) save.connect("clicked", self.add_new_network, ssid) - box = Gtk.Box() - box.pack_start(self.labels['network_psk'], True, True, 5) - box.pack_start(save, False, False, 5) + user_label = Gtk.Label(label=_("User"), hexpand=False, no_show_all=True) + auth_grid = Gtk.Grid() + auth_grid.attach(user_label, 0, 0, 1, 1) + auth_grid.attach(self.labels['network_identity'], 1, 0, 1, 1) + auth_grid.attach(Gtk.Label(label=_("Password"), hexpand=False), 0, 1, 1, 1) + auth_grid.attach(self.labels['network_psk'], 1, 1, 1, 1) + auth_grid.attach(save, 2, 0, 1, 2) - self.labels['add_network'] = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=5, valign=Gtk.Align.CENTER, - hexpand=True, vexpand=True) - self.labels['add_network'].pack_start(label, True, True, 5) - self.labels['add_network'].pack_start(box, True, True, 5) + if "802.1x" in self.sdbus_nm.get_security_type(ssid): + user_label.show() + self.labels['network_eap_method'].show() + self.labels['network_phase2'].show() + self.labels['network_identity'].show() + auth_selection_box.show() - self.content.add(self.labels['add_network']) + self.labels['add_network'] = Gtk.Box( + orientation=Gtk.Orientation.VERTICAL, spacing=5, valign=Gtk.Align.CENTER, + hexpand=True, vexpand=True + ) + self.labels['add_network'].add(Gtk.Label(label=_("Connecting to %s") % ssid)) + self.labels['add_network'].add(auth_selection_box) + self.labels['add_network'].add(auth_grid) + scroll = self._gtk.ScrolledWindow() + scroll.add(self.labels['add_network']) + self.content.add(scroll) self.labels['network_psk'].grab_focus_without_selecting() self.content.show_all() self.show_add = True def update_all_networks(self): + self.ap_bssid = self.sdbus_nm.get_connected_bssid() + if self.last_ap_bssid != self.ap_bssid: + self.remove_network_from_list(self.last_ap_bssid) + self.remove_network_from_list(self.ap_bssid) + self.last_ap_bssid = self.ap_bssid + + if self.reload: + for child in self.network_list.get_children(): + self.network_list.remove(child) + self.reload = False + self.interface = self.sdbus_nm.get_primary_interface() self.labels['interface'].set_text(_("Interface") + f': {self.interface}') self.labels['ip'].set_text(f"IP: {self.sdbus_nm.get_ip_address()}") @@ -297,8 +362,9 @@ class Panel(ScreenPanel): self.update_network_info(net) for i, net in enumerate(nets): for child in self.network_list.get_children(): - if child == self.network_rows[net['BSSID']]: - self.network_list.reorder_child(child, i) + if 'BSSID' in net and net['BSSID'] in self.network_rows: + if child == self.network_rows[net['BSSID']]: + self.network_list.reorder_child(child, i) self.network_list.show_all() return True @@ -306,19 +372,7 @@ class Panel(ScreenPanel): if net['BSSID'] not in self.network_rows.keys() or net['BSSID'] not in self.networks: logging.info(f"Unknown SSID {net['SSID']}") return - info = _("Password saved") + '\n' if net['known'] else "" - chan = _("Channel") + f' {net["channel"]}' - max_bitrate = _("Max:") + f"{self.format_speed(net['max_bitrate'])}" self.networks[net['BSSID']]['icon'].set_from_pixbuf(self.get_signal_strength_icon(net["signal_level"])) - self.networks[net['BSSID']]['info'].set_markup( - "" - f"{info}" - f"{net['security']}\n" - f"{max_bitrate}\n" - f"{net['frequency']} Ghz {chan} {net['signal_level']} %\n" - f"{net['BSSID']}" - "" - ) def get_signal_strength_icon(self, signal_level): # networkmanager uses percentage not dbm @@ -344,25 +398,27 @@ class Panel(ScreenPanel): self.deactivate() del self.network_rows self.network_rows = {} - for child in self.network_list.get_children(): - self.network_list.remove(child) if self.sdbus_nm is not None and self.sdbus_nm.wifi: if widget: self._gtk.Button_busy(widget, True) - self.sdbus_nm.rescan() + if not self.init_status: + self.sdbus_nm.rescan() + else: + self.init_status = False self.load_networks() self.activate() + self.reload = True def activate(self): if self.sdbus_nm is None: return if self.update_timeout is None: if self.sdbus_nm.wifi: - if self.reload_button.get_sensitive(): - self._gtk.Button_busy(self.reload_button, True) - self.sdbus_nm.rescan() - self.load_networks() - self.update_all_networks() + if self.sdbus_nm.is_wifi_enabled(): + if self.reload_button.get_sensitive(): + self._gtk.Button_busy(self.reload_button, True) + self.sdbus_nm.rescan() + self.load_networks() self.update_timeout = GLib.timeout_add_seconds(5, self.update_all_networks) else: self.update_single_network_info() @@ -376,12 +432,19 @@ class Panel(ScreenPanel): self.update_timeout = None if self.sdbus_nm.wifi: self.sdbus_nm.enable_monitoring(False) + if self.conn_status is not None: + GLib.source_remove(self.conn_status) + self.conn_status = None def toggle_wifi(self, switch, gparams): enable = switch.get_active() - if enable: - self.reload_button.show() - else: - self.reload_button.hide() logging.info(f"WiFi {enable}") self.sdbus_nm.toggle_wifi(enable) + if enable: + self.reload_button.show() + self.network_list.show() + self.init_status = True + self.reload_networks(self.reload_button) + else: + self.reload_button.hide() + self.network_list.hide() From 99833457bec7d148767f181b22d6be7d3631ca02 Mon Sep 17 00:00:00 2001 From: zkk <1007518571@qq.com> Date: Mon, 14 Oct 2024 17:47:49 +0800 Subject: [PATCH 2/2] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=E9=80=9A=E7=94=A8?= =?UTF-8?q?=E5=8A=A0=E7=83=AD=E5=99=A8=E6=A0=A1=E5=87=86PID=E6=8A=A5?= =?UTF-8?q?=E9=94=99=E9=97=AE=E9=A2=98?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- panels/numpad.py | 5 +++-- panels/temperature.py | 7 +++---- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/panels/numpad.py b/panels/numpad.py index 533b2d01..6402a219 100644 --- a/panels/numpad.py +++ b/panels/numpad.py @@ -88,14 +88,15 @@ class Panel(ScreenPanel): self.hide_numpad() def pid_calibrate(self, temp): + heater = self.active_heater.split(' ', maxsplit=1)[-1] if self.verify_max_temp(temp): script = { - "script": f"PID_CALIBRATE HEATER={self.active_heater} TARGET={temp}" + "script": f"PID_CALIBRATE HEATER={heater} TARGET={temp}" } self._screen._confirm_send_action( None, _("Initiate a PID calibration for:") - + f" {self.active_heater} @ {temp} ºC" + + f" {heater} @ {temp} ºC" + "\n\n" + _("It may take more than 5 minutes depending on the heater power."), "printer.gcode.script", diff --git a/panels/temperature.py b/panels/temperature.py index e408803a..c21c9777 100644 --- a/panels/temperature.py +++ b/panels/temperature.py @@ -375,14 +375,13 @@ class Panel(ScreenPanel): return max(temp, 0) def pid_calibrate(self, temp): + heater = self.active_heater.split(' ', maxsplit=1)[-1] if self.verify_max_temp(temp): - script = { - "script": f"PID_CALIBRATE HEATER={self.active_heater} TARGET={temp}" - } + script = {"script": f"PID_CALIBRATE HEATER={heater} TARGET={temp}"} self._screen._confirm_send_action( None, _("Initiate a PID calibration for:") - + f" {self.active_heater} @ {temp} ºC" + + f" {heater} @ {temp} ºC" + "\n\n" + _("It may take more than 5 minutes depending on the heater power."), "printer.gcode.script",