network: improvements in error handling

This commit is contained in:
Alfredo Monclus 2024-05-25 23:56:07 -03:00
parent 4bea02db4e
commit 6815e9cabc
4 changed files with 110 additions and 27 deletions

View File

@ -1,7 +1,7 @@
# This is the backend of the UI panel that communicates to sdbus-networkmanager
# TODO device selection/swtichability
# Alfredo Monclus (alfrix) 2024
import subprocess
import logging
from sdbus_block.networkmanager import (
@ -80,7 +80,9 @@ def WifiChannels(freq: str):
class SdbusNm:
def __init__(self):
def __init__(self, popup_callback):
self.insufficient_privileges = False
self.ensure_nm_running()
self.system_bus = sd_bus_open_system() # We need system bus
if self.system_bus is None:
return None
@ -92,6 +94,22 @@ class SdbusNm:
else:
self.wlan_device = None
self.wifi = False
self.popup = popup_callback
def ensure_nm_running(self):
# Check if NetworkManager is running
try:
status = subprocess.run(['systemctl', 'is-active', '--quiet', 'NetworkManager'])
if status.returncode != 0:
logging.info("Starting NetworkManager service...")
subprocess.run(['sudo', 'systemctl', 'start', 'NetworkManager'])
subprocess.run(['sudo', 'systemctl', 'enable', 'NetworkManager'])
status = subprocess.run(['systemctl', 'is-active', '--quiet', 'NetworkManager'])
if status.returncode != 0:
raise RuntimeError("Failed to start NetworkManager service")
except FileNotFoundError as e:
logging.error("NetworkManager might not be installed")
raise RuntimeError(f"{e}\n" "it might not be installed?\n") from e
def is_wifi_enabled(self):
return self.nm.wireless_enabled
@ -112,10 +130,14 @@ class SdbusNm:
# Nothing connected
if self.wlan_device:
return self.wlan_device.interface
if len(self.get_interfaces()) > 1:
# skips the loopback device
return self.get_interfaces()[1]
return None
return next(
(
interface for interface in self.get_interfaces()
if interface != 'lo'
),
None
)
gateway = ActiveConnection(self.nm.primary_connection).devices[0]
return NetworkDeviceGeneric(gateway).interface
@ -184,9 +206,18 @@ class SdbusNm:
return self.get_connected_ap().hw_address if self.get_connected_ap() is not None else None
def add_network(self, ssid, psk):
if existing_network := NetworkManagerSettings().get_connections_by_id(ssid):
for network in existing_network:
self.delete_connection_path(network)
if self.insufficient_privileges:
return {"error": "insufficient_privileges", "message": _("Insufficient privileges")}
existing_networks = NetworkManagerSettings().list_connections()
for connection_path in existing_networks:
connection_settings = NetworkConnectionSettings(connection_path).get_settings()
if (
connection_settings.get('802-11-wireless') and
connection_settings['802-11-wireless'].get('ssid') and
connection_settings['802-11-wireless']['ssid'][1].decode() == ssid
):
self.delete_connection_path(connection_path)
properties: NetworkManagerConnectionProperties = {
"connection": {
@ -197,21 +228,27 @@ class SdbusNm:
},
"802-11-wireless": {
"mode": ("s", "infrastructure"),
"security": ("s", "802-11-wireless-security"),
"ssid": ("ay", ssid.encode("utf-8")),
},
"802-11-wireless-security": {
"key-mgmt": ("s", "wpa-psk"),
"auth-alg": ("s", "open"),
"psk": ("s", psk),
},
"ipv4": {"method": ("s", "auto")},
"ipv6": {"method": ("s", "auto")},
}
if psk:
properties["802-11-wireless"]["security"] = ("s", "802-11-wireless-security")
properties["802-11-wireless-security"] = {
"key-mgmt": ("s", "wpa-psk"),
"auth-alg": ("s", "open"),
"psk": ("s", psk),
}
try:
NetworkManagerSettings().add_connection(properties)
return {"status": "success"}
except exceptions.NmSettingsPermissionDeniedError:
self.insufficient_privileges = True
logging.exception("Insufficient privileges")
return {"error": "insufficient_privileges", "message": _("Insufficient privileges")}
except exceptions.NmConnectionInvalidPropertyError:
logging.exception("Invalid property")
return {"error": "psk_invalid", "message": _("Invalid password")}
@ -235,10 +272,43 @@ class SdbusNm:
return self.wlan_device.request_scan({})
def connect(self, ssid):
connection = NetworkManagerSettings().get_connections_by_id(ssid)
if connection:
self.nm.activate_connection(connection[0])
return connection
connections = NetworkManagerSettings().list_connections()
target_connection = None
for connection_path in connections:
connection_settings = NetworkConnectionSettings(connection_path).get_settings()
if (
connection_settings.get('802-11-wireless') and
connection_settings['802-11-wireless'].get('ssid') and
connection_settings['802-11-wireless']['ssid'][1].decode() == ssid
):
target_connection = connection_path
break
if target_connection:
self.popup(f"{ssid}\n{_('Starting WiFi Association')}", 1)
try:
active_connection = self.nm.activate_connection(target_connection)
self.monitor_connection_status(active_connection)
return target_connection
except Exception as e:
logging.exception("Unexpected error")
self.popup(f"Unexpected error: {e}")
else:
self.popup(f"SSID '{ssid}' not found among saved connections")
def toggle_wifi(self, enable):
self.nm.wireless_enabled = enable
def monitor_connection_status(self, active_connection):
def on_state_changed(active_conn_path, state, reason):
if active_conn_path == active_connection:
if state == enums.NM_ACTIVE_CONNECTION_STATE_ACTIVATED:
self.popup(_("Connection established successfully"))
elif state in [
enums.NM_ACTIVE_CONNECTION_STATE_DEACTIVATING,
enums.NM_ACTIVE_CONNECTION_STATE_DEACTIVATED
]:
self.popup(_("Connection disconnected"))
elif state == enums.NM_ACTIVE_CONNECTION_STATE_FAILED:
self.popup(_("Connection failed"))

View File

@ -18,7 +18,7 @@ class Panel(ScreenPanel):
self.network_rows = {}
self.networks = {}
try:
self.sdbus_nm = SdbusNm()
self.sdbus_nm = SdbusNm(self.popup_callback)
except Exception as e:
logging.exception("Failed to initialize")
self.sdbus_nm = None
@ -208,7 +208,14 @@ class Panel(ScreenPanel):
def connect_network(self, widget, ssid, showadd=True):
self.deactivate()
if showadd and not self.sdbus_nm.is_known(ssid) and not self.sdbus_nm.is_open(ssid):
if self.sdbus_nm.is_open(ssid):
result = self.sdbus_nm.add_network(ssid, '')
if "error" in result:
self._screen.show_popup_message(result["message"])
else:
self.connect_network(widget, ssid, showadd=False)
return
if showadd and not self.sdbus_nm.is_known(ssid):
self.show_add_network(widget, ssid)
self.activate()
return

View File

@ -343,8 +343,8 @@ class KlipperScreen(Gtk.Window):
self.notification_log.append(log_entry)
self.process_update("notify_log", log_entry)
def show_popup_message(self, message, level=3):
if (datetime.now() - self.last_popup_time).seconds < 1:
def show_popup_message(self, message, level=3, from_ws=False):
if (datetime.now() - self.last_popup_time).seconds < 1 and from_ws:
return
self.last_popup_time = datetime.now()
self.close_screensaver()
@ -794,7 +794,7 @@ class KlipperScreen(Gtk.Window):
elif action == "notify_update_response":
if 'message' in data and 'Error' in data['message']:
logging.error(f"{action}:{data['message']}")
self.show_popup_message(data['message'], 3)
self.show_popup_message(data['message'], 3, from_ws=True)
if "KlipperScreen" in data['message']:
self.restart_ks()
elif action == "notify_power_changed":
@ -813,12 +813,12 @@ class KlipperScreen(Gtk.Window):
return
self.prompt.decode(action)
elif data.startswith("echo: "):
self.show_popup_message(data[6:], 1)
self.show_popup_message(data[6:], 1, from_ws=True)
elif data.startswith("!! "):
self.show_popup_message(data[3:], 3)
self.show_popup_message(data[3:], 3, from_ws=True)
elif "unknown" in data.lower() and \
not ("TESTZ" in data or "MEASURE_AXES_NOISE" in data or "ACCELEROMETER_QUERY" in data):
self.show_popup_message(data)
self.show_popup_message(data, from_ws=True)
elif "SAVE_CONFIG" in data and self.printer.state == "ready":
script = {"script": "SAVE_CONFIG"}
self._confirm_send_action(

View File

@ -202,6 +202,12 @@ create_policy()
KS_GID=$( getent group klipperscreen | awk -F: '{printf "%d", $3}' )
sudo tee ${RULE_FILE} > /dev/null << EOF
// Allow KlipperScreen to reboot, shutdown, etc
polkit.addRule(function(action, subject) {
if (action.id == "org.freedesktop.NetworkManager.settings.modify.system" &&
subject.isInGroup("network")) {
return polkit.Result.YES;
}
});
polkit.addRule(function(action, subject) {
if ((action.id == "org.freedesktop.login1.power-off" ||
action.id == "org.freedesktop.login1.power-off-multiple-sessions" ||