Merge branch 'develop'
This commit is contained in:
commit
eeed801645
@ -4,6 +4,7 @@
|
||||
import subprocess
|
||||
import logging
|
||||
|
||||
import sdbus
|
||||
from sdbus_block.networkmanager import (
|
||||
NetworkManager,
|
||||
NetworkDeviceGeneric,
|
||||
@ -17,7 +18,6 @@ from sdbus_block.networkmanager import (
|
||||
enums,
|
||||
exceptions,
|
||||
)
|
||||
from sdbus import sd_bus_open_system, set_default_bus
|
||||
from gi.repository import GLib
|
||||
from uuid import uuid4
|
||||
|
||||
@ -91,10 +91,10 @@ class SdbusNm:
|
||||
|
||||
def __init__(self, popup_callback):
|
||||
self.ensure_nm_running()
|
||||
self.system_bus = sd_bus_open_system() # We need system bus
|
||||
self.system_bus = sdbus.sd_bus_open_system() # We need system bus
|
||||
if self.system_bus is None:
|
||||
return None
|
||||
set_default_bus(self.system_bus)
|
||||
sdbus.set_default_bus(self.system_bus)
|
||||
self.nm = NetworkManager()
|
||||
self.wlan_device = (
|
||||
self.get_wireless_interfaces()[0]
|
||||
@ -163,9 +163,6 @@ class SdbusNm:
|
||||
def is_known(self, ssid):
|
||||
return any(net["SSID"] == ssid for net in self.get_known_networks())
|
||||
|
||||
def is_open(self, ssid):
|
||||
return self.get_security_type(ssid) == "Open"
|
||||
|
||||
def get_ip_address(self):
|
||||
active_connection_path = self.nm.primary_connection
|
||||
if not active_connection_path or active_connection_path == "/":
|
||||
@ -176,26 +173,29 @@ class SdbusNm:
|
||||
|
||||
def get_networks(self):
|
||||
networks = []
|
||||
if self.wlan_device:
|
||||
all_aps = [AccessPoint(result) for result in self.wlan_device.access_points]
|
||||
networks.extend(
|
||||
{
|
||||
"SSID": ap.ssid.decode("utf-8"),
|
||||
"known": self.is_known(ap.ssid.decode("utf-8")),
|
||||
"security": get_encryption(
|
||||
ap.rsn_flags or ap.wpa_flags or ap.flags
|
||||
),
|
||||
"frequency": WifiChannels(ap.frequency)[0],
|
||||
"channel": WifiChannels(ap.frequency)[1],
|
||||
"signal_level": ap.strength,
|
||||
"max_bitrate": ap.max_bitrate,
|
||||
"BSSID": ap.hw_address,
|
||||
}
|
||||
for ap in all_aps
|
||||
if ap.ssid
|
||||
)
|
||||
return sorted(networks, key=lambda i: i["signal_level"], reverse=True)
|
||||
return networks
|
||||
try:
|
||||
if self.wlan_device:
|
||||
all_aps = [AccessPoint(result) for result in self.wlan_device.access_points]
|
||||
networks.extend(
|
||||
{
|
||||
"SSID": ap.ssid.decode("utf-8"),
|
||||
"known": self.is_known(ap.ssid.decode("utf-8")),
|
||||
"security": get_encryption(
|
||||
ap.rsn_flags or ap.wpa_flags or ap.flags
|
||||
),
|
||||
"frequency": WifiChannels(ap.frequency)[0],
|
||||
"channel": WifiChannels(ap.frequency)[1],
|
||||
"signal_level": ap.strength,
|
||||
"max_bitrate": ap.max_bitrate,
|
||||
"BSSID": ap.hw_address,
|
||||
}
|
||||
for ap in all_aps
|
||||
if ap.ssid
|
||||
)
|
||||
return sorted(networks, key=lambda i: i["signal_level"], reverse=True)
|
||||
return networks
|
||||
except Exception as e:
|
||||
return networks
|
||||
|
||||
def get_bssid_from_ssid(self, ssid):
|
||||
return next(net["BSSID"] for net in self.get_networks() if ssid == net["SSID"])
|
||||
@ -222,8 +222,9 @@ class SdbusNm:
|
||||
None,
|
||||
)
|
||||
|
||||
def add_network(self, ssid, psk):
|
||||
def add_network(self, ssid, psk, eap_method, identity="", phase2=None):
|
||||
security_type = self.get_security_type(ssid)
|
||||
logging.debug(f"Adding network of type: {security_type}")
|
||||
if security_type is None:
|
||||
return {"error": "network_not_found", "message": _("Network not found")}
|
||||
|
||||
@ -240,17 +241,17 @@ class SdbusNm:
|
||||
"802-11-wireless": {
|
||||
"mode": ("s", "infrastructure"),
|
||||
"ssid": ("ay", ssid.encode("utf-8")),
|
||||
"security": ("s", "802-11-wireless-security"),
|
||||
},
|
||||
"ipv4": {"method": ("s", "auto")},
|
||||
"ipv6": {"method": ("s", "auto")},
|
||||
}
|
||||
|
||||
if security_type != "Open":
|
||||
properties["802-11-wireless"]["security"] = (
|
||||
"s",
|
||||
"802-11-wireless-security",
|
||||
)
|
||||
if "WPA-PSK" in security_type:
|
||||
if security_type == "Open":
|
||||
properties["802-11-wireless-security"] = {
|
||||
"key-mgmt": ("s", "none"),
|
||||
}
|
||||
elif "WPA-PSK" in security_type:
|
||||
properties["802-11-wireless-security"] = {
|
||||
"key-mgmt": ("s", "wpa-psk"),
|
||||
"psk": ("s", psk),
|
||||
@ -268,15 +269,16 @@ class SdbusNm:
|
||||
elif "OWE" in security_type:
|
||||
properties["802-11-wireless-security"] = {
|
||||
"key-mgmt": ("s", "owe"),
|
||||
"psk": ("s", psk),
|
||||
}
|
||||
elif "802.1x" in security_type:
|
||||
properties["802-11-wireless-security"] = {
|
||||
"key-mgmt": ("s", "ieee8021x"),
|
||||
"wep-key-type": ("u", 2),
|
||||
"wep-key0": ("s", psk),
|
||||
"auth-alg": ("s", "shared"),
|
||||
"key-mgmt": ("s", "wpa-eap"),
|
||||
"eap": ("as", [eap_method]),
|
||||
"identity": ("s", identity),
|
||||
"password": ("s", psk),
|
||||
}
|
||||
if phase2:
|
||||
properties["802-11-wireless-security"]["phase2_auth"] = ("s", phase2)
|
||||
elif "WEP" in security_type:
|
||||
properties["802-11-wireless-security"] = {
|
||||
"key-mgmt": ("s", "none"),
|
||||
@ -327,7 +329,10 @@ class SdbusNm:
|
||||
}
|
||||
|
||||
def rescan(self):
|
||||
return self.wlan_device.request_scan({})
|
||||
try:
|
||||
return self.wlan_device.request_scan({})
|
||||
except Exception as e:
|
||||
self.popup(f"Unexpected error: {e}")
|
||||
|
||||
def get_connection_path_by_ssid(self, ssid):
|
||||
existing_networks = NetworkManagerSettings().list_connections()
|
||||
@ -345,7 +350,8 @@ class SdbusNm:
|
||||
|
||||
def connect(self, ssid):
|
||||
if target_connection := self.get_connection_path_by_ssid(ssid):
|
||||
self.popup(f"{ssid}\n{_('Starting WiFi Association')}", 1)
|
||||
message = ssid + "\n" + _("Starting WiFi Association")
|
||||
self.popup(message, 1)
|
||||
try:
|
||||
active_connection = self.nm.activate_connection(target_connection)
|
||||
return target_connection
|
||||
|
@ -43,6 +43,10 @@ class Panel(ScreenPanel):
|
||||
self._screen.panels_reinit.append(self._screen._cur_panels[-1])
|
||||
return
|
||||
self.update_timeout = None
|
||||
self.conn_status = None
|
||||
self.init_status = False
|
||||
self.reload = False
|
||||
self.last_ap_bssid = ''
|
||||
self.network_list = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, hexpand=True, vexpand=True)
|
||||
self.network_rows = {}
|
||||
self.networks = {}
|
||||
@ -109,8 +113,6 @@ class Panel(ScreenPanel):
|
||||
self._screen.show_popup_message(msg, level)
|
||||
|
||||
def load_networks(self):
|
||||
for net in self.sdbus_nm.get_networks():
|
||||
self.add_network(net['BSSID'])
|
||||
GLib.timeout_add_seconds(10, self._gtk.Button_busy, self.reload_button, False)
|
||||
self.content.show_all()
|
||||
return False
|
||||
@ -119,7 +121,13 @@ class Panel(ScreenPanel):
|
||||
if bssid in self.network_rows:
|
||||
return
|
||||
|
||||
net = next(net for net in self.sdbus_nm.get_networks() if bssid == net['BSSID'])
|
||||
networks = self.sdbus_nm.get_networks()
|
||||
if networks:
|
||||
net = next((net for net in networks if bssid == net['BSSID']), None)
|
||||
if net is None:
|
||||
self.remove_network_from_list(bssid)
|
||||
return
|
||||
|
||||
ssid = net['SSID']
|
||||
|
||||
connect = self._gtk.Button("load", None, "color3", self.bts)
|
||||
@ -144,11 +152,9 @@ class Panel(ScreenPanel):
|
||||
buttons.add(delete)
|
||||
buttons.add(connect)
|
||||
|
||||
info = Gtk.Label(halign=Gtk.Align.START)
|
||||
labels = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, vexpand=True,
|
||||
halign=Gtk.Align.START, valign=Gtk.Align.CENTER)
|
||||
labels.add(name)
|
||||
labels.add(info)
|
||||
icon = self._gtk.Image()
|
||||
|
||||
self.network_rows[bssid] = Gtk.Box(spacing=5, hexpand=True, vexpand=False)
|
||||
@ -161,7 +167,6 @@ class Panel(ScreenPanel):
|
||||
"connect": connect,
|
||||
"delete": delete,
|
||||
"icon": icon,
|
||||
"info": info,
|
||||
"name": name,
|
||||
"row": self.network_rows[bssid],
|
||||
}
|
||||
@ -195,7 +200,13 @@ class Panel(ScreenPanel):
|
||||
|
||||
def add_new_network(self, widget, ssid):
|
||||
self._screen.remove_keyboard()
|
||||
result = self.sdbus_nm.add_network(ssid, self.labels['network_psk'].get_text())
|
||||
psk = self.labels['network_psk'].get_text()
|
||||
identity = self.labels['network_identity'].get_text()
|
||||
eap_method = self.get_dropdown_value(self.labels['network_eap_method'])
|
||||
phase2 = self.get_dropdown_value(self.labels['network_phase2'])
|
||||
logging.debug(f"{phase2=}")
|
||||
logging.debug(f"{eap_method=}")
|
||||
result = self.sdbus_nm.add_network(ssid, psk, eap_method, identity, phase2)
|
||||
if "error" in result:
|
||||
self._screen.show_popup_message(result["message"])
|
||||
if result["error"] == "psk_invalid":
|
||||
@ -204,6 +215,12 @@ class Panel(ScreenPanel):
|
||||
self.connect_network(widget, ssid, showadd=False)
|
||||
self.close_add_network()
|
||||
|
||||
def get_dropdown_value(self, dropdown, default=None):
|
||||
tree_iter = dropdown.get_active_iter()
|
||||
model = dropdown.get_model()
|
||||
result = model[tree_iter][0]
|
||||
return result if result != "disabled" else None
|
||||
|
||||
def back(self):
|
||||
if self.show_add:
|
||||
self.close_add_network()
|
||||
@ -218,7 +235,7 @@ class Panel(ScreenPanel):
|
||||
self.content.remove(child)
|
||||
self.content.add(self.labels['main_box'])
|
||||
self.content.show()
|
||||
for i in ['add_network', 'network_psk']:
|
||||
for i in ['add_network', 'network_psk', 'network_identity']:
|
||||
if i in self.labels:
|
||||
del self.labels[i]
|
||||
self.show_add = False
|
||||
@ -226,7 +243,8 @@ class Panel(ScreenPanel):
|
||||
def connect_network(self, widget, ssid, showadd=True):
|
||||
self.deactivate()
|
||||
if showadd and not self.sdbus_nm.is_known(ssid):
|
||||
if self.sdbus_nm.is_open(ssid):
|
||||
sec_type = self.sdbus_nm.get_security_type(ssid)
|
||||
if sec_type == "Open" or "OWE" in sec_type:
|
||||
logging.debug("Network is Open do not show psk")
|
||||
result = self.sdbus_nm.add_network(ssid, '')
|
||||
if "error" in result:
|
||||
@ -240,6 +258,9 @@ class Panel(ScreenPanel):
|
||||
self.remove_network_from_list(bssid)
|
||||
self.sdbus_nm.connect(ssid)
|
||||
self.reload_networks()
|
||||
if self.conn_status is None:
|
||||
self.sdbus_nm.enable_monitoring(True)
|
||||
self.conn_status = GLib.timeout_add(500, self.sdbus_nm.monitor_connection_status)
|
||||
|
||||
def remove_network_from_list(self, bssid):
|
||||
if bssid not in self.network_rows:
|
||||
@ -260,7 +281,25 @@ class Panel(ScreenPanel):
|
||||
if "add_network" in self.labels:
|
||||
del self.labels['add_network']
|
||||
|
||||
label = Gtk.Label(label=_("PSK for") + f' {ssid}', hexpand=False)
|
||||
eap_method = Gtk.ComboBoxText(hexpand=True)
|
||||
for method in ("peap", "ttls", "pwd", "leap", "md5"):
|
||||
eap_method.append(method, method.upper())
|
||||
self.labels['network_eap_method'] = eap_method
|
||||
eap_method.set_active(0)
|
||||
|
||||
phase2 = Gtk.ComboBoxText(hexpand=True)
|
||||
for method in ("mschapv2", "gtc", "pap", "chap", "mschap", "disabled"):
|
||||
phase2.append(method, method.upper())
|
||||
self.labels['network_phase2'] = phase2
|
||||
phase2.set_active(0)
|
||||
|
||||
auth_selection_box = Gtk.Box(no_show_all=True)
|
||||
auth_selection_box.add(self.labels['network_eap_method'])
|
||||
auth_selection_box.add(self.labels['network_phase2'])
|
||||
|
||||
self.labels['network_identity'] = Gtk.Entry(hexpand=True, no_show_all=True)
|
||||
self.labels['network_identity'].connect("focus-in-event", self._screen.show_keyboard)
|
||||
|
||||
self.labels['network_psk'] = Gtk.Entry(hexpand=True)
|
||||
self.labels['network_psk'].connect("activate", self.add_new_network, ssid)
|
||||
self.labels['network_psk'].connect("focus-in-event", self._screen.show_keyboard)
|
||||
@ -269,21 +308,47 @@ class Panel(ScreenPanel):
|
||||
save.set_hexpand(False)
|
||||
save.connect("clicked", self.add_new_network, ssid)
|
||||
|
||||
box = Gtk.Box()
|
||||
box.pack_start(self.labels['network_psk'], True, True, 5)
|
||||
box.pack_start(save, False, False, 5)
|
||||
user_label = Gtk.Label(label=_("User"), hexpand=False, no_show_all=True)
|
||||
auth_grid = Gtk.Grid()
|
||||
auth_grid.attach(user_label, 0, 0, 1, 1)
|
||||
auth_grid.attach(self.labels['network_identity'], 1, 0, 1, 1)
|
||||
auth_grid.attach(Gtk.Label(label=_("Password"), hexpand=False), 0, 1, 1, 1)
|
||||
auth_grid.attach(self.labels['network_psk'], 1, 1, 1, 1)
|
||||
auth_grid.attach(save, 2, 0, 1, 2)
|
||||
|
||||
self.labels['add_network'] = Gtk.Box(orientation=Gtk.Orientation.VERTICAL, spacing=5, valign=Gtk.Align.CENTER,
|
||||
hexpand=True, vexpand=True)
|
||||
self.labels['add_network'].pack_start(label, True, True, 5)
|
||||
self.labels['add_network'].pack_start(box, True, True, 5)
|
||||
if "802.1x" in self.sdbus_nm.get_security_type(ssid):
|
||||
user_label.show()
|
||||
self.labels['network_eap_method'].show()
|
||||
self.labels['network_phase2'].show()
|
||||
self.labels['network_identity'].show()
|
||||
auth_selection_box.show()
|
||||
|
||||
self.content.add(self.labels['add_network'])
|
||||
self.labels['add_network'] = Gtk.Box(
|
||||
orientation=Gtk.Orientation.VERTICAL, spacing=5, valign=Gtk.Align.CENTER,
|
||||
hexpand=True, vexpand=True
|
||||
)
|
||||
self.labels['add_network'].add(Gtk.Label(label=_("Connecting to %s") % ssid))
|
||||
self.labels['add_network'].add(auth_selection_box)
|
||||
self.labels['add_network'].add(auth_grid)
|
||||
scroll = self._gtk.ScrolledWindow()
|
||||
scroll.add(self.labels['add_network'])
|
||||
self.content.add(scroll)
|
||||
self.labels['network_psk'].grab_focus_without_selecting()
|
||||
self.content.show_all()
|
||||
self.show_add = True
|
||||
|
||||
def update_all_networks(self):
|
||||
self.ap_bssid = self.sdbus_nm.get_connected_bssid()
|
||||
if self.last_ap_bssid != self.ap_bssid:
|
||||
self.remove_network_from_list(self.last_ap_bssid)
|
||||
self.remove_network_from_list(self.ap_bssid)
|
||||
self.last_ap_bssid = self.ap_bssid
|
||||
|
||||
if self.reload:
|
||||
for child in self.network_list.get_children():
|
||||
self.network_list.remove(child)
|
||||
self.reload = False
|
||||
|
||||
self.interface = self.sdbus_nm.get_primary_interface()
|
||||
self.labels['interface'].set_text(_("Interface") + f': {self.interface}')
|
||||
self.labels['ip'].set_text(f"IP: {self.sdbus_nm.get_ip_address()}")
|
||||
@ -297,8 +362,9 @@ class Panel(ScreenPanel):
|
||||
self.update_network_info(net)
|
||||
for i, net in enumerate(nets):
|
||||
for child in self.network_list.get_children():
|
||||
if child == self.network_rows[net['BSSID']]:
|
||||
self.network_list.reorder_child(child, i)
|
||||
if 'BSSID' in net and net['BSSID'] in self.network_rows:
|
||||
if child == self.network_rows[net['BSSID']]:
|
||||
self.network_list.reorder_child(child, i)
|
||||
self.network_list.show_all()
|
||||
return True
|
||||
|
||||
@ -306,19 +372,7 @@ class Panel(ScreenPanel):
|
||||
if net['BSSID'] not in self.network_rows.keys() or net['BSSID'] not in self.networks:
|
||||
logging.info(f"Unknown SSID {net['SSID']}")
|
||||
return
|
||||
info = _("Password saved") + '\n' if net['known'] else ""
|
||||
chan = _("Channel") + f' {net["channel"]}'
|
||||
max_bitrate = _("Max:") + f"{self.format_speed(net['max_bitrate'])}"
|
||||
self.networks[net['BSSID']]['icon'].set_from_pixbuf(self.get_signal_strength_icon(net["signal_level"]))
|
||||
self.networks[net['BSSID']]['info'].set_markup(
|
||||
"<small>"
|
||||
f"{info}"
|
||||
f"{net['security']}\n"
|
||||
f"{max_bitrate}\n"
|
||||
f"{net['frequency']} Ghz {chan} {net['signal_level']} %\n"
|
||||
f"{net['BSSID']}"
|
||||
"</small>"
|
||||
)
|
||||
|
||||
def get_signal_strength_icon(self, signal_level):
|
||||
# networkmanager uses percentage not dbm
|
||||
@ -344,25 +398,27 @@ class Panel(ScreenPanel):
|
||||
self.deactivate()
|
||||
del self.network_rows
|
||||
self.network_rows = {}
|
||||
for child in self.network_list.get_children():
|
||||
self.network_list.remove(child)
|
||||
if self.sdbus_nm is not None and self.sdbus_nm.wifi:
|
||||
if widget:
|
||||
self._gtk.Button_busy(widget, True)
|
||||
self.sdbus_nm.rescan()
|
||||
if not self.init_status:
|
||||
self.sdbus_nm.rescan()
|
||||
else:
|
||||
self.init_status = False
|
||||
self.load_networks()
|
||||
self.activate()
|
||||
self.reload = True
|
||||
|
||||
def activate(self):
|
||||
if self.sdbus_nm is None:
|
||||
return
|
||||
if self.update_timeout is None:
|
||||
if self.sdbus_nm.wifi:
|
||||
if self.reload_button.get_sensitive():
|
||||
self._gtk.Button_busy(self.reload_button, True)
|
||||
self.sdbus_nm.rescan()
|
||||
self.load_networks()
|
||||
self.update_all_networks()
|
||||
if self.sdbus_nm.is_wifi_enabled():
|
||||
if self.reload_button.get_sensitive():
|
||||
self._gtk.Button_busy(self.reload_button, True)
|
||||
self.sdbus_nm.rescan()
|
||||
self.load_networks()
|
||||
self.update_timeout = GLib.timeout_add_seconds(5, self.update_all_networks)
|
||||
else:
|
||||
self.update_single_network_info()
|
||||
@ -376,12 +432,19 @@ class Panel(ScreenPanel):
|
||||
self.update_timeout = None
|
||||
if self.sdbus_nm.wifi:
|
||||
self.sdbus_nm.enable_monitoring(False)
|
||||
if self.conn_status is not None:
|
||||
GLib.source_remove(self.conn_status)
|
||||
self.conn_status = None
|
||||
|
||||
def toggle_wifi(self, switch, gparams):
|
||||
enable = switch.get_active()
|
||||
if enable:
|
||||
self.reload_button.show()
|
||||
else:
|
||||
self.reload_button.hide()
|
||||
logging.info(f"WiFi {enable}")
|
||||
self.sdbus_nm.toggle_wifi(enable)
|
||||
if enable:
|
||||
self.reload_button.show()
|
||||
self.network_list.show()
|
||||
self.init_status = True
|
||||
self.reload_networks(self.reload_button)
|
||||
else:
|
||||
self.reload_button.hide()
|
||||
self.network_list.hide()
|
||||
|
@ -88,14 +88,15 @@ class Panel(ScreenPanel):
|
||||
self.hide_numpad()
|
||||
|
||||
def pid_calibrate(self, temp):
|
||||
heater = self.active_heater.split(' ', maxsplit=1)[-1]
|
||||
if self.verify_max_temp(temp):
|
||||
script = {
|
||||
"script": f"PID_CALIBRATE HEATER={self.active_heater} TARGET={temp}"
|
||||
"script": f"PID_CALIBRATE HEATER={heater} TARGET={temp}"
|
||||
}
|
||||
self._screen._confirm_send_action(
|
||||
None,
|
||||
_("Initiate a PID calibration for:")
|
||||
+ f" {self.active_heater} @ {temp} ºC"
|
||||
+ f" {heater} @ {temp} ºC"
|
||||
+ "\n\n"
|
||||
+ _("It may take more than 5 minutes depending on the heater power."),
|
||||
"printer.gcode.script",
|
||||
|
@ -375,14 +375,13 @@ class Panel(ScreenPanel):
|
||||
return max(temp, 0)
|
||||
|
||||
def pid_calibrate(self, temp):
|
||||
heater = self.active_heater.split(' ', maxsplit=1)[-1]
|
||||
if self.verify_max_temp(temp):
|
||||
script = {
|
||||
"script": f"PID_CALIBRATE HEATER={self.active_heater} TARGET={temp}"
|
||||
}
|
||||
script = {"script": f"PID_CALIBRATE HEATER={heater} TARGET={temp}"}
|
||||
self._screen._confirm_send_action(
|
||||
None,
|
||||
_("Initiate a PID calibration for:")
|
||||
+ f" {self.active_heater} @ {temp} ºC"
|
||||
+ f" {heater} @ {temp} ºC"
|
||||
+ "\n\n"
|
||||
+ _("It may take more than 5 minutes depending on the heater power."),
|
||||
"printer.gcode.script",
|
||||
|
Loading…
x
Reference in New Issue
Block a user