# This is the backend of the UI panel that communicates to sdbus-networkmanager # TODO device selection/swtichability # Alfredo Monclus (alfrix) 2024 import subprocess import logging import sdbus from sdbus_block.networkmanager import ( NetworkManager, NetworkDeviceGeneric, NetworkDeviceWireless, NetworkConnectionSettings, NetworkManagerSettings, AccessPoint, NetworkManagerConnectionProperties, IPv4Config, ActiveConnection, enums, exceptions, ) from gi.repository import GLib from uuid import uuid4 NONE = 0 # The access point has no special security requirements. PAIR_WEP40 = 1 # 40/64-bit WEP is supported for pairwise/unicast encryption. PAIR_WEP104 = 2 # 104/128-bit WEP is supported for pairwise/unicast encryption. PAIR_TKIP = 4 # TKIP is supported for pairwise/unicast encryption. PAIR_CCMP = 8 # AES/CCMP is supported for pairwise/unicast encryption. GROUP_WEP40 = 16 # 40/64-bit WEP is supported for group/broadcast encryption. GROUP_WEP104 = 32 # 104/128-bit WEP is supported for group/broadcast encryption. GROUP_TKIP = 64 # TKIP is supported for group/broadcast encryption. GROUP_CCMP = 128 # AES/CCMP is supported for group/broadcast encryption. 
KEY_MGMT_PSK = 256  # WPA/RSN Pre-Shared Key encryption
KEY_MGMT_802_1X = 512  # 802.1x authentication and key management
KEY_MGMT_SAE = 1024  # WPA/RSN Simultaneous Authentication of Equals
KEY_MGMT_OWE = 2048  # WPA/RSN Opportunistic Wireless Encryption
KEY_MGMT_OWE_TM = 4096  # WPA/RSN Opportunistic Wireless Encryption transition mode
KEY_MGMT_EAP_SUITE_B_192 = 8192  # WPA3 Enterprise Suite-B 192


def get_encryption(flags):
    """Translate a security-flag bitmask into a space-separated label string.

    Args:
        flags: Integer bitmask built from the PAIR_*/GROUP_*/KEY_MGMT_*
            constants of this module. ``0`` or ``None`` means no security.

    Returns:
        ``"Open"`` when no flag is set, otherwise the labels of every set
        flag joined by single spaces, each label appearing once, in the
        order of the mapping below (e.g. ``"AES WPA-PSK"``). An empty string
        is returned when only unknown bits are set.
    """
    if not flags:
        return "Open"
    # Built inside the function on purpose: the PAIR_*/GROUP_* names live at
    # module level and are only resolved when a non-empty mask is decoded.
    encryption_mapping = {
        PAIR_WEP40: "WEP",
        PAIR_WEP104: "WEP",
        PAIR_TKIP: "TKIP",
        PAIR_CCMP: "AES",
        GROUP_WEP40: "WEP",
        GROUP_WEP104: "WEP",
        GROUP_TKIP: "TKIP",
        GROUP_CCMP: "AES",
        KEY_MGMT_PSK: "WPA-PSK",
        KEY_MGMT_802_1X: "802.1x",
        KEY_MGMT_SAE: "WPA-SAE",
        KEY_MGMT_OWE: "OWE",
        KEY_MGMT_OWE_TM: "OWE-TM",
        KEY_MGMT_EAP_SUITE_B_192: "WPA3-B192",
    }
    # dict.fromkeys() drops duplicate labels while keeping first-seen order
    # (pairwise and group ciphers often map to the same label).
    encryption_methods = dict.fromkeys(
        method_name
        for flag, method_name in encryption_mapping.items()
        if flags & flag
    )
    return " ".join(encryption_methods)


def WifiChannels(freq: str):
    """Map a centre frequency in MHz to a ``(band, channel)`` pair of strings.

    Args:
        freq: Frequency in MHz. Normally a string, but any value accepted by
            ``float()`` (int, float, numeric string) is tolerated.

    Returns:
        A tuple ``(band, channel)`` such as ``("2.4", "6")`` or ``("5", "36")``.
        ``("?", "?")`` is returned when the value is not numeric or does not
        fall inside any known band.
    """
    try:
        freq = float(freq)
    except (TypeError, ValueError):
        # None or non-numeric text: nothing sensible to report.
        return "?", "?"
    # Channel 14 sits outside the regular 5 MHz spacing of the 2.4 GHz band.
    # Compared numerically so 2484, 2484.0 and "2484" are all recognised.
    if freq == 2484:
        return "2.4", "14"
    if 2412 <= freq <= 2472:
        return "2.4", str(int((freq - 2407) / 5))
    if 3657.5 <= freq <= 3692.5:
        return "3", str(int((freq - 3000) / 5))
    if 4915 <= freq <= 4980:
        return "5", str(int((freq - 4000) / 5))
    if 5035 <= freq <= 5885:
        return "5", str(int((freq - 5000) / 5))
    # 6 GHz channels are numbered from 5950 MHz in 5 MHz steps, so the band
    # starts at 5955 MHz (channel 1), not 6455 MHz.
    if 5955 <= freq <= 7115:
        return "6", str(int((freq - 5950) / 5))
    return "?", "?"
class SdbusNm:
    """Synchronous helper around NetworkManager's D-Bus API for the network panel.

    The instance keeps a handle to the first wireless device reported by
    NetworkManager (``wlan_device``, ``None`` when there is none) and reports
    user-facing messages through the ``popup_callback`` given at construction.
    """

    def __init__(self, popup_callback):
        """Open the system bus and locate the first wireless device.

        Args:
            popup_callback: Callable used to show messages to the user. It is
                called with a message and, in some places, a second
                positional argument ``1``.

        Raises:
            RuntimeError: If the NetworkManager service is not active
                (see ``ensure_nm_running``).
        """
        # Plain state first, so the attributes exist even if the bus is
        # unavailable and we return early below.
        self.popup = popup_callback
        self.monitor_connection = False
        self.wifi_state = -1
        self.nm = None
        self.wlan_device = None
        self.wifi = False

        self.ensure_nm_running()
        self.system_bus = sdbus.sd_bus_open_system()  # We need system bus
        if self.system_bus is None:
            return
        sdbus.set_default_bus(self.system_bus)
        self.nm = NetworkManager()
        # Enumerate once: every call walks all devices over D-Bus.
        wireless_devices = self.get_wireless_interfaces()
        self.wlan_device = wireless_devices[0] if wireless_devices else None
        self.wifi = self.wlan_device is not None

    def ensure_nm_running(self):
        """Raise ``RuntimeError`` unless ``systemctl`` reports NetworkManager active."""
        try:
            status = subprocess.run(
                ["systemctl", "is-active", "--quiet", "NetworkManager"]
            )
        except FileNotFoundError as e:
            # systemctl itself is missing.
            logging.exception(f"{e}")
            raise RuntimeError(f"{e}") from e
        if status.returncode != 0:
            raise RuntimeError("Failed to detect NetworkManager service")

    def is_wifi_enabled(self):
        """Return NetworkManager's ``wireless_enabled`` property."""
        return self.nm.wireless_enabled

    def get_interfaces(self):
        """Return the interface name of every device known to NetworkManager."""
        return [
            NetworkDeviceGeneric(device).interface for device in self.nm.get_devices()
        ]

    def get_wireless_interfaces(self):
        """Return a ``NetworkDeviceWireless`` for every device of type WIFI."""
        return [
            NetworkDeviceWireless(path)
            for path in self.nm.get_devices()
            if NetworkDeviceGeneric(path).device_type == enums.DeviceType.WIFI
        ]

    def get_primary_interface(self):
        """Return the interface name to present as the primary one.

        Uses the first device of the primary connection when there is one;
        otherwise falls back to the wireless device, then to the first
        interface that is not ``lo``. Returns ``None`` if nothing matches.
        """
        # Read the property once so the check and the lookup agree.
        primary = self.nm.primary_connection
        if primary != "/":
            devices = ActiveConnection(primary).devices
            if devices:
                return NetworkDeviceGeneric(devices[0]).interface
        if self.wlan_device:
            return self.wlan_device.interface
        return next(
            (interface for interface in self.get_interfaces() if interface != "lo"),
            None,
        )

    @staticmethod
    def _decode_ssid(raw_ssid):
        """Decode SSID bytes as UTF-8, replacing undecodable bytes.

        SSIDs are arbitrary byte strings; a strict decode would raise on the
        first non-UTF-8 network and abort whole listings.
        """
        return raw_ssid.decode("utf-8", errors="replace")

    @staticmethod
    def get_known_networks():
        """Return ``{"SSID", "UUID"}`` dicts for every saved wireless connection."""
        known_networks = []
        for netpath in NetworkManagerSettings().list_connections():
            con_settings = NetworkConnectionSettings(netpath).get_settings()
            if con_settings["connection"]["type"][1] != "802-11-wireless":
                continue
            known_networks.append(
                {
                    "SSID": SdbusNm._decode_ssid(
                        con_settings["802-11-wireless"]["ssid"][1]
                    ),
                    "UUID": con_settings["connection"]["uuid"][1],
                }
            )
        return known_networks

    def is_known(self, ssid):
        """Return True if a saved wireless connection exists for ``ssid``."""
        return any(net["SSID"] == ssid for net in self.get_known_networks())

    def get_ip_address(self):
        """Return the first IPv4 address of the primary connection, or ``"?"``.

        ``"?"`` is returned when there is no primary connection, no IPv4
        configuration object, or no address in it.
        """
        active_connection_path = self.nm.primary_connection
        if not active_connection_path or active_connection_path == "/":
            return "?"
        ip4_config_path = ActiveConnection(active_connection_path).ip4_config
        # "/" is the D-Bus way of saying "no object".
        if not ip4_config_path or ip4_config_path == "/":
            return "?"
        addresses = IPv4Config(ip4_config_path).address_data
        if not addresses:
            return "?"
        return addresses[0]["address"][1]

    def get_networks(self):
        """Return visible networks, one entry per SSID, strongest signal first.

        Each entry is a dict with the keys ``SSID``, ``known``, ``security``,
        ``signal_level`` and ``BSSID``, taken from the strongest access point
        advertising that SSID. Hidden (empty) SSIDs are skipped.

        Returns:
            A list of dicts; empty when there is no wireless device or when
            querying NetworkManager fails (the failure is logged).
        """
        if not self.wlan_device:
            return []
        try:
            # Fetch the saved SSIDs once instead of re-listing every saved
            # connection for each access point.
            known_ssids = {net["SSID"] for net in self.get_known_networks()}
            seen_networks = {}
            for ap_path in self.wlan_device.access_points:
                ap = AccessPoint(ap_path)
                raw_ssid = ap.ssid
                if not raw_ssid:
                    continue
                ssid = self._decode_ssid(raw_ssid)
                signal_level = ap.strength
                previous = seen_networks.get(ssid)
                if previous is not None and signal_level <= previous["signal_level"]:
                    # A stronger (or equal) AP for this SSID is already recorded.
                    continue
                seen_networks[ssid] = {
                    "SSID": ssid,
                    "known": ssid in known_ssids,
                    # NOTE(review): ap.flags is used as a last resort and is
                    # decoded with the same bit table as rsn/wpa flags —
                    # confirm the bit meanings match for that property.
                    "security": get_encryption(
                        ap.rsn_flags or ap.wpa_flags or ap.flags
                    ),
                    "signal_level": signal_level,
                    "BSSID": ap.hw_address,
                }
        except Exception:
            # Best effort: the UI gets an empty list, but leave a trace.
            logging.exception("Failed to list wireless networks")
            return []
        return sorted(
            seen_networks.values(), key=lambda i: i["signal_level"], reverse=True
        )

    def get_bssid_from_ssid(self, ssid):
        """Return the BSSID recorded for ``ssid``, or ``None`` if it is not visible."""
        return next(
            (net["BSSID"] for net in self.get_networks() if ssid == net["SSID"]),
            None,
        )

    def get_is_connected(self):
        """Return True when the wireless device state is ACTIVATED."""
        if self.wlan_device is None:
            return False
        return self.wlan_device.state == enums.DeviceState.ACTIVATED

    def get_connected_ap(self):
        """Return the active ``AccessPoint``, or ``None`` when there is none."""
        if self.wlan_device is None:
            return None
        active_ap_path = self.wlan_device.active_access_point
        if active_ap_path == "/":
            return None
        return AccessPoint(active_ap_path)

    def get_connected_bssid(self):
        """Return the hardware address of the active access point, or ``None``."""
        # Resolve the AP once so the check and the read use the same object.
        ap = self.get_connected_ap()
        return ap.hw_address if ap is not None else None

    def get_signal_strength(self):
        """Return the strength of the active access point, or ``None``."""
        ap = self.get_connected_ap()
        return ap.strength if ap else None

    def get_security_type(self, ssid):
        """Return the security label of the visible network ``ssid``, or ``None``."""
        return next(
            (
                network["security"]
                for network in self.get_networks()
                if network["SSID"] == ssid
            ),
            None,
        )

    def add_network(self, ssid, psk, eap_method, identity="", phase2=None):
        """Create a saved connection for a currently visible network.

        Any existing saved connection for the same SSID is deleted first.

        Args:
            ssid: Network name; it must currently be visible in a scan.
            psk: Passphrase / key / password, depending on the security type.
            eap_method: EAP method name, used only for 802.1x networks.
            identity: 802.1x identity, used only for 802.1x networks.
            phase2: Optional 802.1x inner authentication method.

        Returns:
            ``{"status": "success"}`` on success, otherwise a dict with
            ``"error"`` and ``"message"`` keys.
        """
        security_type = self.get_security_type(ssid)
        logging.debug(f"Adding network of type: {security_type}")
        if security_type is None:
            return {"error": "network_not_found", "message": _("Network not found")}

        if self.is_known(ssid):
            self.delete_network(ssid)

        properties: NetworkManagerConnectionProperties = {
            "connection": {
                "id": ("s", ssid),
                "uuid": ("s", str(uuid4())),
                "type": ("s", "802-11-wireless"),
                "interface-name": ("s", self.wlan_device.interface),
            },
            "802-11-wireless": {
                "mode": ("s", "infrastructure"),
                "ssid": ("ay", ssid.encode("utf-8")),
                "security": ("s", "802-11-wireless-security"),
            },
            "ipv4": {"method": ("s", "auto")},
            "ipv6": {"method": ("s", "auto")},
        }

        if security_type == "Open":
            properties["802-11-wireless"]["security"] = ("s", "none")
        elif "WPA-PSK" in security_type:
            properties["802-11-wireless-security"] = {
                "key-mgmt": ("s", "wpa-psk"),
                "psk": ("s", psk),
            }
        elif "SAE" in security_type:
            properties["802-11-wireless-security"] = {
                "key-mgmt": ("s", "sae"),
                "psk": ("s", psk),
            }
        elif "WPA3-B192" in security_type:
            # NOTE(review): suite-B 192 is an enterprise key management mode;
            # confirm whether a psk is meaningful here.
            properties["802-11-wireless-security"] = {
                "key-mgmt": ("s", "wpa-eap-suite-b-192"),
                "psk": ("s", psk),
            }
        elif "OWE" in security_type:
            properties["802-11-wireless-security"] = {
                "key-mgmt": ("s", "owe"),
            }
        elif "802.1x" in security_type:
            properties["802-11-wireless-security"] = {
                "key-mgmt": ("s", "wpa-eap"),
            }
            # EAP credentials belong to the separate "802-1x" setting, and
            # the inner-auth key is spelled "phase2-auth"; NetworkManager
            # requires that setting to be present for wpa-eap.
            properties["802-1x"] = {
                "eap": ("as", [eap_method]),
                "identity": ("s", identity),
                "password": ("s", psk),
            }
            if phase2:
                properties["802-1x"]["phase2-auth"] = ("s", phase2)
        elif "WEP" in security_type:
            properties["802-11-wireless-security"] = {
                "key-mgmt": ("s", "none"),
                "wep-key-type": ("u", 2),
                "wep-key0": ("s", psk),
                "auth-alg": ("s", "shared"),
            }
        else:
            return {
                "error": "unknown_security_type",
                "message": _("Unknown security type"),
            }

        try:
            NetworkManagerSettings().add_connection(properties)
            return {"status": "success"}
        except exceptions.NmSettingsPermissionDeniedError:
            logging.exception("Insufficient privileges")
            return {
                "error": "insufficient_privileges",
                "message": _("Insufficient privileges"),
            }
        except exceptions.NmConnectionInvalidPropertyError:
            logging.exception("Invalid property")
            return {"error": "psk_invalid", "message": _("Invalid password")}
        except Exception as e:
            logging.exception("Couldn't add network")
            return {"error": "unknown", "message": _("Couldn't add network") + f"\n{e}"}

    def disconnect_network(self):
        """Disconnect the wireless device."""
        self.wlan_device.disconnect()

    def delete_network(self, ssid):
        """Delete the saved connection for ``ssid``, if there is one.

        Returns:
            ``None`` on success or when nothing is saved for ``ssid``;
            otherwise the error dict produced by ``delete_connection_path``.
        """
        if path := self.get_connection_path_by_ssid(ssid):
            return self.delete_connection_path(path)
        logging.debug(f"SSID '{ssid}' not found among saved connections")
        return None

    def delete_connection_path(self, path):
        """Delete the saved connection at ``path``.

        Returns:
            ``None`` on success, or a dict with ``"error"`` and ``"message"``
            keys when the deletion fails.
        """
        try:
            NetworkConnectionSettings(path).delete()
            logging.info(f"Deleted connection path: {path}")
        except Exception as e:
            logging.exception(f"Failed to delete connection path: {path} - {e}")
            return {
                "error": "deletion_failed",
                "message": _("Failed to delete connection") + f"\n{e}",
            }
        return None

    def rescan(self):
        """Request a scan on the wireless device; failures go to the popup."""
        try:
            return self.wlan_device.request_scan({})
        except Exception as e:
            self.popup(f"Unexpected error: {e}")

    def get_connection_path_by_ssid(self, ssid):
        """Return the path of the first saved connection for ``ssid``, or ``None``."""
        for connection_path in NetworkManagerSettings().list_connections():
            connection_settings = NetworkConnectionSettings(
                connection_path
            ).get_settings()
            wireless = connection_settings.get("802-11-wireless")
            if not wireless or not wireless.get("ssid"):
                continue
            if self._decode_ssid(wireless["ssid"][1]) == ssid:
                return connection_path
        return None

    def connect(self, ssid):
        """Activate the saved connection for ``ssid``.

        Returns:
            The connection path when activation was requested, ``None`` when
            no saved connection exists or activation raised (both cases are
            reported through the popup).
        """
        target_connection = self.get_connection_path_by_ssid(ssid)
        if not target_connection:
            self.popup(f"SSID '{ssid}' not found among saved connections")
            return None
        try:
            self.nm.activate_connection(target_connection)
        except Exception as e:
            logging.exception("Unexpected error")
            self.popup(f"Unexpected error: {e}")
            return None
        return target_connection

    def toggle_wifi(self, enable):
        """Set NetworkManager's ``wireless_enabled`` property to ``enable``."""
        self.nm.wireless_enabled = enable

    def monitor_connection_status(self):
        """Report wireless device state changes through the popup.

        Compares the device state with the last one seen and, on a change,
        shows a matching message. The first observed state is only logged.

        Returns:
            The current ``monitor_connection`` flag (presumably used by a
            periodic timer to decide whether to keep polling — confirm
            against the caller), or ``False`` without a wireless device.
        """
        if self.wlan_device is None:
            return False
        state = self.wlan_device.state
        if self.wifi_state != state:
            logging.debug(f"State changed: {state} {self.wlan_device.state_reason}")
            if self.wifi_state == -1:
                logging.debug("Starting to monitor state")
            elif state in (
                enums.DeviceState.PREPARE,
                enums.DeviceState.CONFIG,
            ):
                self.popup(_("Connecting"), 1)
            elif state in (
                enums.DeviceState.IP_CONFIG,
                enums.DeviceState.IP_CHECK,
                enums.DeviceState.SECONDARIES,
            ):
                self.popup(_("Getting IP address"), 1)
            elif state == enums.DeviceState.ACTIVATED:
                self.popup(_("Network connected"), 1)
            elif state in (
                enums.DeviceState.DISCONNECTED,
                enums.DeviceState.DEACTIVATING,
            ):
                self.popup(_("Network disconnected"))
            elif state == enums.DeviceState.FAILED:
                self.popup(_("Connection failed"))
            self.wifi_state = state
        return self.monitor_connection

    def enable_monitoring(self, enable):
        """Set the flag returned by ``monitor_connection_status``."""
        self.monitor_connection = enable