From a80b8810151e6051b8d6dd5b3f2baa3bc54e3a22 Mon Sep 17 00:00:00 2001 From: Alfredo Monclus Date: Fri, 24 May 2024 19:13:35 -0300 Subject: [PATCH] network: new NetworkManager backend using sdbus (#1269) * network: new NetworkManager backend using sdbus drop support for old methods * rescan changes * add wifi on-off --- docs/Troubleshooting/Network.md | 27 +- docs/Troubleshooting/wpa_supplicant.md | 27 - ks_includes/NetworkManager.py | 1252 ------------------------ ks_includes/screen_panel.py | 9 + ks_includes/sdbus_nm.py | 250 +++++ ks_includes/wifi.py | 347 ------- ks_includes/wifi_nm.py | 272 ----- panels/network.py | 467 ++++----- scripts/KlipperScreen-requirements.txt | 5 +- 9 files changed, 457 insertions(+), 2199 deletions(-) delete mode 100644 docs/Troubleshooting/wpa_supplicant.md delete mode 100644 ks_includes/NetworkManager.py create mode 100644 ks_includes/sdbus_nm.py delete mode 100644 ks_includes/wifi.py delete mode 100644 ks_includes/wifi_nm.py diff --git a/docs/Troubleshooting/Network.md b/docs/Troubleshooting/Network.md index 5e37d2fa..ebb87fa2 100644 --- a/docs/Troubleshooting/Network.md +++ b/docs/Troubleshooting/Network.md @@ -5,35 +5,10 @@ The network panel requires network-manager to function, (if you are using a fork this may not be the case) -Check if network-manager is installed: - -```bash -dpkg -s network-manager -``` - -if the response is the following: - -```sh -dpkg-query: the package 'network-manager' is not installed -``` - -if the response is the following: - -```sh -Package: network-manager -Status: install ok installed -``` - -this line may appear in KlipperScreen.log: -!!! abstract "Log" - ```sh - [wifi_nm.py:rescan()] [...] NetworkManager.wifi.scan request failed: not authorized - ``` - if version of KlipperScreen installed was previous than v0.3.9, then re-run the installer and reboot -??? Alternative workaround for network-manager +??? "Alternative workaround for network-manager not having permissions" in order to fix this polkit needs to be configured or disabled: diff --git a/docs/Troubleshooting/wpa_supplicant.md b/docs/Troubleshooting/wpa_supplicant.md deleted file mode 100644 index 59afbaff..00000000 --- a/docs/Troubleshooting/wpa_supplicant.md +++ /dev/null @@ -1,27 +0,0 @@ -## wpa_supplicant - -The user is not allowed to control the interface - -* Edit `/etc/wpa_supplicant/wpa_supplicant.conf` and add this line if it's not there: - -``` -ctrl_interface=DIR=/var/run/wpa_supplicant GROUP=netdev -``` - -* Run `cat /etc/group | grep netdev` - -If your username is not listed under that line, you need to add it with the following command: - -```sh -usermod -a -G netdev pi -``` -(if your username is not 'pi' change 'pi' to your username) - -Then reboot the machine: - -```sh -systemctl reboot -``` - -!!! tip - It's possible to just restart KlipperScreen and networking \ No newline at end of file diff --git a/ks_includes/NetworkManager.py b/ks_includes/NetworkManager.py deleted file mode 100644 index f7c68216..00000000 --- a/ks_includes/NetworkManager.py +++ /dev/null @@ -1,1252 +0,0 @@ -# python-networkmanager - Easy communication with NetworkManager -# Copyright (C) 2011-2021 Dennis Kaarsemaker -# -# This software is provided 'as-is', without any express or implied -# warranty. In no event will the authors be held liable for any damages -# arising from the use of this software. -# -# Permission is granted to anyone to use this software for any purpose, -# including commercial applications, and to alter it and redistribute it -# freely, subject to the following restrictions: -# -# 1. The origin of this software must not be misrepresented; you must not -# claim that you wrote the original software. If you use this software -# in a product, an acknowledgement in the product documentation would be -# appreciated but is not required. -# 2. Altered source versions must be plainly marked as such, and must not be -# misrepresented as being the original software. -# 3. This notice may not be removed or altered from any source distribution. -# NetworkManager - a library to make interacting with the NetworkManager daemon -# easier. -# -# (C)2011-2021 Dennis Kaarsemaker -# License: zlib - -import contextlib -import copy -import dbus -import dbus.service -import six -import socket -import struct -import time -import warnings -import xml.etree.ElementTree as ET - - -class ObjectVanished(Exception): - def __init__(self, obj): - self.obj = obj - super(ObjectVanished, self).__init__(obj.object_path) - - -class SignalDispatcher(object): - def __init__(self): - self.handlers = {} - self.args = {} - self.interfaces = set() - self.setup = False - - def setup_signals(self): - if not self.setup: - bus = dbus.SystemBus() - for interface in self.interfaces: - bus.add_signal_receiver(self.handle_signal, dbus_interface=interface, interface_keyword='interface', - member_keyword='signal', path_keyword='path') - self.setup = True - self.listen_for_restarts() - - def listen_for_restarts(self): - # If we have a mainloop, listen for disconnections - if not NMDbusInterface.last_disconnect and dbus.get_default_main_loop(): - dbus.SystemBus().add_signal_receiver(self.handle_restart, 'NameOwnerChanged', 'org.freedesktop.DBus') - NMDbusInterface.last_disconnect = 1 - - def add_signal_receiver(self, interface, signal, obj, func, args, kwargs): - self.setup_signals() - key = (interface, signal) - if key not in self.handlers: - self.handlers[key] = [] - self.handlers[key].append((obj, func, args, kwargs)) - - def handle_signal(self, *args, **kwargs): - key = (kwargs['interface'], kwargs['signal']) - skwargs = {} - sargs = [] - if key not in self.handlers: - return - try: - sender = fixups.base_to_python(kwargs['path']) - for arg, (name, signature) in zip(args, self.args[key]): - if name: - skwargs[name] = fixups.to_python(type(sender).__name__, kwargs['signal'], name, arg, signature) - else: - # Older NetworkManager versions don't supply attribute names. Hope for the best. - sargs.append(fixups.to_python(type(sender).__name__, kwargs['signal'], None, arg, signature)) - except dbus.exceptions.DBusException: - # This happens if the sender went away. Tough luck, no signal for you. - return - to_delete = [] - for pos, (match, receiver, rargs, rkwargs) in enumerate(self.handlers[key]): - try: - match == sender - except ObjectVanished: - to_delete.append(pos) - continue - if match == sender: - rkwargs['interface'] = kwargs['interface'] - rkwargs['signal'] = kwargs['signal'] - rkwargs.update(skwargs) - receiver(sender, *(sargs + rargs), **rkwargs) - for pos in reversed(to_delete): - self.handlers[key].pop(pos) - - def handle_restart(self, name, old, new): - if not str(new) or str(name) != 'org.freedesktop.NetworkManager': - return - NMDbusInterface.last_disconnect = time.time() - time.sleep(1) # Give NetworkManager a bit of time to start and rediscover itself. - for key in self.handlers: - val, self.handlers[key] = self.handlers[key], [] - for obj, func, args, kwargs in val: - with contextlib.suppress(ObjectVanished): - # This resets the object path if needed - obj.proxy - self.add_signal_receiver(key[0], key[1], obj, func, args, kwargs) - - -SignalDispatcher = SignalDispatcher() - -# We completely dynamically generate all classes using introspection data. As -# this is done at import time, use a special dbus connection that does not get -# in the way of setting a mainloop and doing async stuff later. -init_bus = dbus.SystemBus(private=True) -xml_cache = {} - - -class NMDbusInterfaceType(type): - """Metaclass that generates our classes based on introspection data""" - dbus_service = 'org.freedesktop.NetworkManager' - - def __new__(cls, name, bases, attrs): - attrs['dbus_service'] = cls.dbus_service - attrs['properties'] = [] - attrs['introspection_data'] = None - attrs['signals'] = [] - - # Derive the interface name from the name of the class, but let classes - # override it if needed - if 'interface_names' not in attrs and 'NMDbusInterface' not in name: - attrs['interface_names'] = [f'org.freedesktop.NetworkManager.{name}'] - for base in bases: - if hasattr(base, 'interface_names'): - attrs['interface_names'] = [ - f'{base.interface_names[0]}.{name}' - ] + base.interface_names - break - else: - for base in bases: - if hasattr(base, 'interface_names'): - attrs['interface_names'] += base.interface_names - break - - if 'interface_names' in attrs: - SignalDispatcher.interfaces.update(attrs['interface_names']) - - # If we know where to find this object, let's introspect it and - # generate properties and methods - if 'object_path' in attrs and attrs['object_path']: - proxy = init_bus.get_object(cls.dbus_service, attrs['object_path']) - attrs['introspection_data'] = proxy.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable') - root = ET.fromstring(attrs['introspection_data']) - for element in root: - if element.tag == 'interface' and element.attrib['name'] in attrs['interface_names']: - for item in element: - if item.tag == 'property': - attrs[item.attrib['name']] = cls.make_property( - name, element.attrib['name'], item.attrib - ) - attrs['properties'].append(item.attrib['name']) - elif item.tag == 'method': - aname = item.attrib['name'] - if aname in attrs: - aname = f'_{aname}' - attrs[aname] = cls.make_method( - name, - element.attrib['name'], - item.attrib, - list(item), - ) - elif item.tag == 'signal': - SignalDispatcher.args[(element.attrib['name'], item.attrib['name'])] = [ - (arg.attrib.get('name', None), arg.attrib['type']) for arg in item] - attrs['On' + item.attrib['name']] = cls.make_signal( - name, element.attrib['name'], item.attrib - ) - attrs['signals'].append(item.attrib['name']) - - return super(NMDbusInterfaceType, cls).__new__(cls, name, bases, attrs) - - @staticmethod - def make_property(cls, interface, attrib): - name = attrib['name'] - - def get_func(self): - try: - data = self.proxy.Get(interface, name, dbus_interface='org.freedesktop.DBus.Properties') - except dbus.exceptions.DBusException as e: - if e.get_dbus_name() == 'org.freedesktop.DBus.Error.UnknownMethod': - raise ObjectVanished(self) - raise - return fixups.to_python(cls, 'Get', name, data, attrib['type']) - - if attrib['access'] == 'read': - return property(get_func) - - def set_func(self, value): - value = fixups.to_dbus(cls, 'Set', name, value, attrib['type']) - try: - return self.proxy.Set(interface, name, value, dbus_interface='org.freedesktop.DBus.Properties') - except dbus.exceptions.DBusException as e: - if e.get_dbus_name() == 'org.freedesktop.DBus.Error.UnknownMethod': - raise ObjectVanished(self) - raise - - return property(get_func, set_func) - - @staticmethod - def make_method(cls, interface, attrib, args): - name = attrib['name'] - outargs = [x for x in args if x.tag == 'arg' and x.attrib['direction'] == 'out'] - outargstr = ', '.join([x.attrib['name'] for x in outargs]) or 'ret' - args = [x for x in args if x.tag == 'arg' and x.attrib['direction'] == 'in'] - argstr = ', '.join([x.attrib['name'] for x in args]) - ret = {} - code = "def %s(self%s):\n" % (name, f', {argstr}' if argstr else '') - for arg in args: - argname = arg.attrib['name'] - signature = arg.attrib['type'] - code += " %s = fixups.to_dbus('%s', '%s', '%s', %s, '%s')\n" % ( - argname, cls, name, argname, argname, signature) - code += " try:\n" - code += " %s = dbus.Interface(self.proxy, '%s').%s(%s)\n" % (outargstr, interface, name, argstr) - code += " except dbus.exceptions.DBusException as e:\n" - code += " if e.get_dbus_name() == 'org.freedesktop.DBus.Error.UnknownMethod':\n" - code += " raise ObjectVanished(self)\n" - code += " raise\n" - for arg in outargs: - argname = arg.attrib['name'] - signature = arg.attrib['type'] - code += " %s = fixups.to_python('%s', '%s', '%s', %s, '%s')\n" % ( - argname, cls, name, argname, argname, signature) - code += f" return ({outargstr})" - exec(code, globals(), ret) - return ret[name] - - @staticmethod - def make_signal(cls, interface, attrib): - name = attrib['name'] - ret = {} - code = f"def On{name}(self, func, *args, **kwargs):" - code += f" SignalDispatcher.add_signal_receiver('{interface}', '{name}', self, func, list(args), kwargs)" - exec(code, globals(), ret) - return ret[f'On{name}'] - - -@six.add_metaclass(NMDbusInterfaceType) -class NMDbusInterface(object): - object_path = None - last_disconnect = 0 - is_transient = False - - def __new__(cls, object_path=None): - # If we didn't introspect this one at definition time, let's do it now. - if object_path and not cls.introspection_data: - proxy = dbus.SystemBus().get_object(cls.dbus_service, object_path) - cls.introspection_data = proxy.Introspect(dbus_interface='org.freedesktop.DBus.Introspectable') - root = ET.fromstring(cls.introspection_data) - for element in root: - if element.tag == 'interface' and element.attrib['name'] in cls.interface_names: - for item in element: - if item.tag == 'property': - setattr(cls, item.attrib['name'], - type(cls).make_property(cls.__name__, element.attrib['name'], item.attrib)) - cls.properties.append(item.attrib['name']) - elif item.tag == 'method': - aname = item.attrib['name'] - if hasattr(cls, aname): - aname = f'_{aname}' - setattr(cls, aname, - type(cls).make_method(cls.__name__, element.attrib['name'], item.attrib, - list(item))) - elif item.tag == 'signal': - SignalDispatcher.args[(element.attrib['name'], item.attrib['name'])] = [ - (arg.attrib.get('name', None), arg.attrib['type']) for arg in item] - setattr(cls, 'On' + item.attrib['name'], - type(cls).make_signal(cls.__name__, element.attrib['name'], item.attrib)) - cls.signals.append(item.attrib['name']) - - SignalDispatcher.listen_for_restarts() - return super(NMDbusInterface, cls).__new__(cls) - - def __init__(self, object_path=None): - if isinstance(object_path, NMDbusInterface): - object_path = object_path.object_path - self.object_path = self.object_path or object_path - self._proxy = None - - def __eq__(self, other): - return isinstance(other, NMDbusInterface) and self.object_path and other.object_path == self.object_path - - @property - def proxy(self): - if not self._proxy: - self._proxy = dbus.SystemBus().get_object(self.dbus_service, self.object_path, - follow_name_owner_changes=True) - self._proxy.created = time.time() - elif self._proxy.created < self.last_disconnect: - if self.is_transient: - raise ObjectVanished(self) - obj = type(self)(self.object_path) - if obj.object_path != self.object_path: - self.object_path = obj.object_path - self._proxy = dbus.SystemBus().get_object(self.dbus_service, self.object_path) - self._proxy.created = time.time() - return self._proxy - - # Backwards compatibility interface - def connect_to_signal(self, signal, handler, *args, **kwargs): - return getattr(self, f'On{signal}')(handler, *args, **kwargs) - - -class TransientNMDbusInterface(NMDbusInterface): - is_transient = True - - -class NetworkManager(NMDbusInterface): - interface_names = ['org.freedesktop.NetworkManager'] - object_path = '/org/freedesktop/NetworkManager' - - # noop method for backward compatibility. It is no longer necessary to call - # this but let's not break code that does so. - def auto_reconnect(self): - pass - - -class Statistics(NMDbusInterface): - object_path = '/org/freedesktop/NetworkManager/Statistics' - - -class Settings(NMDbusInterface): - object_path = '/org/freedesktop/NetworkManager/Settings' - - -class AgentManager(NMDbusInterface): - object_path = '/org/freedesktop/NetworkManager/AgentManager' - - -class Connection(NMDbusInterface): - interface_names = ['org.freedesktop.NetworkManager.Settings.Connection'] - has_secrets = ['802-1x', '802-11-wireless-security', 'cdma', 'gsm', 'pppoe', 'vpn'] - - def __init__(self, object_path): - super(Connection, self).__init__(object_path) - self.uuid = self.GetSettings()['connection']['uuid'] - - def GetSecrets(self, name=None): - settings = self.GetSettings() - if name is None: - name = settings['connection']['type'] - name = settings[name].get('security', name) - try: - return self._GetSecrets(name) - except dbus.exceptions.DBusException as e: - if e.get_dbus_name() != 'org.freedesktop.NetworkManager.AgentManager.NoSecrets': - raise - return {key: {} for key in settings} - - @staticmethod - def all(): - return Settings.ListConnections() - - def __eq__(self, other): - return isinstance(other, type(self)) and self.uuid == other.uuid - - -class ActiveConnection(TransientNMDbusInterface): - interface_names = ['org.freedesktop.NetworkManager.Connection.Active'] - - def __new__(cls, object_path): - if cls == ActiveConnection: - # Automatically turn this into a VPNConnection if needed - obj = dbus.SystemBus().get_object(cls.dbus_service, object_path) - if obj.Get('org.freedesktop.NetworkManager.Connection.Active', 'Vpn', - dbus_interface='org.freedesktop.DBus.Properties'): - return VPNConnection.__new__(VPNConnection, object_path) - return super(ActiveConnection, cls).__new__(cls, object_path) - - def __eq__(self, other): - return isinstance(other, type(self)) and self.Uuid == other.Uuid - - -class VPNConnection(ActiveConnection): - interface_names = ['org.freedesktop.NetworkManager.VPN.Connection'] - - -class Device(NMDbusInterface): - interface_names = ['org.freedesktop.NetworkManager.Device', 'org.freedesktop.NetworkManager.Device.Statistics'] - - def __new__(cls, object_path): - if cls == Device: - # Automatically specialize the device - with contextlib.suppress(ObjectVanished): - obj = dbus.SystemBus().get_object(cls.dbus_service, object_path) - cls = device_class(obj.Get('org.freedesktop.NetworkManager.Device', 'DeviceType', - dbus_interface='org.freedesktop.DBus.Properties')) - return cls.__new__(cls, object_path) - return super(Device, cls).__new__(cls, object_path) - - @staticmethod - def all(): - return NetworkManager.Devices - - def __eq__(self, other): - return isinstance(other, type(self)) and self.IpInterface == other.IpInterface - - # Backwards compatibility method. Devices now auto-specialize, so this is - # no longer needed. But code may use it. - def SpecificDevice(self): - return self - - -def device_class(typ): - return { - NM_DEVICE_TYPE_ADSL: Adsl, - NM_DEVICE_TYPE_BOND: Bond, - NM_DEVICE_TYPE_BRIDGE: Bridge, - NM_DEVICE_TYPE_BT: Bluetooth, - NM_DEVICE_TYPE_ETHERNET: Wired, - NM_DEVICE_TYPE_GENERIC: Generic, - NM_DEVICE_TYPE_INFINIBAND: Infiniband, - NM_DEVICE_TYPE_IP_TUNNEL: IPTunnel, - NM_DEVICE_TYPE_MACVLAN: Macvlan, - NM_DEVICE_TYPE_MODEM: Modem, - NM_DEVICE_TYPE_OLPC_MESH: OlpcMesh, - NM_DEVICE_TYPE_TEAM: Team, - NM_DEVICE_TYPE_TUN: Tun, - NM_DEVICE_TYPE_VETH: Veth, - NM_DEVICE_TYPE_VLAN: Vlan, - NM_DEVICE_TYPE_VXLAN: Vxlan, - NM_DEVICE_TYPE_WIFI: Wireless, - NM_DEVICE_TYPE_WIMAX: Wimax, - NM_DEVICE_TYPE_MACSEC: MacSec, - NM_DEVICE_TYPE_DUMMY: Dummy, - NM_DEVICE_TYPE_PPP: PPP, - NM_DEVICE_TYPE_OVS_INTERFACE: OvsIf, - NM_DEVICE_TYPE_OVS_PORT: OvsPort, - NM_DEVICE_TYPE_OVS_BRIDGE: OvsBridge, - NM_DEVICE_TYPE_WPAN: Wpan, - NM_DEVICE_TYPE_6LOWPAN: SixLoWpan, - NM_DEVICE_TYPE_WIREGUARD: WireGuard, - NM_DEVICE_TYPE_VRF: Vrf, - NM_DEVICE_TYPE_WIFI_P2P: WifiP2p, - }[typ] - - -class Adsl(Device): - pass - - -class Bluetooth(Device): - pass - - -class Bond(Device): - pass - - -class Bridge(Device): - pass - - -class Generic(Device): - pass - - -class Infiniband(Device): - pass - - -class IPTunnel(Device): - pass - - -class Macvlan(Device): - pass - - -class Modem(Device): - pass - - -class OlpcMesh(Device): - pass - - -class Team(Device): - pass - - -class Tun(Device): - pass - - -class Veth(Device): - pass - - -class Vlan(Device): - pass - - -class Vxlan(Device): - pass - - -class Wimax(Device): - pass - - -class Wired(Device): - pass - - -class Wireless(Device): - pass - - -class MacSec(Device): - pass - - -class Dummy(Device): - pass - - -class PPP(Device): - pass - - -class OvsIf(Device): - pass - - -class OvsPort(Device): - pass - - -class OvsBridge(Device): - pass - - -class Wpan(Device): - pass - - -class SixLoWpan(Device): - pass - - -class WireGuard(Device): - pass - - -class WifiP2p(Device): - pass - - -class Vrf(Device): - pass - - -class NSP(TransientNMDbusInterface): - interface_names = ['org.freedesktop.NetworkManager.Wimax.NSP'] - - -class AccessPoint(NMDbusInterface): - @staticmethod - def all(): - for device in Device.all(): - if isinstance(device, Wireless): - yield from device.AccessPoints - - def __eq__(self, other): - return isinstance(other, type(self)) and self.HwAddress == other.HwAddress - - -class IP4Config(TransientNMDbusInterface): - pass - - -class IP6Config(TransientNMDbusInterface): - pass - - -class DHCP4Config(TransientNMDbusInterface): - pass - - -class DHCP6Config(TransientNMDbusInterface): - pass - - -# Evil hack to work around not being able to specify a method name in the -# dbus.service.method decorator. -class SecretAgentType(type(dbus.service.Object)): - def __new__(cls, name, bases, attrs): - if bases != (dbus.service.Object,): - attrs['GetSecretsImpl'] = attrs.pop('GetSecrets') - return super(SecretAgentType, cls).__new__(cls, name, bases, attrs) - - -@six.add_metaclass(SecretAgentType) -class SecretAgent(dbus.service.Object): - object_path = '/org/freedesktop/NetworkManager/SecretAgent' - interface_name = 'org.freedesktop.NetworkManager.SecretAgent' - - def __init__(self, identifier): - self.identifier = identifier - dbus.service.Object.__init__(self, dbus.SystemBus(), self.object_path) - AgentManager.Register(self.identifier) - - @dbus.service.method(dbus_interface=interface_name, in_signature='a{sa{sv}}osasu', out_signature='a{sa{sv}}') - def GetSecrets(self, connection, connection_path, setting_name, hints, flags): - settings = fixups.to_python('SecretAgent', 'GetSecrets', 'connection', connection, 'a{sa{sv}}') - connection = fixups.to_python('SecretAgent', 'GetSecrets', 'connection_path', connection_path, 'o') - setting_name = fixups.to_python('SecretAgent', 'GetSecrets', 'setting_name', setting_name, 's') - hints = fixups.to_python('SecretAgent', 'GetSecrets', 'hints', hints, 'as') - return self.GetSecretsImpl(settings, connection, setting_name, hints, flags) - - -# These two are interfaces that must be provided to NetworkManager. Keep them -# as comments for documentation purposes. -# -# class PPP(NMDbusInterface): pass -# class VPNPlugin(NMDbusInterface): -# interface_names = ['org.freedesktop.NetworkManager.VPN.Plugin'] - -def const(prefix, val): - prefix = f'NM_{prefix.upper()}_' - for key, vval in globals().items(): - if 'REASON' in key and 'REASON' not in prefix: - continue - if key.startswith(prefix) and val == vval: - return key.replace(prefix, '').lower() - raise ValueError("No constant found for %s* with value %d", (prefix, val)) - - -# Several fixer methods to make the data easier to handle in python -# - SSID sent/returned as bytes (only encoding tried is utf-8) -# - IP, Mac address and route metric encoding/decoding -class fixups(object): - @staticmethod - def to_dbus(cls, method, arg, val, signature): - if arg in 'connection' 'properties' and signature == 'a{sa{sv}}': - settings = copy.deepcopy(val) - for key in settings: - if 'mac-address' in settings[key]: - settings[key]['mac-address'] = fixups.mac_to_dbus(settings[key]['mac-address']) - if 'cloned-mac-address' in settings[key]: - settings[key]['cloned-mac-address'] = fixups.mac_to_dbus(settings[key]['cloned-mac-address']) - if 'bssid' in settings[key]: - settings[key]['bssid'] = fixups.mac_to_dbus(settings[key]['bssid']) - for cert in ['ca-cert', 'client-cert', 'phase2-ca-cert', 'phase2-client-cert', 'private-key']: - if cert in settings[key]: - settings[key][cert] = fixups.cert_to_dbus(settings[key][cert]) - if 'routing-rules' in settings[key]: - for rule in settings[key]['routing-rules']: - for p in rule: - rule[p] = dbus.Int32(rule[p]) if p == 'family' else dbus.UInt32(rule[p]) - settings[key]['routing-rules'] = dbus.Array( - settings[key]['routing-rules'], signature=dbus.Signature('a{sv}')) - if 'ssid' in settings.get('802-11-wireless', {}): - settings['802-11-wireless']['ssid'] = fixups.ssid_to_dbus(settings['802-11-wireless']['ssid']) - with contextlib.suppress(KeyError): - val['ipv4']['addresses'] = [fixups.addrconf_to_python(addr, socket.AF_INET) for addr in - val['ipv4']['addresses']] - val['ipv4']['routes'] = [fixups.route_to_python(route, socket.AF_INET) for route in - val['ipv4']['routes']] - val['ipv4']['dns'] = [fixups.addr_to_python(addr, socket.AF_INET) for addr in val['ipv4']['dns']] - val['ipv6']['addresses'] = [fixups.addrconf_to_python(addr, socket.AF_INET6) for addr in - val['ipv6']['addresses']] - val['ipv6']['routes'] = [fixups.route_to_python(route, socket.AF_INET6) for route in - val['ipv6']['routes']] - val['ipv6']['dns'] = [fixups.addr_to_python(addr, socket.AF_INET6) for addr in val['ipv6']['dns']] - # Get rid of empty arrays/dicts. dbus barfs on them (can't guess - # signatures), and if they were to get through, NetworkManager - # ignores them anyway. - for key in list(settings.keys()): - if isinstance(settings[key], dict): - for key2 in list(settings[key].keys()): - if settings[key][key2] in ({}, []): - del settings[key][key2] - if settings[key] in ({}, []): - del settings[key] - val = settings - return fixups.base_to_dbus(val) - - @staticmethod - def base_to_dbus(val): - if isinstance(val, NMDbusInterface): - return val.object_path - if hasattr(val.__class__, 'mro'): - for cls in val.__class__.mro(): - if cls.__module__ in ('dbus', '_dbus_bindings'): - return val - if hasattr(val, '__iter__') and not isinstance(val, six.string_types): - if hasattr(val, 'items'): - return dict([(x, fixups.base_to_dbus(y)) for x, y in val.items()]) - else: - return [fixups.base_to_dbus(x) for x in val] - return val - - @staticmethod - def to_python(cls, method, arg, val, signature): - val = fixups.base_to_python(val) - cls_af = {'IP4Config': socket.AF_INET, 'IP6Config': socket.AF_INET6}.get(cls, socket.AF_INET) - if method == 'Get': - if arg == 'Ip4Address': - return fixups.addr_to_python(val, socket.AF_INET) - if arg == 'Ip6Address': - return fixups.addr_to_python(val, socket.AF_INET6) - if arg == 'Ssid': - return fixups.ssid_to_python(val) - if arg == 'Strength': - return fixups.strength_to_python(val) - if arg == 'Addresses': - return [fixups.addrconf_to_python(addr, cls_af) for addr in val] - if arg == 'Routes': - return [fixups.route_to_python(route, cls_af) for route in val] - if arg in ('Nameservers', 'WinsServers'): - return [fixups.addr_to_python(addr, cls_af) for addr in val] - if arg == 'Options': - for key in val: - if key.startswith('requested_'): - val[key] = bool(int(val[key])) - elif val[key].isdigit(): - val[key] = int(val[key]) - elif key in ('domain_name_servers', 'ntp_servers', 'routers'): - val[key] = val[key].split() - - return val - if method == 'GetSettings': - if 'ssid' in val.get('802-11-wireless', {}): - val['802-11-wireless']['ssid'] = fixups.ssid_to_python(val['802-11-wireless']['ssid']) - for key in val: - val_ = val[key] - if 'mac-address' in val_: - val_['mac-address'] = fixups.mac_to_python(val_['mac-address']) - if 'cloned-mac-address' in val_: - val_['cloned-mac-address'] = fixups.mac_to_python(val_['cloned-mac-address']) - if 'bssid' in val_: - val_['bssid'] = fixups.mac_to_python(val_['bssid']) - if 'ipv4' in val: - if 'addresses' in val['ipv4']: - val['ipv4']['addresses'] = [fixups.addrconf_to_python(addr, socket.AF_INET) for addr in - val['ipv4']['addresses']] - if 'routes' in val['ipv4']: - val['ipv4']['routes'] = [fixups.route_to_python(route, socket.AF_INET) for route in - val['ipv4']['routes']] - if 'dns' in val['ipv4']: - val['ipv4']['dns'] = [fixups.addr_to_python(addr, socket.AF_INET) for addr in val['ipv4']['dns']] - if 'ipv6' in val: - if 'addresses' in val['ipv6']: - val['ipv6']['addresses'] = [fixups.addrconf_to_python(addr, socket.AF_INET6) for addr in - val['ipv6']['addresses']] - if 'routes' in val['ipv6']: - val['ipv6']['routes'] = [fixups.route_to_python(route, socket.AF_INET6) for route in - val['ipv6']['routes']] - if 'dns' in val['ipv6']: - val['ipv6']['dns'] = [fixups.addr_to_python(addr, socket.AF_INET6) for addr in val['ipv6']['dns']] - return val - if method == 'PropertiesChanged': - for prop in val: - val[prop] = fixups.to_python(cls, 'Get', prop, val[prop], None) - return val - - @staticmethod - def base_to_python(val): - if isinstance(val, dbus.ByteArray): - return "".join([str(x) for x in val]) - if isinstance(val, (dbus.Array, list, tuple)): - return [fixups.base_to_python(x) for x in val] - if isinstance(val, (dbus.Dictionary, dict)): - return dict([(fixups.base_to_python(x), fixups.base_to_python(y)) for x, y in val.items()]) - if isinstance(val, dbus.ObjectPath): - for obj in (NetworkManager, Settings, AgentManager): - if val == obj.object_path: - return obj - if val.startswith('/org/freedesktop/NetworkManager/'): - classname = val.split('/')[4] - classname = { - 'Settings': 'Connection', - 'Devices': 'Device', - }.get(classname, classname) - return globals()[classname](val) - if val == '/': - return None - if isinstance(val, (dbus.Signature, dbus.String)): - return six.text_type(val) - if isinstance(val, dbus.Boolean): - return bool(val) - if isinstance(val, (dbus.Int16, dbus.UInt16, dbus.Int32, dbus.UInt32, dbus.Int64, dbus.UInt64)): - return int(val) - return six.int2byte(int(val)) if isinstance(val, dbus.Byte) else val - - @staticmethod - def ssid_to_python(ssid): - try: - return bytes().join(ssid).decode('utf-8') - except UnicodeDecodeError: - ssid = bytes().join(ssid).decode('utf-8', 'replace') - warnings.warn(f"Unable to decode ssid {ssid} properly", UnicodeWarning) - return ssid - - @staticmethod - def ssid_to_dbus(ssid): - if isinstance(ssid, six.text_type): - ssid = ssid.encode('utf-8') - return [dbus.Byte(x) for x in ssid] - - @staticmethod - def strength_to_python(strength): - return struct.unpack('B', strength)[0] - - @staticmethod - def mac_to_python(mac): - return "%02X:%02X:%02X:%02X:%02X:%02X" % tuple(ord(x) for x in mac) - - @staticmethod - def mac_to_dbus(mac): - return [dbus.Byte(int(x, 16)) for x in mac.split(':')] - - @staticmethod - def addrconf_to_python(addrconf, family): - addr, netmask, gateway = addrconf - return [ - fixups.addr_to_python(addr, family), - netmask, - fixups.addr_to_python(gateway, family) - ] - - @staticmethod - def addrconf_to_dbus(addrconf, family): - addr, netmask, gateway = addrconf - if family == socket.AF_INET: - return [ - fixups.addr_to_dbus(addr, family), - fixups.mask_to_dbus(netmask), - fixups.addr_to_dbus(gateway, family) - ] - else: - return dbus.Struct( - ( - fixups.addr_to_dbus(addr, family), - fixups.mask_to_dbus(netmask), - fixups.addr_to_dbus(gateway, family) - ), signature='ayuay' - ) - - @staticmethod - def addr_to_python(addr, family): - if family == socket.AF_INET: - return socket.inet_ntop(family, struct.pack('I', addr)) - else: - return socket.inet_ntop(family, b''.join(addr)) - - @staticmethod - def addr_to_dbus(addr, family): - if family == socket.AF_INET: - return dbus.UInt32(struct.unpack('I', socket.inet_pton(family, addr))[0]) - else: - return dbus.ByteArray(socket.inet_pton(family, addr)) - - @staticmethod - def mask_to_dbus(mask): - return dbus.UInt32(mask) - - @staticmethod - def route_to_python(route, family): - addr, netmask, gateway, metric = route - return [ - fixups.addr_to_python(addr, family), - netmask, - fixups.addr_to_python(gateway, family), - metric - ] - - @staticmethod - def route_to_dbus(route, family): - addr, netmask, gateway, metric = route - return [ - fixups.addr_to_dbus(addr, family), - fixups.mask_to_dbus(netmask), - fixups.addr_to_dbus(gateway, family), - metric - ] - - @staticmethod - def cert_to_dbus(cert): - if not isinstance(cert, bytes): - if not cert.startswith('file://'): - cert = f'file://{cert}' - cert = cert.encode('utf-8') + b'\0' - return [dbus.Byte(x) for x in cert] - - -# Turn NetworkManager and Settings into singleton objects -NetworkManager = NetworkManager() -Settings = Settings() -AgentManager = AgentManager() -init_bus.close() -del init_bus -del xml_cache - -# Constants below are generated with makeconstants.py. Do not edit manually. -NM_CAPABILITY_TEAM = 1 -NM_CAPABILITY_OVS = 2 -NM_STATE_UNKNOWN = 0 -NM_STATE_ASLEEP = 10 -NM_STATE_DISCONNECTED = 20 -NM_STATE_DISCONNECTING = 30 -NM_STATE_CONNECTING = 40 -NM_STATE_CONNECTED_LOCAL = 50 -NM_STATE_CONNECTED_SITE = 60 -NM_STATE_CONNECTED_GLOBAL = 70 -NM_CONNECTIVITY_UNKNOWN = 0 -NM_CONNECTIVITY_NONE = 1 -NM_CONNECTIVITY_PORTAL = 2 -NM_CONNECTIVITY_LIMITED = 3 -NM_CONNECTIVITY_FULL = 4 -NM_DEVICE_TYPE_UNKNOWN = 0 -NM_DEVICE_TYPE_ETHERNET = 1 -NM_DEVICE_TYPE_WIFI = 2 -NM_DEVICE_TYPE_UNUSED1 = 3 -NM_DEVICE_TYPE_UNUSED2 = 4 -NM_DEVICE_TYPE_BT = 5 -NM_DEVICE_TYPE_OLPC_MESH = 6 -NM_DEVICE_TYPE_WIMAX = 7 -NM_DEVICE_TYPE_MODEM = 8 -NM_DEVICE_TYPE_INFINIBAND = 9 -NM_DEVICE_TYPE_BOND = 10 -NM_DEVICE_TYPE_VLAN = 11 -NM_DEVICE_TYPE_ADSL = 12 -NM_DEVICE_TYPE_BRIDGE = 13 -NM_DEVICE_TYPE_GENERIC = 14 -NM_DEVICE_TYPE_TEAM = 15 -NM_DEVICE_TYPE_TUN = 16 -NM_DEVICE_TYPE_IP_TUNNEL = 17 -NM_DEVICE_TYPE_MACVLAN = 18 -NM_DEVICE_TYPE_VXLAN = 19 -NM_DEVICE_TYPE_VETH = 20 -NM_DEVICE_TYPE_MACSEC = 21 -NM_DEVICE_TYPE_DUMMY = 22 -NM_DEVICE_TYPE_PPP = 23 -NM_DEVICE_TYPE_OVS_INTERFACE = 24 -NM_DEVICE_TYPE_OVS_PORT = 25 -NM_DEVICE_TYPE_OVS_BRIDGE = 26 -NM_DEVICE_TYPE_WPAN = 27 -NM_DEVICE_TYPE_6LOWPAN = 28 -NM_DEVICE_TYPE_WIREGUARD = 29 -NM_DEVICE_TYPE_WIFI_P2P = 30 -NM_DEVICE_TYPE_VRF = 31 -NM_DEVICE_CAP_NONE = 0 -NM_DEVICE_CAP_NM_SUPPORTED = 1 -NM_DEVICE_CAP_CARRIER_DETECT = 2 -NM_DEVICE_CAP_IS_SOFTWARE = 4 -NM_DEVICE_CAP_SRIOV = 8 -NM_WIFI_DEVICE_CAP_NONE = 0 -NM_WIFI_DEVICE_CAP_CIPHER_WEP40 = 1 -NM_WIFI_DEVICE_CAP_CIPHER_WEP104 = 2 -NM_WIFI_DEVICE_CAP_CIPHER_TKIP = 4 -NM_WIFI_DEVICE_CAP_CIPHER_CCMP = 8 -NM_WIFI_DEVICE_CAP_WPA = 16 -NM_WIFI_DEVICE_CAP_RSN = 32 -NM_WIFI_DEVICE_CAP_AP = 64 -NM_WIFI_DEVICE_CAP_ADHOC = 128 -NM_WIFI_DEVICE_CAP_FREQ_VALID = 256 -NM_WIFI_DEVICE_CAP_FREQ_2GHZ = 512 -NM_WIFI_DEVICE_CAP_FREQ_5GHZ = 1024 -NM_WIFI_DEVICE_CAP_MESH = 4096 -NM_WIFI_DEVICE_CAP_IBSS_RSN = 8192 -NM_802_11_AP_FLAGS_NONE = 0 -NM_802_11_AP_FLAGS_PRIVACY = 1 -NM_802_11_AP_FLAGS_WPS = 2 -NM_802_11_AP_FLAGS_WPS_PBC = 4 -NM_802_11_AP_FLAGS_WPS_PIN = 8 -NM_802_11_AP_SEC_NONE = 0 -NM_802_11_AP_SEC_PAIR_WEP40 = 1 -NM_802_11_AP_SEC_PAIR_WEP104 = 2 -NM_802_11_AP_SEC_PAIR_TKIP = 4 -NM_802_11_AP_SEC_PAIR_CCMP = 8 -NM_802_11_AP_SEC_GROUP_WEP40 = 16 -NM_802_11_AP_SEC_GROUP_WEP104 = 32 -NM_802_11_AP_SEC_GROUP_TKIP = 64 -NM_802_11_AP_SEC_GROUP_CCMP = 128 -NM_802_11_AP_SEC_KEY_MGMT_PSK = 256 -NM_802_11_AP_SEC_KEY_MGMT_802_1X = 512 -NM_802_11_AP_SEC_KEY_MGMT_SAE = 1024 -NM_802_11_AP_SEC_KEY_MGMT_OWE = 2048 -NM_802_11_AP_SEC_KEY_MGMT_OWE_TM = 4096 -NM_802_11_MODE_UNKNOWN = 0 -NM_802_11_MODE_ADHOC = 1 -NM_802_11_MODE_INFRA = 2 -NM_802_11_MODE_AP = 3 -NM_802_11_MODE_MESH = 4 -NM_BT_CAPABILITY_NONE = 0 -NM_BT_CAPABILITY_DUN = 1 -NM_BT_CAPABILITY_NAP = 2 -NM_DEVICE_MODEM_CAPABILITY_NONE = 0 -NM_DEVICE_MODEM_CAPABILITY_POTS = 1 -NM_DEVICE_MODEM_CAPABILITY_CDMA_EVDO = 2 -NM_DEVICE_MODEM_CAPABILITY_GSM_UMTS = 4 -NM_DEVICE_MODEM_CAPABILITY_LTE = 8 -NM_WIMAX_NSP_NETWORK_TYPE_UNKNOWN = 0 -NM_WIMAX_NSP_NETWORK_TYPE_HOME = 1 -NM_WIMAX_NSP_NETWORK_TYPE_PARTNER = 2 -NM_WIMAX_NSP_NETWORK_TYPE_ROAMING_PARTNER = 3 -NM_DEVICE_STATE_UNKNOWN = 0 -NM_DEVICE_STATE_UNMANAGED = 10 -NM_DEVICE_STATE_UNAVAILABLE = 20 -NM_DEVICE_STATE_DISCONNECTED = 30 -NM_DEVICE_STATE_PREPARE = 40 -NM_DEVICE_STATE_CONFIG = 50 -NM_DEVICE_STATE_NEED_AUTH = 60 -NM_DEVICE_STATE_IP_CONFIG = 70 -NM_DEVICE_STATE_IP_CHECK = 80 -NM_DEVICE_STATE_SECONDARIES = 90 -NM_DEVICE_STATE_ACTIVATED = 100 -NM_DEVICE_STATE_DEACTIVATING = 110 -NM_DEVICE_STATE_FAILED = 120 -NM_DEVICE_STATE_REASON_NONE = 0 -NM_DEVICE_STATE_REASON_UNKNOWN = 1 -NM_DEVICE_STATE_REASON_NOW_MANAGED = 2 -NM_DEVICE_STATE_REASON_NOW_UNMANAGED = 3 -NM_DEVICE_STATE_REASON_CONFIG_FAILED = 4 -NM_DEVICE_STATE_REASON_IP_CONFIG_UNAVAILABLE = 5 -NM_DEVICE_STATE_REASON_IP_CONFIG_EXPIRED = 6 -NM_DEVICE_STATE_REASON_NO_SECRETS = 7 -NM_DEVICE_STATE_REASON_SUPPLICANT_DISCONNECT = 8 -NM_DEVICE_STATE_REASON_SUPPLICANT_CONFIG_FAILED = 9 -NM_DEVICE_STATE_REASON_SUPPLICANT_FAILED = 10 -NM_DEVICE_STATE_REASON_SUPPLICANT_TIMEOUT = 11 -NM_DEVICE_STATE_REASON_PPP_START_FAILED = 12 -NM_DEVICE_STATE_REASON_PPP_DISCONNECT = 13 -NM_DEVICE_STATE_REASON_PPP_FAILED = 14 -NM_DEVICE_STATE_REASON_DHCP_START_FAILED = 15 -NM_DEVICE_STATE_REASON_DHCP_ERROR = 16 -NM_DEVICE_STATE_REASON_DHCP_FAILED = 17 -NM_DEVICE_STATE_REASON_SHARED_START_FAILED = 18 -NM_DEVICE_STATE_REASON_SHARED_FAILED = 19 -NM_DEVICE_STATE_REASON_AUTOIP_START_FAILED = 20 -NM_DEVICE_STATE_REASON_AUTOIP_ERROR = 21 -NM_DEVICE_STATE_REASON_AUTOIP_FAILED = 22 -NM_DEVICE_STATE_REASON_MODEM_BUSY = 23 -NM_DEVICE_STATE_REASON_MODEM_NO_DIAL_TONE = 24 -NM_DEVICE_STATE_REASON_MODEM_NO_CARRIER = 25 -NM_DEVICE_STATE_REASON_MODEM_DIAL_TIMEOUT = 26 -NM_DEVICE_STATE_REASON_MODEM_DIAL_FAILED = 27 -NM_DEVICE_STATE_REASON_MODEM_INIT_FAILED = 28 -NM_DEVICE_STATE_REASON_GSM_APN_FAILED = 29 -NM_DEVICE_STATE_REASON_GSM_REGISTRATION_NOT_SEARCHING = 30 -NM_DEVICE_STATE_REASON_GSM_REGISTRATION_DENIED = 31 -NM_DEVICE_STATE_REASON_GSM_REGISTRATION_TIMEOUT = 32 -NM_DEVICE_STATE_REASON_GSM_REGISTRATION_FAILED = 33 -NM_DEVICE_STATE_REASON_GSM_PIN_CHECK_FAILED = 34 -NM_DEVICE_STATE_REASON_FIRMWARE_MISSING = 35 -NM_DEVICE_STATE_REASON_REMOVED = 36 -NM_DEVICE_STATE_REASON_SLEEPING = 37 -NM_DEVICE_STATE_REASON_CONNECTION_REMOVED = 38 -NM_DEVICE_STATE_REASON_USER_REQUESTED = 39 -NM_DEVICE_STATE_REASON_CARRIER = 40 -NM_DEVICE_STATE_REASON_CONNECTION_ASSUMED = 41 -NM_DEVICE_STATE_REASON_SUPPLICANT_AVAILABLE = 42 -NM_DEVICE_STATE_REASON_MODEM_NOT_FOUND = 43 -NM_DEVICE_STATE_REASON_BT_FAILED = 44 -NM_DEVICE_STATE_REASON_GSM_SIM_NOT_INSERTED = 45 -NM_DEVICE_STATE_REASON_GSM_SIM_PIN_REQUIRED = 46 -NM_DEVICE_STATE_REASON_GSM_SIM_PUK_REQUIRED = 47 -NM_DEVICE_STATE_REASON_GSM_SIM_WRONG = 48 -NM_DEVICE_STATE_REASON_INFINIBAND_MODE = 49 -NM_DEVICE_STATE_REASON_DEPENDENCY_FAILED = 50 -NM_DEVICE_STATE_REASON_BR2684_FAILED = 51 -NM_DEVICE_STATE_REASON_MODEM_MANAGER_UNAVAILABLE = 52 -NM_DEVICE_STATE_REASON_SSID_NOT_FOUND = 53 -NM_DEVICE_STATE_REASON_SECONDARY_CONNECTION_FAILED = 54 -NM_DEVICE_STATE_REASON_DCB_FCOE_FAILED = 55 -NM_DEVICE_STATE_REASON_TEAMD_CONTROL_FAILED = 56 -NM_DEVICE_STATE_REASON_MODEM_FAILED = 57 -NM_DEVICE_STATE_REASON_MODEM_AVAILABLE = 58 -NM_DEVICE_STATE_REASON_SIM_PIN_INCORRECT = 59 -NM_DEVICE_STATE_REASON_NEW_ACTIVATION = 60 -NM_DEVICE_STATE_REASON_PARENT_CHANGED = 61 -NM_DEVICE_STATE_REASON_PARENT_MANAGED_CHANGED = 62 -NM_DEVICE_STATE_REASON_OVSDB_FAILED = 63 -NM_DEVICE_STATE_REASON_IP_ADDRESS_DUPLICATE = 64 -NM_DEVICE_STATE_REASON_IP_METHOD_UNSUPPORTED = 65 -NM_DEVICE_STATE_REASON_SRIOV_CONFIGURATION_FAILED = 66 -NM_DEVICE_STATE_REASON_PEER_NOT_FOUND = 67 -NM_METERED_UNKNOWN = 0 -NM_METERED_YES = 1 -NM_METERED_NO = 2 -NM_METERED_GUESS_YES = 3 -NM_METERED_GUESS_NO = 4 -NM_CONNECTION_MULTI_CONNECT_DEFAULT = 0 -NM_CONNECTION_MULTI_CONNECT_SINGLE = 1 -NM_CONNECTION_MULTI_CONNECT_MANUAL_MULTIPLE = 2 -NM_CONNECTION_MULTI_CONNECT_MULTIPLE = 3 -NM_ACTIVE_CONNECTION_STATE_UNKNOWN = 0 -NM_ACTIVE_CONNECTION_STATE_ACTIVATING = 1 -NM_ACTIVE_CONNECTION_STATE_ACTIVATED = 2 -NM_ACTIVE_CONNECTION_STATE_DEACTIVATING = 3 -NM_ACTIVE_CONNECTION_STATE_DEACTIVATED = 4 -NM_ACTIVE_CONNECTION_STATE_REASON_UNKNOWN = 0 -NM_ACTIVE_CONNECTION_STATE_REASON_NONE = 1 -NM_ACTIVE_CONNECTION_STATE_REASON_USER_DISCONNECTED = 2 -NM_ACTIVE_CONNECTION_STATE_REASON_DEVICE_DISCONNECTED = 3 -NM_ACTIVE_CONNECTION_STATE_REASON_SERVICE_STOPPED = 4 -NM_ACTIVE_CONNECTION_STATE_REASON_IP_CONFIG_INVALID = 5 -NM_ACTIVE_CONNECTION_STATE_REASON_CONNECT_TIMEOUT = 6 -NM_ACTIVE_CONNECTION_STATE_REASON_SERVICE_START_TIMEOUT = 7 -NM_ACTIVE_CONNECTION_STATE_REASON_SERVICE_START_FAILED = 8 -NM_ACTIVE_CONNECTION_STATE_REASON_NO_SECRETS = 9 -NM_ACTIVE_CONNECTION_STATE_REASON_LOGIN_FAILED = 10 -NM_ACTIVE_CONNECTION_STATE_REASON_CONNECTION_REMOVED = 11 -NM_ACTIVE_CONNECTION_STATE_REASON_DEPENDENCY_FAILED = 12 -NM_ACTIVE_CONNECTION_STATE_REASON_DEVICE_REALIZE_FAILED = 13 -NM_ACTIVE_CONNECTION_STATE_REASON_DEVICE_REMOVED = 14 -NM_SECRET_AGENT_GET_SECRETS_FLAG_NONE = 0 -NM_SECRET_AGENT_GET_SECRETS_FLAG_ALLOW_INTERACTION = 1 -NM_SECRET_AGENT_GET_SECRETS_FLAG_REQUEST_NEW = 2 -NM_SECRET_AGENT_GET_SECRETS_FLAG_USER_REQUESTED = 4 -NM_SECRET_AGENT_GET_SECRETS_FLAG_WPS_PBC_ACTIVE = 8 -NM_SECRET_AGENT_GET_SECRETS_FLAG_ONLY_SYSTEM = 2147483648 -NM_SECRET_AGENT_GET_SECRETS_FLAG_NO_ERRORS = 1073741824 -NM_IP_TUNNEL_MODE_UNKNOWN = 0 -NM_IP_TUNNEL_MODE_IPIP = 1 -NM_IP_TUNNEL_MODE_GRE = 2 -NM_IP_TUNNEL_MODE_SIT = 3 -NM_IP_TUNNEL_MODE_ISATAP = 4 -NM_IP_TUNNEL_MODE_VTI = 5 -NM_IP_TUNNEL_MODE_IP6IP6 = 6 -NM_IP_TUNNEL_MODE_IPIP6 = 7 -NM_IP_TUNNEL_MODE_IP6GRE = 8 -NM_IP_TUNNEL_MODE_VTI6 = 9 -NM_IP_TUNNEL_MODE_GRETAP = 10 -NM_IP_TUNNEL_MODE_IP6GRETAP = 11 -NM_CHECKPOINT_CREATE_FLAG_NONE = 0 -NM_CHECKPOINT_CREATE_FLAG_DESTROY_ALL = 1 -NM_CHECKPOINT_CREATE_FLAG_DELETE_NEW_CONNECTIONS = 2 -NM_CHECKPOINT_CREATE_FLAG_DISCONNECT_NEW_DEVICES = 4 -NM_CHECKPOINT_CREATE_FLAG_ALLOW_OVERLAPPING = 8 -NM_ROLLBACK_RESULT_OK = 0 -NM_ROLLBACK_RESULT_ERR_NO_DEVICE = 1 -NM_ROLLBACK_RESULT_ERR_DEVICE_UNMANAGED = 2 -NM_ROLLBACK_RESULT_ERR_FAILED = 3 -NM_SETTINGS_CONNECTION_FLAG_NONE = 0 -NM_SETTINGS_CONNECTION_FLAG_UNSAVED = 1 -NM_SETTINGS_CONNECTION_FLAG_NM_GENERATED = 2 -NM_SETTINGS_CONNECTION_FLAG_VOLATILE = 4 -NM_SETTINGS_CONNECTION_FLAG_EXTERNAL = 8 -NM_ACTIVATION_STATE_FLAG_NONE = 0 -NM_ACTIVATION_STATE_FLAG_IS_MASTER = 1 -NM_ACTIVATION_STATE_FLAG_IS_SLAVE = 2 -NM_ACTIVATION_STATE_FLAG_LAYER2_READY = 4 -NM_ACTIVATION_STATE_FLAG_IP4_READY = 8 -NM_ACTIVATION_STATE_FLAG_IP6_READY = 16 -NM_ACTIVATION_STATE_FLAG_MASTER_HAS_SLAVES = 32 -NM_ACTIVATION_STATE_FLAG_LIFETIME_BOUND_TO_PROFILE_VISIBILITY = 64 -NM_ACTIVATION_STATE_FLAG_EXTERNAL = 128 -NM_SETTINGS_ADD_CONNECTION2_FLAG_NONE = 0 -NM_SETTINGS_ADD_CONNECTION2_FLAG_TO_DISK = 1 -NM_SETTINGS_ADD_CONNECTION2_FLAG_IN_MEMORY = 2 -NM_SETTINGS_ADD_CONNECTION2_FLAG_BLOCK_AUTOCONNECT = 32 -NM_SETTINGS_UPDATE2_FLAG_NONE = 0 -NM_SETTINGS_UPDATE2_FLAG_TO_DISK = 1 -NM_SETTINGS_UPDATE2_FLAG_IN_MEMORY = 2 -NM_SETTINGS_UPDATE2_FLAG_IN_MEMORY_DETACHED = 4 -NM_SETTINGS_UPDATE2_FLAG_IN_MEMORY_ONLY = 8 -NM_SETTINGS_UPDATE2_FLAG_VOLATILE = 16 -NM_SETTINGS_UPDATE2_FLAG_BLOCK_AUTOCONNECT = 32 -NM_SETTINGS_UPDATE2_FLAG_NO_REAPPLY = 64 -NM_TERNARY_DEFAULT = -1 -NM_TERNARY_FALSE = 0 -NM_TERNARY_TRUE = 1 -NM_MANAGER_RELOAD_FLAG_NONE = 0 -NM_MANAGER_RELOAD_FLAG_CONF = 1 -NM_MANAGER_RELOAD_FLAG_DNS_RC = 2 -NM_MANAGER_RELOAD_FLAG_DNS_FULL = 4 -NM_MANAGER_RELOAD_FLAG_ALL = 7 -NM_DEVICE_INTERFACE_FLAG_NONE = 0 -NM_DEVICE_INTERFACE_FLAG_UP = 1 -NM_DEVICE_INTERFACE_FLAG_LOWER_UP = 2 -NM_DEVICE_INTERFACE_FLAG_CARRIER = 65536 -NM_CLIENT_PERMISSION_NONE = 0 -NM_CLIENT_PERMISSION_ENABLE_DISABLE_NETWORK = 1 -NM_CLIENT_PERMISSION_ENABLE_DISABLE_WIFI = 2 -NM_CLIENT_PERMISSION_ENABLE_DISABLE_WWAN = 3 -NM_CLIENT_PERMISSION_ENABLE_DISABLE_WIMAX = 4 -NM_CLIENT_PERMISSION_SLEEP_WAKE = 5 -NM_CLIENT_PERMISSION_NETWORK_CONTROL = 6 -NM_CLIENT_PERMISSION_WIFI_SHARE_PROTECTED = 7 -NM_CLIENT_PERMISSION_WIFI_SHARE_OPEN = 8 -NM_CLIENT_PERMISSION_SETTINGS_MODIFY_SYSTEM = 9 -NM_CLIENT_PERMISSION_SETTINGS_MODIFY_OWN = 10 -NM_CLIENT_PERMISSION_SETTINGS_MODIFY_HOSTNAME = 11 -NM_CLIENT_PERMISSION_SETTINGS_MODIFY_GLOBAL_DNS = 12 -NM_CLIENT_PERMISSION_RELOAD = 13 -NM_CLIENT_PERMISSION_CHECKPOINT_ROLLBACK = 14 -NM_CLIENT_PERMISSION_ENABLE_DISABLE_STATISTICS = 15 -NM_CLIENT_PERMISSION_ENABLE_DISABLE_CONNECTIVITY_CHECK = 16 -NM_CLIENT_PERMISSION_WIFI_SCAN = 17 -NM_CLIENT_PERMISSION_LAST = 17 -NM_CLIENT_PERMISSION_RESULT_UNKNOWN = 0 -NM_CLIENT_PERMISSION_RESULT_YES = 1 -NM_CLIENT_PERMISSION_RESULT_AUTH = 2 -NM_CLIENT_PERMISSION_RESULT_NO = 3 -NM_VPN_SERVICE_STATE_UNKNOWN = 0 -NM_VPN_SERVICE_STATE_INIT = 1 -NM_VPN_SERVICE_STATE_SHUTDOWN = 2 -NM_VPN_SERVICE_STATE_STARTING = 3 -NM_VPN_SERVICE_STATE_STARTED = 4 -NM_VPN_SERVICE_STATE_STOPPING = 5 -NM_VPN_SERVICE_STATE_STOPPED = 6 -NM_VPN_CONNECTION_STATE_UNKNOWN = 0 -NM_VPN_CONNECTION_STATE_PREPARE = 1 -NM_VPN_CONNECTION_STATE_NEED_AUTH = 2 -NM_VPN_CONNECTION_STATE_CONNECT = 3 -NM_VPN_CONNECTION_STATE_IP_CONFIG_GET = 4 -NM_VPN_CONNECTION_STATE_ACTIVATED = 5 -NM_VPN_CONNECTION_STATE_FAILED = 6 -NM_VPN_CONNECTION_STATE_DISCONNECTED = 7 -NM_VPN_CONNECTION_STATE_REASON_UNKNOWN = 0 -NM_VPN_CONNECTION_STATE_REASON_NONE = 1 -NM_VPN_CONNECTION_STATE_REASON_USER_DISCONNECTED = 2 -NM_VPN_CONNECTION_STATE_REASON_DEVICE_DISCONNECTED = 3 -NM_VPN_CONNECTION_STATE_REASON_SERVICE_STOPPED = 4 -NM_VPN_CONNECTION_STATE_REASON_IP_CONFIG_INVALID = 5 -NM_VPN_CONNECTION_STATE_REASON_CONNECT_TIMEOUT = 6 -NM_VPN_CONNECTION_STATE_REASON_SERVICE_START_TIMEOUT = 7 -NM_VPN_CONNECTION_STATE_REASON_SERVICE_START_FAILED = 8 -NM_VPN_CONNECTION_STATE_REASON_NO_SECRETS = 9 -NM_VPN_CONNECTION_STATE_REASON_LOGIN_FAILED = 10 -NM_VPN_CONNECTION_STATE_REASON_CONNECTION_REMOVED = 11 -NM_VPN_PLUGIN_FAILURE_LOGIN_FAILED = 0 -NM_VPN_PLUGIN_FAILURE_CONNECT_FAILED = 1 -NM_VPN_PLUGIN_FAILURE_BAD_IP_CONFIG = 2 -NM_SECRET_AGENT_ERROR_NOT_AUTHORIZED = 0 -NM_SECRET_AGENT_ERROR_INVALID_CONNECTION = 1 -NM_SECRET_AGENT_ERROR_USER_CANCELED = 2 -NM_SECRET_AGENT_ERROR_AGENT_CANCELED = 3 -NM_SECRET_AGENT_ERROR_INTERNAL_ERROR = 4 -NM_SECRET_AGENT_ERROR_NO_SECRETS = 5 diff --git a/ks_includes/screen_panel.py b/ks_includes/screen_panel.py index aaaf83e8..76462ba3 100644 --- a/ks_includes/screen_panel.py +++ b/ks_includes/screen_panel.py @@ -168,6 +168,15 @@ class ScreenPanel: if size < unit: return f"{(1024 * size / unit):.1f} {suffix}" + @staticmethod + def format_speed(bitrate): + bitrate = float(bitrate) + suffixes = ["Kbits/s", "Mbits/s", "Gbits/s", "Tbits/s", "Pbits/s", "Ebits/s", "Zbits/s", "Ybits/s"] + for i, suffix in enumerate(suffixes, start=1): + unit = 1000 ** i + if bitrate < unit: + return f"{(1000 * bitrate / unit):.0f} {suffix}" + @staticmethod def prettify(name: str): name = name.replace("_", " ") diff --git a/ks_includes/sdbus_nm.py b/ks_includes/sdbus_nm.py new file mode 100644 index 00000000..8b180a97 --- /dev/null +++ b/ks_includes/sdbus_nm.py @@ -0,0 +1,250 @@ +# This is the backend of the UI panel that communicates to sdbus-networkmanager +# TODO device selection/swtichability +# Alfredo Monclus (alfrix) 2024 + +import logging + +from sdbus_block.networkmanager import ( + NetworkManager, + NetworkDeviceGeneric, + NetworkDeviceWireless, + NetworkConnectionSettings, + NetworkManagerSettings, + AccessPoint, + NetworkManagerConnectionProperties, + IPv4Config, + ActiveConnection, + enums, +) +from sdbus import sd_bus_open_system, set_default_bus +from gi.repository import GLib +from uuid import uuid4 + + +NM_802_11_AP_SEC_NONE = 0 +NM_802_11_AP_SEC_PAIR_WEP40 = 1 +NM_802_11_AP_SEC_PAIR_WEP104 = 2 +NM_802_11_AP_SEC_PAIR_TKIP = 4 +NM_802_11_AP_SEC_PAIR_CCMP = 8 +NM_802_11_AP_SEC_GROUP_WEP40 = 16 +NM_802_11_AP_SEC_GROUP_WEP104 = 32 +NM_802_11_AP_SEC_GROUP_TKIP = 64 +NM_802_11_AP_SEC_GROUP_CCMP = 128 +NM_802_11_AP_SEC_KEY_MGMT_PSK = 256 +NM_802_11_AP_SEC_KEY_MGMT_802_1X = 512 + + +def get_encryption(flags): + encryption = "" + if (flags & NM_802_11_AP_SEC_PAIR_WEP40 or + flags & NM_802_11_AP_SEC_PAIR_WEP104 or + flags & NM_802_11_AP_SEC_GROUP_WEP40 or + flags & NM_802_11_AP_SEC_GROUP_WEP104): + encryption += "WEP " + if (flags & NM_802_11_AP_SEC_PAIR_TKIP or + flags & NM_802_11_AP_SEC_GROUP_TKIP): + encryption += "TKIP " + if (flags & NM_802_11_AP_SEC_PAIR_CCMP or + flags & NM_802_11_AP_SEC_GROUP_CCMP): + encryption += "AES " + if flags & NM_802_11_AP_SEC_KEY_MGMT_PSK: + encryption += "WPA-PSK " + if flags & NM_802_11_AP_SEC_KEY_MGMT_802_1X: + encryption += "802.1x " + return encryption + + +def WifiChannels(freq: str): + if freq == '2484': + return "2.4", "14" + try: + freq = float(freq) + except ValueError: + return "?", "?" + if 2412 <= freq <= 2472: + return "2.4", str(int((freq - 2407) / 5)) + elif 3657.5 <= freq <= 3692.5: + return "3", str(int((freq - 3000) / 5)) + elif 4915 <= freq <= 4980: + return "5", str(int((freq - 4000) / 5)) + elif 5035 <= freq <= 5885: + return "5", str(int((freq - 5000) / 5)) + elif 6455 <= freq <= 7115: + return "6", str(int((freq - 5950) / 5)) + else: + return "?", "?" + + +class SdbusNm: + + def __init__(self): + self.system_bus = sd_bus_open_system() # We need system bus + set_default_bus(self.system_bus) + self.nm = NetworkManager() + self._callbacks = { + "popup": [], + } + if self.get_wireless_interfaces(): + self.wlan_device = self.get_wireless_interfaces()[0] + self.wifi = True + else: + self.wlan_device = None + self.wifi = False + + def is_wifi_enabled(self): + return self.nm.wireless_enabled + + def get_interfaces(self): + return [NetworkDeviceGeneric(device).interface for device in self.nm.get_devices()] + + def get_wireless_interfaces(self): + devices = {path: NetworkDeviceGeneric(path) for path in self.nm.get_devices()} + return [ + NetworkDeviceWireless(path) + for path, device in devices.items() + if device.device_type == enums.DeviceType.WIFI + ] + + def get_primary_interface(self): + if self.nm.primary_connection == '/': + # Nothing connected + if self.wlan_device: + return self.wlan_device.interface + if len(self.get_interfaces()) > 1: + # skips the loopback device + return self.get_interfaces()[1] + return None + gateway = ActiveConnection(self.nm.primary_connection).devices[0] + return NetworkDeviceGeneric(gateway).interface + + @staticmethod + def get_known_networks(): + known_networks = [] + saved_network_paths = NetworkManagerSettings().list_connections() + for netpath in saved_network_paths: + saved_con = NetworkConnectionSettings(netpath) + con_settings = saved_con.get_settings() + # 'type': ('s', '802-11-wireless') + if con_settings['connection']['type'][1] == "802-11-wireless": + known_networks.append({ + 'SSID': con_settings['802-11-wireless']['ssid'][1].decode(), + 'UUID': con_settings['connection']['uuid'][1] + }) + return known_networks + + def is_known(self, ssid): + return any(net['SSID'] == ssid for net in self.get_known_networks()) + + def get_ip_address(self): + active_connection_path = self.nm.primary_connection + if not active_connection_path or active_connection_path == '/': + return "?" + active_connection = ActiveConnection(active_connection_path) + ip_info = IPv4Config(active_connection.ip4_config) + + return ip_info.address_data[0]['address'][1] + + def get_networks(self): + networks = [] + if self.wlan_device: + all_aps = [AccessPoint(result) for result in self.wlan_device.access_points] + networks.extend( + { + "SSID": ap.ssid.decode("utf-8"), + "known": self.is_known(ap.ssid.decode("utf-8")), + "security": get_encryption(ap.rsn_flags), + "frequency": WifiChannels(ap.frequency)[0], + "channel": WifiChannels(ap.frequency)[1], + "signal_level": ap.strength, + "max_bitrate": ap.max_bitrate, + "BSSID": ap.hw_address, + } + for ap in all_aps + if ap.ssid + ) + return sorted(networks, key=lambda i: i['signal_level'], reverse=True) + return networks + + def get_bssid_from_ssid(self, ssid): + return next(net['BSSID'] for net in self.get_networks() if ssid == net['SSID']) + + def get_connected_ap(self): + if self.wlan_device.active_access_point == "/": + return None + return AccessPoint(self.wlan_device.active_access_point) + + def get_connected_bssid(self): + return self.get_connected_ap().hw_address if self.get_connected_ap() is not None else None + + def add_network(self, ssid, psk): + existing_network = NetworkManagerSettings().get_connections_by_id(ssid) + if existing_network: + for network in existing_network: + self.delete_connection_path(network) + + properties: NetworkManagerConnectionProperties = { + "connection": { + "id": ("s", ssid), + "uuid": ("s", str(uuid4())), + "type": ("s", "802-11-wireless"), + "interface-name": ("s", self.wlan_device.interface) + }, + "802-11-wireless": { + "mode": ("s", "infrastructure"), + "security": ("s", "802-11-wireless-security"), + "ssid": ("ay", ssid.encode("utf-8")), + }, + "802-11-wireless-security": { + "key-mgmt": ("s", "wpa-psk"), + "auth-alg": ("s", "open"), + "psk": ("s", psk), + }, + "ipv4": {"method": ("s", "auto")}, + "ipv6": {"method": ("s", "auto")}, + } + + try: + msg = f"{ssid}\n" + _("Starting WiFi Association") + self.callback("popup", msg, 1) + NetworkManagerSettings().add_connection(properties) + return True + except Exception as e: + logging.error(f"{e}") + self.callback("popup", f"Error: {e}") + return False + + def disconnect_network(self): + self.wlan_device.disconnect() + + def delete_network(self, ssid): + connection = NetworkManagerSettings().get_connections_by_id(ssid) + for path in connection: + self.delete_connection_path(path) + + @staticmethod + def delete_connection_path(path): + NetworkConnectionSettings(path).delete() + + def rescan(self): + return self.wlan_device.request_scan({}) + + def connect(self, ssid): + connection = NetworkManagerSettings().get_connections_by_id(ssid) + if not connection: + self.callback("popup", f"{ssid} {connection}") + return + msg = f"{ssid}:\n" + _("Starting WiFi Association") + self.callback("popup", msg, 1) + self.nm.activate_connection(connection[0]) + + def add_callback(self, name, callback): + if name in self._callbacks and callback not in self._callbacks[name]: + self._callbacks[name].append(callback) + + def callback(self, cb_type, *args): + if cb_type in self._callbacks: + for cb in self._callbacks[cb_type]: + GLib.idle_add(cb, *args) + + def toggle_wifi(self, enable): + self.nm.wireless_enabled = enable diff --git a/ks_includes/wifi.py b/ks_includes/wifi.py deleted file mode 100644 index 32910d68..00000000 --- a/ks_includes/wifi.py +++ /dev/null @@ -1,347 +0,0 @@ -import os -import logging -import re -import socket -import threading -from threading import Thread -from queue import Queue - -import gi - -gi.require_version("Gtk", "3.0") -from gi.repository import GLib - - -class WifiManager: - networks_in_supplicant = [] - connected = False - _stop_loop = False - - def __init__(self, interface, *args, **kwargs): - super().__init__(*args, **kwargs) - self._callbacks = { - "connected": [], - "connecting_status": [], - "scan_results": [], - "popup": [], - } - self._stop_loop = False - self.connected = False - self.connected_ssid = None - self.event = threading.Event() - self.initialized = False - self.interface = interface - self.networks = {} - self.supplicant_networks = {} - self.queue = Queue() - self.timeout = None - - ks_socket_file = "/tmp/.KS_wpa_supplicant" - if os.path.exists(ks_socket_file): - os.remove(ks_socket_file) - - try: - self.soc = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM) - self.soc.bind(ks_socket_file) - self.soc.connect(f"/var/run/wpa_supplicant/{interface}") - except Exception as e: - logging.critical(e, exc_info=True) - logging.error(f"Error connecting to wifi socket: {interface}") - return - - self.wpa_thread = WpaSocket(self, self.queue, self.callback) - self.wpa_thread.start() - self.initialized = True - - self.wpa_cli("ATTACH", False) - self.wpa_cli("SCAN", False) - GLib.idle_add(self.read_wpa_supplicant) - self.timeout = GLib.timeout_add_seconds(180, self.rescan) - - def add_callback(self, name, callback): - if name in self._callbacks and callback not in self._callbacks[name]: - self._callbacks[name].append(callback) - - def add_network(self, ssid, psk): - for netid in list(self.supplicant_networks): - if self.supplicant_networks[netid]['ssid'] == ssid: - # Modify network - return - - # TODO: Add wpa_cli error checking - network_id = self.wpa_cli("ADD_NETWORK") - commands = [ - f'ENABLE_NETWORK {network_id}', - 'SET_NETWORK %s ssid "%s"' % (network_id, ssid.replace('"', '\"')), - 'SET_NETWORK %s psk "%s"' % (network_id, psk.replace('"', '\"')) - ] - - self.wpa_cli_batch(commands) - - self.read_wpa_supplicant() - netid = None - for i in list(self.supplicant_networks): - if self.supplicant_networks[i]['ssid'] == ssid: - netid = i - break - - if netid is None: - logging.info("Error adding network") - return False - - self.save_wpa_conf() - return True - - def callback(self, cb_type, msg): - if cb_type in self._callbacks: - for cb in self._callbacks[cb_type]: - GLib.idle_add(cb, msg) - - def connect(self, ssid): - netid = None - for nid, net in self.supplicant_networks.items(): - if net['ssid'] == ssid: - netid = nid - break - - if netid is None: - logging.info("Wifi network is not defined in wpa_supplicant") - return False - - logging.info(f"Attempting to connect to wifi: {netid}") - self.callback("connecting_status", f"Attempting to connect to {ssid}") - self.wpa_cli(f"SELECT_NETWORK {netid}") - self.save_wpa_conf() - - def delete_network(self, ssid): - netid = None - for i in list(self.supplicant_networks): - if self.supplicant_networks[i]['ssid'] == ssid: - netid = i - break - - if netid is None: - logging.debug("Unable to find network in wpa_supplicant") - return - self.wpa_cli(f"REMOVE_NETWORK {netid}") - - for netid in list(self.supplicant_networks): - if self.supplicant_networks[netid]['ssid'] == ssid: - del self.supplicant_networks[netid] - break - - self.save_wpa_conf() - - def get_connected_ssid(self): - return self.connected_ssid - - def get_current_wifi(self): - con_ssid = os.popen("sudo iwgetid -r").read().strip() - con_bssid = os.popen("sudo iwgetid -r -a").read().strip() - # wpa_cli status output is unstable use it as backup only - status = self.wpa_cli("STATUS").split('\n') - variables = {} - for line in status: - arr = line.split('=') - variables[arr[0]] = "=".join(arr[1:]) - prev_ssid = self.connected_ssid - - if con_ssid != "": - self.connected = True - self.connected_ssid = con_ssid - for ssid, val in self.networks.items(): - self.networks[ssid]['connected'] = ssid == con_ssid - if prev_ssid != self.connected_ssid: - for cb in self._callbacks['connected']: - args = self.connected_ssid, prev_ssid - GLib.idle_add(cb, *args) - return [con_ssid, con_bssid] - elif "ssid" in variables and "bssid" in variables: - self.connected = True - self.connected_ssid = variables['ssid'] - for ssid, val in self.networks.items(): - self.networks[ssid]['connected'] = ssid == variables['ssid'] - if prev_ssid != self.connected_ssid: - for cb in self._callbacks['connected']: - args = self.connected_ssid, prev_ssid - GLib.idle_add(cb, *args) - return [variables['ssid'], variables['bssid']] - else: - logging.info("Resetting connected_ssid") - self.connected = False - self.connected_ssid = None - for ssid, val in self.networks.items(): - self.networks[ssid]['connected'] = False - if prev_ssid != self.connected_ssid: - for cb in self._callbacks['connected']: - args = self.connected_ssid, prev_ssid - GLib.idle_add(cb, *args) - return None - - def get_current_wifi_idle_add(self): - self.get_current_wifi() - return False - - def get_network_info(self, ssid=None, mac=None): - if ssid is not None and ssid in self.networks: - return self.networks[ssid] - if mac is not None and ssid is None: - for net in self.networks: - if mac == net['mac']: - return net - return {} - - def get_networks(self): - return list(self.networks) - - def get_supplicant_networks(self): - return self.supplicant_networks - - def read_wpa_supplicant(self): - results = self.wpa_cli("LIST_NETWORKS").split('\n') - results.pop(0) - self.supplicant_networks = {} - self.networks_in_supplicant = [] - for net in [n.split('\t') for n in results]: - self.supplicant_networks[net[0]] = { - "ssid": net[1], - "bssid": net[2], - "flags": net[3] if len(net) == 4 else "" - } - self.networks_in_supplicant.append(self.supplicant_networks[net[0]]) - - def rescan(self): - self.wpa_cli("SCAN", False) - - def save_wpa_conf(self): - logging.info("Saving WPA config") - self.wpa_cli("SAVE_CONFIG") - - def scan_results(self): - new_networks = [] - deleted_networks = list(self.networks) - - results = self.wpa_cli("SCAN_RESULTS").split('\n') - results.pop(0) - - aps = [] - for res in results: - match = re.match("^([a-f0-9:]+)\\s+([0-9]+)\\s+([\\-0-9]+)\\s+(\\S+)\\s+(.+)?", res) - if match: - net = { - "mac": match[1], - "channel": WifiChannels.lookup(match[2])[1], - "connected": False, - "configured": False, - "frequency": match[2], - "flags": match[4], - "signal_level_dBm": match[3], - "ssid": match[5] - } - - if "WPA2" in net['flags']: - net['encryption'] = "WPA2" - elif "WPA" in net['flags']: - net['encryption'] = "WPA" - elif "WEP" in net['flags']: - net['encryption'] = "WEP" - else: - net['encryption'] = "off" - - aps.append(net) - - cur_info = self.get_current_wifi() - self.networks = {} - for ap in aps: - self.networks[ap['ssid']] = ap - if cur_info is not None and cur_info[0] == ap['ssid'] and cur_info[1].lower() == ap['mac'].lower(): - self.networks[ap['ssid']]['connected'] = True - - for net in list(self.networks): - if net in deleted_networks: - deleted_networks.remove(net) - else: - new_networks.append(net) - if new_networks or deleted_networks: - for cb in self._callbacks['scan_results']: - args = new_networks, deleted_networks - GLib.idle_add(cb, *args) - - def wpa_cli(self, command, wait=True): - if wait is False: - self.wpa_thread.skip_command() - self.soc.send(command.encode()) - if wait is True: - return self.queue.get() - - def wpa_cli_batch(self, commands): - for cmd in commands: - self.wpa_cli(cmd) - - -class WpaSocket(Thread): - def __init__(self, wm, queue, callback): - super().__init__() - self.queue = queue - self.callback = callback - self.soc = wm.soc - self._stop_loop = False - self.skip_commands = 0 - self.wm = wm - - def run(self): - logging.debug("Setting up wifi event loop") - while self._stop_loop is False: - try: - msg = self.soc.recv(4096).decode().strip() - except Exception as e: - logging.critical(e, exc_info=True) - # TODO: Socket error - continue - if msg.startswith("<"): - if "CTRL-EVENT-SCAN-RESULTS" in msg: - GLib.idle_add(self.wm.scan_results) - elif "CTRL-EVENT-DISCONNECTED" in msg: - self.callback("connecting_status", msg) - match = re.match('<3>CTRL-EVENT-DISCONNECTED bssid=(\\S+) reason=3 locally_generated=1', msg) - if match: - for net in self.wm.networks: - if self.wm.networks[net]['mac'] == match[1]: - self.wm.networks[net]['connected'] = False - break - elif "Trying to associate" in msg or "CTRL-EVENT-REGDOM-CHANGE" in msg: - self.callback("connecting_status", msg) - elif "CTRL-EVENT-CONNECTED" in msg: - GLib.idle_add(self.wm.get_current_wifi_idle_add) - self.callback("connecting_status", msg) - elif self.skip_commands > 0: - self.skip_commands = self.skip_commands - 1 - else: - self.queue.put(msg) - logging.info("Wifi event loop ended") - - def skip_command(self): - self.skip_commands = self.skip_commands + 1 - - -class WifiChannels: - @staticmethod - def lookup(freq: str): - if freq == '2484': - return "2.4", "14" - try: - freq = float(freq) - except ValueError: - return None - if 2412 <= freq <= 2472: - return "2.4", str(int((freq - 2407) / 5)) - elif 3657.5 <= freq <= 3692.5: - return "3", str(int((freq - 3000) / 5)) - elif 4915 <= freq <= 4980: - return "5", str(int((freq - 4000) / 5)) - elif 5035 <= freq <= 5885: - return "5", str(int((freq - 5000) / 5)) - elif 6455 <= freq <= 7115: - return "6", str(int((freq - 5950) / 5)) - else: - return "?", "?" diff --git a/ks_includes/wifi_nm.py b/ks_includes/wifi_nm.py deleted file mode 100644 index 4a2fc0f4..00000000 --- a/ks_includes/wifi_nm.py +++ /dev/null @@ -1,272 +0,0 @@ -# Network in KlipperScreen is a connection in NetworkManager -# Interface in KlipperScreen is a device in NetworkManager - -import logging -import uuid -import dbus -import gi - -gi.require_version('Gdk', '3.0') -from gi.repository import GLib -from contextlib import suppress -from ks_includes.wifi import WifiChannels -from ks_includes import NetworkManager -from dbus.mainloop.glib import DBusGMainLoop - - -class WifiManager: - networks_in_supplicant = [] - - def __init__(self, interface_name, *args, **kwargs): - super().__init__(*args, **kwargs) - DBusGMainLoop(set_as_default=True) - self._callbacks = { - "connected": [], - "connecting_status": [], - "scan_results": [], - "popup": [], - } - self.connected = False - self.connected_ssid = None - self.interface_name = interface_name - self.known_networks = {} # List of known connections - self.visible_networks = {} # List of visible access points - self.ssid_by_path = {} - self.path_by_ssid = {} - self.hidden_ssid_index = 0 - - self.wifi_dev = NetworkManager.NetworkManager.GetDeviceByIpIface(interface_name) - self.wifi_dev.OnAccessPointAdded(self._ap_added) - self.wifi_dev.OnAccessPointRemoved(self._ap_removed) - self.wifi_dev.OnStateChanged(self._ap_state_changed) - - for ap in self.wifi_dev.GetAccessPoints(): - self._add_ap(ap) - self._update_known_connections() - self.initialized = True - - def _update_known_connections(self): - self.known_networks = {} - for con in NetworkManager.Settings.ListConnections(): - settings = con.GetSettings() - if "802-11-wireless" in settings: - ssid = settings["802-11-wireless"]['ssid'] - self.known_networks[ssid] = con - - def _ap_added(self, nm, interface, signal, access_point): - with suppress(NetworkManager.ObjectVanished): - access_point.OnPropertiesChanged(self._ap_prop_changed) - ssid = self._add_ap(access_point) - for cb in self._callbacks['scan_results']: - args = (cb, [ssid], []) - GLib.idle_add(*args) - - def _ap_removed(self, dev, interface, signal, access_point): - path = access_point.object_path - if path in self.ssid_by_path: - ssid = self.ssid_by_path[path] - self._remove_ap(path) - for cb in self._callbacks['scan_results']: - args = (cb, [ssid], []) - GLib.idle_add(*args) - - def _ap_state_changed(self, nm, interface, signal, old_state, new_state, reason): - msg = "" - if new_state in (NetworkManager.NM_DEVICE_STATE_UNKNOWN, NetworkManager.NM_DEVICE_STATE_REASON_UNKNOWN): - msg = "State is unknown" - elif new_state == NetworkManager.NM_DEVICE_STATE_UNMANAGED: - msg = "Error: Not managed by NetworkManager" - elif new_state == NetworkManager.NM_DEVICE_STATE_UNAVAILABLE: - msg = "Error: Not available for use:\nReasons may include the wireless switched off, missing firmware, etc." - elif new_state == NetworkManager.NM_DEVICE_STATE_DISCONNECTED: - msg = "Currently disconnected" - elif new_state == NetworkManager.NM_DEVICE_STATE_PREPARE: - msg = "Preparing the connection to the network" - elif new_state == NetworkManager.NM_DEVICE_STATE_CONFIG: - msg = "Connecting to the requested network..." - elif new_state == NetworkManager.NM_DEVICE_STATE_NEED_AUTH: - msg = "Authorizing" - elif new_state == NetworkManager.NM_DEVICE_STATE_IP_CONFIG: - msg = "Requesting IP addresses and routing information" - elif new_state == NetworkManager.NM_DEVICE_STATE_IP_CHECK: - msg = "Checking whether further action is required for the requested network connection" - elif new_state == NetworkManager.NM_DEVICE_STATE_SECONDARIES: - msg = "Waiting for a secondary connection (like a VPN)" - elif new_state == NetworkManager.NM_DEVICE_STATE_ACTIVATED: - msg = "Connected" - elif new_state == NetworkManager.NM_DEVICE_STATE_DEACTIVATING: - msg = "A disconnection from the current network connection was requested" - elif new_state == NetworkManager.NM_DEVICE_STATE_FAILED: - msg = "Failed to connect to the requested network" - self.callback("popup", msg) - elif new_state == NetworkManager.NM_DEVICE_STATE_REASON_DEPENDENCY_FAILED: - msg = "A dependency of the connection failed" - elif new_state == NetworkManager.NM_DEVICE_STATE_REASON_CARRIER: - msg = "" - else: - logging.info(f"State {new_state}") - if msg != "": - self.callback("connecting_status", msg) - - if new_state == NetworkManager.NM_DEVICE_STATE_ACTIVATED: - self.connected = True - for cb in self._callbacks['connected']: - args = (cb, self.get_connected_ssid(), None) - GLib.idle_add(*args) - else: - self.connected = False - - def _ap_prop_changed(self, ap, interface, signal, properties): - pass - - def _add_ap(self, ap): - ssid = ap.Ssid - if ssid == "": - ssid = _("Hidden") + f" {self.hidden_ssid_index}" - self.hidden_ssid_index += 1 - self.ssid_by_path[ap.object_path] = ssid - self.path_by_ssid[ssid] = ap.object_path - self.visible_networks[ap.object_path] = ap - return ssid - - def _remove_ap(self, path): - self.ssid_by_path.pop(path, None) - self.visible_networks.pop(path, None) - - def add_callback(self, name, callback): - if name in self._callbacks and callback not in self._callbacks[name]: - self._callbacks[name].append(callback) - - def callback(self, cb_type, msg): - if cb_type in self._callbacks: - for cb in self._callbacks[cb_type]: - GLib.idle_add(cb, msg) - - def add_network(self, ssid, psk): - aps = self._visible_networks_by_ssid() - if ssid in aps: - ap = aps[ssid] - new_connection = { - '802-11-wireless': { - 'mode': 'infrastructure', - 'security': '802-11-wireless-security', - 'ssid': ssid - }, - '802-11-wireless-security': { - 'auth-alg': 'open', - 'key-mgmt': 'wpa-psk', - 'psk': psk - }, - 'connection': { - 'id': ssid, - 'type': '802-11-wireless', - 'uuid': str(uuid.uuid4()) - }, - 'ipv4': { - 'method': 'auto' - }, - 'ipv6': { - 'method': 'auto' - } - } - try: - NetworkManager.Settings.AddConnection(new_connection) - except dbus.exceptions.DBusException as e: - msg = _("Invalid password") if "802-11-wireless-security.psk" in e else f"{e}" - self.callback("popup", msg) - logging.info(f"Error adding network {e}") - self._update_known_connections() - return True - - def connect(self, ssid): - if ssid in self.known_networks: - conn = self.known_networks[ssid] - with suppress(NetworkManager.ObjectVanished): - msg = f"Connecting to: {ssid}" - logging.info(msg) - self.callback("connecting_status", msg) - NetworkManager.NetworkManager.ActivateConnection(conn, self.wifi_dev, "/") - - def delete_network(self, ssid): - for ssid in self.known_networks: - con = self.known_networks[ssid] - if con.GetSettings()['connection']['id'] == ssid: - con.Delete() - self._update_known_connections() - - def get_connected_ssid(self): - if self.wifi_dev.SpecificDevice().ActiveAccessPoint: - return self.wifi_dev.SpecificDevice().ActiveAccessPoint.Ssid - return None - - def _get_connected_ap(self): - return self.wifi_dev.SpecificDevice().ActiveAccessPoint - - def _visible_networks_by_ssid(self): - aps = self.wifi_dev.GetAccessPoints() - ret = {} - for ap in aps: - with suppress(NetworkManager.ObjectVanished): - ret[ap.Ssid] = ap - return ret - - def get_network_info(self, ssid): - netinfo = {} - if ssid in self.known_networks: - con = self.known_networks[ssid] - with suppress(NetworkManager.ObjectVanished): - settings = con.GetSettings() - if settings and '802-11-wireless' in settings: - netinfo.update({ - "ssid": settings['802-11-wireless']['ssid'], - "connected": self.get_connected_ssid() == ssid - }) - path = self.path_by_ssid[ssid] - aps = self.visible_networks - if path in aps: - ap = aps[path] - with suppress(NetworkManager.ObjectVanished): - netinfo.update({ - "mac": ap.HwAddress, - "channel": WifiChannels.lookup(ap.Frequency)[1], - "configured": ssid in self.known_networks, - "frequency": str(ap.Frequency), - "flags": ap.Flags, - "ssid": ssid, - "connected": self._get_connected_ap() == ap, - "encryption": self._get_encryption(ap.RsnFlags), - "signal_level_dBm": str(ap.Strength) - }) - return netinfo - - @staticmethod - def _get_encryption(flags): - encryption = "" - if (flags & NetworkManager.NM_802_11_AP_SEC_PAIR_WEP40 or - flags & NetworkManager.NM_802_11_AP_SEC_PAIR_WEP104 or - flags & NetworkManager.NM_802_11_AP_SEC_GROUP_WEP40 or - flags & NetworkManager.NM_802_11_AP_SEC_GROUP_WEP104): - encryption += "WEP " - if (flags & NetworkManager.NM_802_11_AP_SEC_PAIR_TKIP or - flags & NetworkManager.NM_802_11_AP_SEC_GROUP_TKIP): - encryption += "TKIP " - if (flags & NetworkManager.NM_802_11_AP_SEC_PAIR_CCMP or - flags & NetworkManager.NM_802_11_AP_SEC_GROUP_CCMP): - encryption += "AES " - if flags & NetworkManager.NM_802_11_AP_SEC_KEY_MGMT_PSK: - encryption += "WPA-PSK " - if flags & NetworkManager.NM_802_11_AP_SEC_KEY_MGMT_802_1X: - encryption += "802.1x " - return encryption.strip() - - def get_networks(self): - return list(set(list(self.known_networks.keys()) + list(self.ssid_by_path.values()))) - - def get_supplicant_networks(self): - return {ssid: {"ssid": ssid} for ssid in self.known_networks.keys()} - - def rescan(self): - try: - self.wifi_dev.RequestScan({}) - except dbus.exceptions.DBusException as e: - logging.error(f"Error during rescan {e}") diff --git a/panels/network.py b/panels/network.py index 31d47f59..e56d4409 100644 --- a/panels/network.py +++ b/panels/network.py @@ -1,143 +1,99 @@ import logging import os import gi -import netifaces gi.require_version("Gtk", "3.0") from gi.repository import Gtk, GLib, Pango from ks_includes.screen_panel import ScreenPanel +from ks_includes.sdbus_nm import SdbusNm class Panel(ScreenPanel): - initialized = False def __init__(self, screen, title): super().__init__(screen, title) self.show_add = False - self.networks = {} - self.interface = None - self.prev_network = None self.update_timeout = None - self.network_interfaces = netifaces.interfaces() - self.wireless_interfaces = [iface for iface in self.network_interfaces if iface.startswith('wl')] - self.wifi = None - self.use_network_manager = os.system('systemctl is-active --quiet NetworkManager.service') == 0 - if self.wireless_interfaces: - logging.info(f"Found wireless interfaces: {self.wireless_interfaces}") - if self.use_network_manager: - logging.info("Using NetworkManager") - from ks_includes.wifi_nm import WifiManager - else: - logging.info("Using wpa_cli") - from ks_includes.wifi import WifiManager - self.wifi = WifiManager(self.wireless_interfaces[0]) - else: - logging.info(_("No wireless interface has been found")) + self.network_list = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, hexpand=True, vexpand=True) + self.network_rows = {} + self.networks = {} + self.sdbus_nm = SdbusNm() + self.wifi_signal_icons = { + 'excellent': self._gtk.PixbufFromIcon('wifi_excellent'), + 'good': self._gtk.PixbufFromIcon('wifi_good'), + 'fair': self._gtk.PixbufFromIcon('wifi_fair'), + 'weak': self._gtk.PixbufFromIcon('wifi_weak'), + } - # Get IP Address - gws = netifaces.gateways() - if "default" in gws and netifaces.AF_INET in gws["default"]: - self.interface = gws["default"][netifaces.AF_INET][1] - else: - ints = netifaces.interfaces() - if 'lo' in ints: - ints.pop(ints.index('lo')) - self.interface = ints[0] if len(ints) > 0 else 'lo' + self.network_interfaces = self.sdbus_nm.get_interfaces() + logging.info(f"Network interfaces: {self.network_interfaces}") - self.labels['networks'] = {} + self.wireless_interfaces = [iface.interface for iface in self.sdbus_nm.get_wireless_interfaces()] + logging.info(f"Wireless interfaces: {self.wireless_interfaces}") + + self.interface = self.sdbus_nm.get_primary_interface() + logging.info(f"Primary interface: {self.interface}") self.labels['interface'] = Gtk.Label(hexpand=True) - self.labels['interface'].set_text(_("Interface") + f': {self.interface} ') - self.labels['ip'] = Gtk.Label(hexpand=True) - ifadd = netifaces.ifaddresses(self.interface) - if ifadd.get(netifaces.AF_INET): - self.labels['ip'].set_text(f"IP: {ifadd[netifaces.AF_INET][0]['addr']} ") + if self.interface is not None: + self.labels['interface'].set_text(_("Interface") + f': {self.interface}') + self.labels['ip'].set_text(f"IP: {self.sdbus_nm.get_ip_address()}") - reload_networks = self._gtk.Button("refresh", None, "color1", self.bts) - reload_networks.connect("clicked", self.reload_networks) - reload_networks.set_hexpand(False) + self.reload_button = self._gtk.Button("refresh", None, "color1", self.bts) + self.reload_button.set_no_show_all(True) + self.reload_button.show() + self.reload_button.connect("clicked", self.reload_networks) + self.reload_button.set_hexpand(False) + + self.wifi_toggle = Gtk.Switch( + width_request=round(self._gtk.font_size * 2), + height_request=round(self._gtk.font_size), + active=self.sdbus_nm.is_wifi_enabled() + ) + self.wifi_toggle.connect("notify::active", self.toggle_wifi) sbox = Gtk.Box(hexpand=True, vexpand=False) sbox.add(self.labels['interface']) sbox.add(self.labels['ip']) - sbox.add(reload_networks) + sbox.add(self.reload_button) + sbox.add(self.wifi_toggle) scroll = self._gtk.ScrolledWindow() + self.labels['main_box'] = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, vexpand=True) - box = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, vexpand=True) - - self.labels['networklist'] = Gtk.Grid() - - if self.wifi is not None and self.wifi.initialized: - box.pack_start(sbox, False, False, 5) - box.pack_start(scroll, True, True, 0) - + if self.sdbus_nm.wifi: + self.labels['main_box'].pack_start(sbox, False, False, 5) GLib.idle_add(self.load_networks) - scroll.add(self.labels['networklist']) + scroll.add(self.network_list) - self.wifi.add_callback("connected", self.connected_callback) - self.wifi.add_callback("scan_results", self.scan_callback) - self.wifi.add_callback("popup", self.popup_callback) - if self.update_timeout is None: - self.update_timeout = GLib.timeout_add_seconds(5, self.update_all_networks) + self.sdbus_nm.add_callback("popup", self.popup_callback) else: + self._screen.show_popup_message(_("No wireless interface has been found"), level=2) self.labels['networkinfo'] = Gtk.Label() - self.labels['networkinfo'].get_style_context().add_class('temperature_entry') - box.pack_start(self.labels['networkinfo'], False, False, 0) + scroll.add(self.labels['networkinfo']) self.update_single_network_info() - if self.update_timeout is None: - self.update_timeout = GLib.timeout_add_seconds(5, self.update_single_network_info) - self.content.add(box) - self.labels['main_box'] = box - self.initialized = True + self.labels['main_box'].pack_start(scroll, True, True, 0) + self.content.add(self.labels['main_box']) - def load_networks(self, widget=None): - networks = self.wifi.get_networks() - if not networks: - return - for net in networks: - self.add_network(net, False) - self.update_all_networks() - if widget: - GLib.timeout_add_seconds(10, self._gtk.Button_busy, widget, False) + def popup_callback(self, msg, level=3): + self._screen.show_popup_message(msg, level) + + def load_networks(self): + for net in self.sdbus_nm.get_networks(): + self.add_network(net['BSSID']) + GLib.timeout_add_seconds(10, self._gtk.Button_busy, self.reload_button, False) self.content.show_all() return False - def add_network(self, ssid, show=True): - - if ssid is None: - return - ssid = ssid.strip() - if ssid in list(self.networks): + def add_network(self, bssid): + if bssid in self.network_rows: + logging.info(f"{bssid} already in list") return - configured_networks = self.wifi.get_supplicant_networks() - network_id = -1 - for net in list(configured_networks): - if configured_networks[net]['ssid'] == ssid: - network_id = net - - display_name = _("Hidden") if ssid.startswith("\x00") else f"{ssid}" - netinfo = self.wifi.get_network_info(ssid) - connected_ssid = self.wifi.get_connected_ssid() - if netinfo is None: - logging.debug("Couldn't get netinfo") - netinfo = {'connected': connected_ssid == ssid} - - name = Gtk.Label(hexpand=True, halign=Gtk.Align.START, wrap=True, wrap_mode=Pango.WrapMode.WORD_CHAR) - if connected_ssid == ssid: - display_name += " (" + _("Connected") + ")" - name.set_markup(f"{display_name}") - else: - name.set_label(display_name) - - info = Gtk.Label(halign=Gtk.Align.START) - labels = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, vexpand=True, - halign=Gtk.Align.START, valign=Gtk.Align.CENTER) - labels.add(name) - labels.add(info) + net = next(net for net in self.sdbus_nm.get_networks() if bssid == net['BSSID']) + ssid = net['SSID'] connect = self._gtk.Button("load", None, "color3", self.bts) connect.connect("clicked", self.connect_network, ssid) @@ -145,54 +101,79 @@ class Panel(ScreenPanel): connect.set_halign(Gtk.Align.END) delete = self._gtk.Button("delete", None, "color3", self.bts) - delete.connect("clicked", self.remove_wifi_network, ssid) + delete.connect("clicked", self.remove_confirm_dialog, ssid, bssid) delete.set_hexpand(False) delete.set_halign(Gtk.Align.END) - network = Gtk.Box(spacing=5, hexpand=True, vexpand=False) - network.get_style_context().add_class("frame-item") - network.add(labels) - buttons = Gtk.Box(spacing=5) - if network_id != -1 or netinfo['connected']: - buttons.pack_end(connect, False, False, 0) - buttons.pack_end(delete, False, False, 0) - else: - buttons.pack_end(connect, False, False, 0) - network.add(buttons) - self.networks[ssid] = network - nets = sorted(list(self.networks), reverse=False) - if connected_ssid in nets: - nets.remove(connected_ssid) - nets.insert(0, connected_ssid) - if nets.index(ssid) is not None: - pos = nets.index(ssid) + name = Gtk.Label(hexpand=True, halign=Gtk.Align.START, wrap=True, wrap_mode=Pango.WrapMode.WORD_CHAR) + if bssid == self.sdbus_nm.get_connected_bssid(): + ssid += ' (' + _("Connected") + ')' + name.set_markup(f"{ssid}") else: - logging.info("Error: SSID not in nets") - return + name.set_markup(f"{ssid}") + if net['known']: + buttons.add(delete) + buttons.add(connect) - self.labels['networks'][ssid] = { + info = Gtk.Label(halign=Gtk.Align.START) + labels = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, vexpand=True, + halign=Gtk.Align.START, valign=Gtk.Align.CENTER) + labels.add(name) + labels.add(info) + icon = self._gtk.Image('wifi_weak') + + self.network_rows[bssid] = Gtk.Box(spacing=5, hexpand=True, vexpand=False) + self.network_rows[bssid].get_style_context().add_class("frame-item") + self.network_rows[bssid].add(icon) + self.network_rows[bssid].add(labels) + self.network_rows[bssid].add(buttons) + + self.networks[bssid] = { "connect": connect, "delete": delete, + "icon": icon, "info": info, "name": name, - "row": network + "row": self.network_rows[bssid], } - self.labels['networklist'].insert_row(pos) - self.labels['networklist'].attach(self.networks[ssid], 0, pos, 1, 1) - if show: - self.labels['networklist'].show() + self.network_list.add(self.network_rows[bssid]) + + def remove_confirm_dialog(self, widget, ssid, bssid): + + label = Gtk.Label(wrap=True, vexpand=True) + label.set_markup(_("Do you want to forget or disconnect %s?") % ssid) + buttons = [ + {"name": _("Forget"), "response": Gtk.ResponseType.OK, "style": 'dialog-warning'}, + {"name": _("Cancel"), "response": Gtk.ResponseType.CANCEL, "style": 'dialog-error'}, + ] + if bssid == self.sdbus_nm.get_connected_bssid(): + buttons.insert(0, {"name": _("Disconnect"), "response": Gtk.ResponseType.APPLY, "style": 'dialog-info'}) + self._gtk.Dialog(_("Remove network"), buttons, label, self.confirm_removal, ssid) + + def confirm_removal(self, dialog, response_id, ssid): + self._gtk.remove_dialog(dialog) + if response_id == Gtk.ResponseType.CANCEL: + return + bssid = self.sdbus_nm.get_bssid_from_ssid(ssid) + self.remove_network_from_list(bssid) + if response_id == Gtk.ResponseType.OK: + logging.info(f"Deleting {ssid}") + self.sdbus_nm.delete_network(ssid) + if response_id == Gtk.ResponseType.APPLY: + logging.info(f"Disconnecting {ssid}") + self.sdbus_nm.disconnect_network() def add_new_network(self, widget, ssid): self._screen.remove_keyboard() - result = self.wifi.add_network(ssid, self.labels['network_psk'].get_text()) + result = self.sdbus_nm.add_network(ssid, self.labels['network_psk'].get_text()) self.close_add_network() if result: - self.connect_network(widget, ssid, False) + self.connect_network(widget, ssid, showadd=False) else: - self._screen.show_popup_message(f"Error adding network {ssid}") + self._screen.show_popup_message(_("Invalid password")) def back(self): if self.show_add: @@ -200,16 +181,6 @@ class Panel(ScreenPanel): return True return False - def check_missing_networks(self): - networks = self.wifi.get_networks() - for net in list(self.networks): - if net in networks: - networks.remove(net) - - for net in networks: - self.add_network(net, False) - self.labels['networklist'].show_all() - def close_add_network(self): if not self.show_add: return @@ -223,71 +194,27 @@ class Panel(ScreenPanel): del self.labels[i] self.show_add = False - def popup_callback(self, msg): - self._screen.show_popup_message(msg) - - def connected_callback(self, ssid, prev_ssid): - logging.info("Now connected to a new network") - if ssid is not None: - self.remove_network(ssid) - if prev_ssid is not None: - self.remove_network(prev_ssid) - - self.check_missing_networks() - def connect_network(self, widget, ssid, showadd=True): - isdef = any(net['ssid'] == ssid for netid, net in self.wifi.get_supplicant_networks().items()) - if not isdef: - if showadd: - self.show_add_network(widget, ssid) + self.deactivate() + if showadd and not self.sdbus_nm.is_known(ssid): + self.show_add_network(widget, ssid) + self.activate() return - self.prev_network = self.wifi.get_connected_ssid() + bssid = self.sdbus_nm.get_bssid_from_ssid(ssid) + if bssid and bssid in self.network_rows: + self.remove_network_from_list(bssid) + self.sdbus_nm.connect(ssid) + self.update_all_networks() + self.activate() - buttons = [ - {"name": _("Close"), "response": Gtk.ResponseType.CANCEL} - ] - - scroll = self._gtk.ScrolledWindow() - self.labels['connecting_info'] = Gtk.Label( - label=_("Starting WiFi Association"), halign=Gtk.Align.START, valign=Gtk.Align.START, wrap=True) - scroll.add(self.labels['connecting_info']) - self._gtk.Dialog(_("Starting WiFi Association"), buttons, scroll, self._gtk.remove_dialog) - self._screen.show_all() - - if ssid in list(self.networks): - self.remove_network(ssid) - if self.prev_network in list(self.networks): - self.remove_network(self.prev_network) - - self.wifi.add_callback("connecting_status", self.connecting_status_callback) - self.wifi.connect(ssid) - - def connecting_status_callback(self, msg): - self.labels['connecting_info'].set_text(f"{self.labels['connecting_info'].get_text()}\n{msg}") - self.labels['connecting_info'].show_all() - - def remove_network(self, ssid, show=True): - if ssid not in list(self.networks): + def remove_network_from_list(self, bssid): + if bssid not in self.network_rows: + logging.error(f"{bssid} not in rows") return - for i in range(len(self.labels['networklist'])): - if self.networks[ssid] == self.labels['networklist'].get_child_at(0, i): - self.labels['networklist'].remove_row(i) - self.labels['networklist'].show() - del self.networks[ssid] - del self.labels['networks'][ssid] - return - - def remove_wifi_network(self, widget, ssid): - self.wifi.delete_network(ssid) - self.remove_network(ssid) - self.check_missing_networks() - - def scan_callback(self, new_networks, old_networks): - for net in old_networks: - self.remove_network(net, False) - for net in new_networks: - self.add_network(net, False) - self.content.show_all() + self.network_list.remove(self.network_rows[bssid]) + del self.network_rows[bssid] + del self.networks[bssid] + return def show_add_network(self, widget, ssid): if self.show_add: @@ -323,100 +250,96 @@ class Panel(ScreenPanel): self.show_add = True def update_all_networks(self): - for network in list(self.networks): - self.update_network_info(network) + self.interface = self.sdbus_nm.get_primary_interface() + self.labels['interface'].set_text(_("Interface") + f': {self.interface}') + self.labels['ip'].set_text(f"IP: {self.sdbus_nm.get_ip_address()}") + nets = self.sdbus_nm.get_networks() + remove = [bssid for bssid in self.network_rows.keys() if bssid not in [net['BSSID'] for net in nets]] + for bssid in remove: + self.remove_network_from_list(bssid) + for net in nets: + if net['BSSID'] not in self.network_rows.keys(): + self.add_network(net['BSSID']) + self.update_network_info(net) + for i, net in enumerate(nets): + for child in self.network_list.get_children(): + if child == self.network_rows[net['BSSID']]: + self.network_list.reorder_child(child, i) + self.network_list.show_all() return True - def update_network_info(self, ssid): - info = freq = encr = chan = lvl = ipv4 = ipv6 = "" - - if ssid not in list(self.networks) or ssid not in self.labels['networks']: - logging.info(f"Unknown SSID {ssid}") + def update_network_info(self, net): + if net['BSSID'] not in self.network_rows.keys() or net['BSSID'] not in self.networks: + logging.info(f"Unknown SSID {net['SSID']}") return - netinfo = self.wifi.get_network_info(ssid) - if netinfo.get('connected') or self.wifi.get_connected_ssid() == ssid: - ifadd = netifaces.ifaddresses(self.interface) - if ifadd.get(netifaces.AF_INET): - ipv4 = f"IPv4: {ifadd[netifaces.AF_INET][0]['addr']}" - self.labels['ip'].set_text(f"IP: {ifadd[netifaces.AF_INET][0]['addr']} ") - if ifadd.get(netifaces.AF_INET6): - ipv6 = f"IPv6: {ifadd[netifaces.AF_INET6][0]['addr'].split('%')[0]}" + info = _("Password saved") + '\n' if net['known'] else "" + chan = _("Channel") + f' {net["channel"]}' + max_bitrate = _("Max:") + f"{self.format_speed(net['max_bitrate'])}" + self.networks[net['BSSID']]['icon'].set_from_pixbuf(self.get_signal_strength_icon(net["signal_level"])) + self.networks[net['BSSID']]['info'].set_markup( + "" + f"{info}" + f"{net['security']}\n" + f"{max_bitrate}\n" + f"{net['frequency']} Ghz {chan} {net['signal_level']} %\n" + f"{net['BSSID']}" + "" + ) - info = '' + _("Hostname") + f': {os.uname().nodename}\n{ipv4}\n{ipv6}' - else: - self.labels['networks'][ssid]['name'].set_label(_("Hidden") if ssid.startswith("\x00") else f"{ssid}") - if "psk" in netinfo: - info = _("Password saved") - if "encryption" in netinfo and netinfo['encryption'] != "off": - encr = netinfo['encryption'].upper() - if "frequency" in netinfo: - freq = "2.4 GHz" if netinfo['frequency'][:1] == "2" else "5 Ghz" - if "channel" in netinfo: - chan = _("Channel") + f' {netinfo["channel"]}' - if "signal_level_dBm" in netinfo: - unit = "%" if self.use_network_manager else _("dBm") - lvl = f"{netinfo['signal_level_dBm']} {unit}" - icon = self.signal_strength(int(netinfo["signal_level_dBm"])) - if 'icon' not in self.labels['networks'][ssid]: - self.labels['networks'][ssid]['row'].add(icon) - self.labels['networks'][ssid]['row'].reorder_child(icon, 0) - self.labels['networks'][ssid]['icon'] = icon - self.labels['networks'][ssid]['icon'] = icon - - self.labels['networks'][ssid]['info'].set_markup(f"{info}\n{encr} {freq} {chan} {lvl}") - self.labels['networks'][ssid]['row'].show_all() - - def signal_strength(self, signal_level): + def get_signal_strength_icon(self, signal_level): # networkmanager uses percentage not dbm - # the bars of nmcli are aligned near this breakpoints - exc = 77 if self.use_network_manager else -50 - good = 60 if self.use_network_manager else -60 - fair = 35 if self.use_network_manager else -70 - if signal_level > exc: - return self._gtk.Image('wifi_excellent') - elif signal_level > good: - return self._gtk.Image('wifi_good') - elif signal_level > fair: - return self._gtk.Image('wifi_fair') + if signal_level > 75: + return self.wifi_signal_icons['excellent'] + elif signal_level > 60: + return self.wifi_signal_icons['good'] + elif signal_level > 30: + return self.wifi_signal_icons['fair'] else: - return self._gtk.Image('wifi_weak') + return self.wifi_signal_icons['weak'] def update_single_network_info(self): - ifadd = netifaces.ifaddresses(self.interface) - ipv6 = f"{ifadd[netifaces.AF_INET6][0]['addr'].split('%')[0]}" if ifadd.get(netifaces.AF_INET6) else "" - if netifaces.AF_INET in ifadd and ifadd[netifaces.AF_INET]: - ipv4 = f"{ifadd[netifaces.AF_INET][0]['addr']} " - self.labels['ip'].set_text(f"IP: {ifadd[netifaces.AF_INET][0]['addr']} ") - else: - ipv4 = "" self.labels['networkinfo'].set_markup( f'{self.interface}\n\n' + '' + _("Hostname") + f': {os.uname().nodename}\n' - f'IPv4: {ipv4}\n' - f'IPv6: {ipv6}' + f'IPv4: {self.sdbus_nm.get_ip_address()}\n' ) self.labels['networkinfo'].show_all() return True def reload_networks(self, widget=None): - self.networks = {} - self.labels['networklist'].remove_column(0) - if self.wifi is not None and self.wifi.initialized: + self.deactivate() + del self.network_rows + self.network_rows = {} + for child in self.network_list.get_children(): + self.network_list.remove(child) + if self.sdbus_nm is not None and self.sdbus_nm.wifi: if widget: self._gtk.Button_busy(widget, True) - self.wifi.rescan() - GLib.idle_add(self.load_networks, widget) + self.sdbus_nm.rescan() + self.load_networks() + self.activate() def activate(self): - if self.initialized: - self.reload_networks() - if self.update_timeout is None: - if self.wifi is not None and self.wifi.initialized: - self.update_timeout = GLib.timeout_add_seconds(5, self.update_all_networks) - else: - self.update_timeout = GLib.timeout_add_seconds(5, self.update_single_network_info) + if self.update_timeout is None: + if self.sdbus_nm is not None and self.sdbus_nm.wifi: + if self.reload_button.get_sensitive(): + self._gtk.Button_busy(self.reload_button, True) + self.sdbus_nm.rescan() + self.load_networks() + self.update_timeout = GLib.timeout_add_seconds(5, self.update_all_networks) + else: + self.update_timeout = GLib.timeout_add_seconds(5, self.update_single_network_info) def deactivate(self): if self.update_timeout is not None: GLib.source_remove(self.update_timeout) self.update_timeout = None + + def toggle_wifi(self, switch, gparams): + enable = switch.get_active() + if enable: + self.reload_button.show() + else: + self.reload_button.hide() + logging.info(f"WiFi {enable}") + self.sdbus_nm.toggle_wifi(enable) diff --git a/scripts/KlipperScreen-requirements.txt b/scripts/KlipperScreen-requirements.txt index e19cd0d4..5dae8a30 100644 --- a/scripts/KlipperScreen-requirements.txt +++ b/scripts/KlipperScreen-requirements.txt @@ -1,8 +1,7 @@ jinja2==3.1.4;python_version>="3.6" requests==2.31.0;python_version>="3.7" -netifaces==0.11.0 -six==1.16.0 -dbus-python==1.3.2 +sdbus==0.11.1;python_version>="3.8" +sdbus_networkmanager==2.0.0;python_version>="3.8" # libmpv-dev 0.33 is required for 1.0 python-mpv==0.5.2;python_version<"3.10"