2024-01-11 20:54:10 -03:00

273 lines
11 KiB
Python

# Network in KlipperScreen is a connection in NetworkManager
# Interface in KlipperScreen is a device in NetworkManager
import logging
import uuid
import dbus
import gi
gi.require_version('Gdk', '3.0')
from gi.repository import GLib
from contextlib import suppress
from ks_includes.wifi import WifiChannels
from ks_includes import NetworkManager
from dbus.mainloop.glib import DBusGMainLoop
class WifiManager:
    """Wi-Fi backend built on NetworkManager for one wireless interface.

    Keeps a map of saved wireless connections (``known_networks``) and of
    the access points currently reported by the device
    (``visible_networks``), and forwards events to registered callbacks via
    ``GLib.idle_add``.
    """

    networks_in_supplicant = []

    def __init__(self, interface_name, *args, **kwargs):
        """Resolve the device for *interface_name* and subscribe to its signals.

        Args:
            interface_name: Interface name handed to
                ``NetworkManager.NetworkManager.GetDeviceByIpIface``.
            *args: Forwarded unchanged to ``super().__init__``.
            **kwargs: Forwarded unchanged to ``super().__init__``.
        """
        super().__init__(*args, **kwargs)
        DBusGMainLoop(set_as_default=True)
        # Callback lists keyed by event name; see add_callback().
        self._callbacks = {
            "connected": [],
            "connecting_status": [],
            "scan_results": [],
            "popup": [],
        }
        self.connected = False
        self.connected_ssid = None
        self.interface_name = interface_name
        self.known_networks = {}  # ssid -> saved connection object
        self.visible_networks = {}  # access point object path -> access point
        self.ssid_by_path = {}  # access point object path -> ssid
        self.path_by_ssid = {}  # ssid -> object path of one visible access point
        self.hidden_ssid_index = 0  # counter used to label APs with an empty SSID

        self.wifi_dev = NetworkManager.NetworkManager.GetDeviceByIpIface(interface_name)
        self.wifi_dev.OnAccessPointAdded(self._ap_added)
        self.wifi_dev.OnAccessPointRemoved(self._ap_removed)
        self.wifi_dev.OnStateChanged(self._ap_state_changed)

        for ap in self.wifi_dev.GetAccessPoints():
            self._add_ap(ap)
        self._update_known_connections()
        self.initialized = True

    def _update_known_connections(self):
        """Rebuild ``known_networks`` from the connections NetworkManager lists.

        Only connections whose settings contain an ``802-11-wireless`` section
        are kept, keyed by that section's ``ssid``.
        """
        self.known_networks = {}
        for con in NetworkManager.Settings.ListConnections():
            settings = con.GetSettings()
            if "802-11-wireless" in settings:
                ssid = settings["802-11-wireless"]['ssid']
                self.known_networks[ssid] = con

    def _ap_added(self, nm, interface, signal, access_point):
        """Signal handler: register a new access point and notify listeners."""
        # The access point may disappear while it is being queried; in that
        # case the whole handler is silently abandoned.
        with suppress(NetworkManager.ObjectVanished):
            access_point.OnPropertiesChanged(self._ap_prop_changed)
            ssid = self._add_ap(access_point)
            for cb in self._callbacks['scan_results']:
                GLib.idle_add(cb, [ssid], [])

    def _ap_removed(self, dev, interface, signal, access_point):
        """Signal handler: forget a vanished access point and notify listeners."""
        path = access_point.object_path
        if path not in self.ssid_by_path:
            return
        ssid = self.ssid_by_path[path]
        self._remove_ap(path)
        # NOTE(review): this sends the same ([ssid], []) payload as _ap_added,
        # so listeners cannot tell additions from removals - confirm against
        # the 'scan_results' callback signature.
        for cb in self._callbacks['scan_results']:
            GLib.idle_add(cb, [ssid], [])

    def _ap_state_changed(self, nm, interface, signal, old_state, new_state, reason):
        """Signal handler: translate a device state change into status messages.

        Emits a ``connecting_status`` callback with a description of
        *new_state* (when one is defined), additionally emits ``popup`` on
        failure, and updates ``self.connected``. When the device becomes
        activated, the ``connected`` callbacks receive the current SSID.
        """
        # NOTE(review): several branches compare new_state against
        # *_STATE_REASON_* constants rather than checking `reason`; the branch
        # order is kept exactly as it was - confirm the intent before changing.
        msg = ""
        if new_state in (NetworkManager.NM_DEVICE_STATE_UNKNOWN, NetworkManager.NM_DEVICE_STATE_REASON_UNKNOWN):
            msg = "State is unknown"
        elif new_state == NetworkManager.NM_DEVICE_STATE_UNMANAGED:
            msg = "Error: Not managed by NetworkManager"
        elif new_state == NetworkManager.NM_DEVICE_STATE_UNAVAILABLE:
            msg = "Error: Not available for use:\nReasons may include the wireless switched off, missing firmware, etc."
        elif new_state == NetworkManager.NM_DEVICE_STATE_DISCONNECTED:
            msg = "Currently disconnected"
        elif new_state == NetworkManager.NM_DEVICE_STATE_PREPARE:
            msg = "Preparing the connection to the network"
        elif new_state == NetworkManager.NM_DEVICE_STATE_CONFIG:
            msg = "Connecting to the requested network..."
        elif new_state == NetworkManager.NM_DEVICE_STATE_NEED_AUTH:
            msg = "Authorizing"
        elif new_state == NetworkManager.NM_DEVICE_STATE_IP_CONFIG:
            msg = "Requesting IP addresses and routing information"
        elif new_state == NetworkManager.NM_DEVICE_STATE_IP_CHECK:
            msg = "Checking whether further action is required for the requested network connection"
        elif new_state == NetworkManager.NM_DEVICE_STATE_SECONDARIES:
            msg = "Waiting for a secondary connection (like a VPN)"
        elif new_state == NetworkManager.NM_DEVICE_STATE_ACTIVATED:
            msg = "Connected"
        elif new_state == NetworkManager.NM_DEVICE_STATE_DEACTIVATING:
            msg = "A disconnection from the current network connection was requested"
        elif new_state == NetworkManager.NM_DEVICE_STATE_FAILED:
            msg = "Failed to connect to the requested network"
            self.callback("popup", msg)
        elif new_state == NetworkManager.NM_DEVICE_STATE_REASON_DEPENDENCY_FAILED:
            msg = "A dependency of the connection failed"
        elif new_state == NetworkManager.NM_DEVICE_STATE_REASON_CARRIER:
            msg = ""
        else:
            logging.info(f"State {new_state}")

        if msg != "":
            self.callback("connecting_status", msg)

        if new_state == NetworkManager.NM_DEVICE_STATE_ACTIVATED:
            self.connected = True
            for cb in self._callbacks['connected']:
                GLib.idle_add(cb, self.get_connected_ssid(), None)
        else:
            self.connected = False

    def _ap_prop_changed(self, ap, interface, signal, properties):
        """Signal handler for access point property changes (intentionally a no-op)."""
        pass

    def _add_ap(self, ap):
        """Record *ap* in the lookup tables and return the SSID used for it.

        An access point with an empty SSID is given a generated
        ``"Hidden <n>"`` label so it still has a unique key.
        """
        ssid = ap.Ssid
        if ssid == "":
            ssid = _("Hidden") + f" {self.hidden_ssid_index}"
            self.hidden_ssid_index += 1
        self.ssid_by_path[ap.object_path] = ssid
        self.path_by_ssid[ssid] = ap.object_path
        self.visible_networks[ap.object_path] = ap
        return ssid

    def _remove_ap(self, path):
        """Drop the access point at *path* from every lookup table.

        If ``path_by_ssid`` pointed at the removed access point, it is
        re-pointed to another still-visible access point with the same SSID,
        or removed when none is left, so it never holds a stale path.
        """
        ssid = self.ssid_by_path.pop(path, None)
        self.visible_networks.pop(path, None)
        if ssid is None or self.path_by_ssid.get(ssid) != path:
            return
        for other_path, other_ssid in self.ssid_by_path.items():
            if other_ssid == ssid:
                self.path_by_ssid[ssid] = other_path
                return
        del self.path_by_ssid[ssid]

    def add_callback(self, name, callback):
        """Register *callback* for event *name*; unknown names and duplicates are ignored."""
        if name in self._callbacks and callback not in self._callbacks[name]:
            self._callbacks[name].append(callback)

    def callback(self, cb_type, msg):
        """Schedule every callback registered for *cb_type* with *msg* via ``GLib.idle_add``."""
        for cb in self._callbacks.get(cb_type, ()):
            GLib.idle_add(cb, msg)

    def add_network(self, ssid, psk):
        """Create a WPA-PSK connection profile for *ssid* in NetworkManager.

        On a D-Bus error a ``popup`` callback is emitted and the error is
        logged. The known-connection map is refreshed in either case.

        Returns:
            Always ``True`` (kept for compatibility with existing callers).
        """
        new_connection = {
            '802-11-wireless': {
                'mode': 'infrastructure',
                'security': '802-11-wireless-security',
                'ssid': ssid
            },
            '802-11-wireless-security': {
                'auth-alg': 'open',
                'key-mgmt': 'wpa-psk',
                'psk': psk
            },
            'connection': {
                'id': ssid,
                'type': '802-11-wireless',
                'uuid': str(uuid.uuid4())
            },
            'ipv4': {
                'method': 'auto'
            },
            'ipv6': {
                'method': 'auto'
            }
        }
        try:
            NetworkManager.Settings.AddConnection(new_connection)
        except dbus.exceptions.DBusException as e:
            # Exceptions do not support `in`; search the rendered message.
            msg = _("Invalid password") if "802-11-wireless-security.psk" in str(e) else f"{e}"
            self.callback("popup", msg)
            logging.info(f"Error adding network {e}")
        self._update_known_connections()
        return True

    def connect(self, ssid):
        """Activate the saved connection for *ssid*; do nothing if it is not known."""
        if ssid not in self.known_networks:
            return
        conn = self.known_networks[ssid]
        with suppress(NetworkManager.ObjectVanished):
            msg = f"Connecting to: {ssid}"
            logging.info(msg)
            self.callback("connecting_status", msg)
            NetworkManager.NetworkManager.ActivateConnection(conn, self.wifi_dev, "/")

    def delete_network(self, ssid):
        """Delete the saved connection for *ssid* and refresh the known map.

        Only the connection stored under the requested SSID is removed; an
        unknown SSID is a no-op apart from the refresh.
        """
        con = self.known_networks.get(ssid)
        if con is not None:
            con.Delete()
        self._update_known_connections()

    def get_connected_ssid(self):
        """Return the SSID of the active access point, or ``None`` if there is none."""
        active_ap = self._get_connected_ap()
        if active_ap:
            return active_ap.Ssid
        return None

    def _get_connected_ap(self):
        """Return the device's ``ActiveAccessPoint`` value."""
        return self.wifi_dev.SpecificDevice().ActiveAccessPoint

    def _visible_networks_by_ssid(self):
        """Return ``{ssid: access_point}`` for the device's current access points.

        Access points that vanish while being read are skipped.
        """
        by_ssid = {}
        for ap in self.wifi_dev.GetAccessPoints():
            with suppress(NetworkManager.ObjectVanished):
                by_ssid[ap.Ssid] = ap
        return by_ssid

    def get_network_info(self, ssid):
        """Collect what is known about *ssid* into a dict.

        Saved-connection data is added first; if an access point for the SSID
        is currently visible, its radio details are merged on top. An SSID
        that is neither known nor visible yields an empty dict.
        """
        netinfo = {}
        if ssid in self.known_networks:
            con = self.known_networks[ssid]
            with suppress(NetworkManager.ObjectVanished):
                settings = con.GetSettings()
                if settings and '802-11-wireless' in settings:
                    netinfo.update({
                        "ssid": settings['802-11-wireless']['ssid'],
                        "connected": self.get_connected_ssid() == ssid
                    })
        # A saved network that is out of range has no path entry; .get avoids
        # a KeyError for it.
        path = self.path_by_ssid.get(ssid)
        aps = self.visible_networks
        if path in aps:
            ap = aps[path]
            with suppress(NetworkManager.ObjectVanished):
                netinfo.update({
                    "mac": ap.HwAddress,
                    "channel": WifiChannels.lookup(ap.Frequency)[1],
                    "configured": ssid in self.known_networks,
                    "frequency": str(ap.Frequency),
                    "flags": ap.Flags,
                    "ssid": ssid,
                    "connected": self._get_connected_ap() == ap,
                    "encryption": self._get_encryption(ap.RsnFlags),
                    "signal_level_dBm": str(ap.Strength)
                })
        return netinfo

    @staticmethod
    def _get_encryption(flags):
        """Render the security bits in *flags* as a space-separated label string."""
        encryption = ""
        if (flags & NetworkManager.NM_802_11_AP_SEC_PAIR_WEP40 or
                flags & NetworkManager.NM_802_11_AP_SEC_PAIR_WEP104 or
                flags & NetworkManager.NM_802_11_AP_SEC_GROUP_WEP40 or
                flags & NetworkManager.NM_802_11_AP_SEC_GROUP_WEP104):
            encryption += "WEP "
        if (flags & NetworkManager.NM_802_11_AP_SEC_PAIR_TKIP or
                flags & NetworkManager.NM_802_11_AP_SEC_GROUP_TKIP):
            encryption += "TKIP "
        if (flags & NetworkManager.NM_802_11_AP_SEC_PAIR_CCMP or
                flags & NetworkManager.NM_802_11_AP_SEC_GROUP_CCMP):
            encryption += "AES "
        if flags & NetworkManager.NM_802_11_AP_SEC_KEY_MGMT_PSK:
            encryption += "WPA-PSK "
        if flags & NetworkManager.NM_802_11_AP_SEC_KEY_MGMT_802_1X:
            encryption += "802.1x "
        return encryption.strip()

    def get_networks(self):
        """Return the de-duplicated SSIDs of all known and all visible networks."""
        return list({*self.known_networks, *self.ssid_by_path.values()})

    def get_supplicant_networks(self):
        """Return ``{ssid: {"ssid": ssid}}`` for every known network."""
        return {ssid: {"ssid": ssid} for ssid in self.known_networks}

    def rescan(self):
        """Ask the device for a new scan; D-Bus errors are logged, not raised."""
        try:
            self.wifi_dev.RequestScan({})
        except dbus.exceptions.DBusException as e:
            logging.error(f"Error during rescan {e}")