import json
import logging
import os
import re
import signal
import socket
import subprocess
import threading
import time
from contextlib import suppress
from queue import Queue, Empty
from subprocess import PIPE, Popen, STDOUT
from threading import Thread

import gi

gi.require_version("Gtk", "3.0")
from gi.repository import Gtk, Gdk, GLib

# Seconds between the periodic "SCAN" requests scheduled by WifiManager.
RESCAN_INTERVAL = 180
# Local path this process binds its end of the control datagram socket to.
KS_SOCKET_FILE = "/tmp/.KS_wpa_supplicant"

# One row of a "SCAN_RESULTS" reply: bssid, frequency, signal level, flags
# and an optional ssid.
_SCAN_RESULT_RE = re.compile(
    r"^([a-f0-9:]+)\s+([0-9]+)\s+([\-0-9]+)\s+(\S+)\s+(.+)?")
# Event that carries the bssid of a locally generated disconnect.
_DISCONNECTED_RE = re.compile(
    r'<3>CTRL-EVENT-DISCONNECTED bssid=(\S+) reason=3 locally_generated=1')


class WifiManager:
    """Talk to wpa_supplicant over its control socket for one interface.

    Replies to commands are received by a ``WpaSocket`` thread and handed
    back through ``self.queue``; unsolicited events are turned into
    callbacks registered with ``add_callback``.
    """

    networks_in_supplicant = []
    connected = False
    _stop_loop = False
    thread = None

    def __init__(self, interface, *args, **kwargs):
        """Open the control socket for *interface* and start listening.

        If the socket cannot be set up, the error is logged and the
        instance is left with ``initialized`` set to False.
        """
        super().__init__(*args, **kwargs)
        self.loop = None
        self._poll_task = None
        self._scanning = False
        self._callbacks = {
            "connected": [],
            "connecting_status": [],
            "scan_results": []
        }
        self._stop_loop = False
        self.connected = False
        self.connected_ssid = None
        self.connecting_info = []
        self.event = threading.Event()
        self.initialized = False
        self.interface = interface
        self.networks = {}
        self.supplicant_networks = {}
        # Per-instance list so instances never share the class-level default.
        self.networks_in_supplicant = []
        self.queue = Queue()
        self.tasks = []
        self.timeout = None
        self.scan_time = 0

        # A stale socket file from an earlier run would make bind() fail.
        if os.path.exists(KS_SOCKET_FILE):
            os.remove(KS_SOCKET_FILE)

        try:
            self.soc = socket.socket(socket.AF_UNIX, socket.SOCK_DGRAM)
            self.soc.bind(KS_SOCKET_FILE)
            self.soc.connect("/var/run/wpa_supplicant/%s" % interface)
        except Exception:
            logging.info("Error connecting to wifi socket: %s", interface)
            return

        self.wpa_thread = WpaSocket(self, self.queue, self.callback)
        self.wpa_thread.start()
        self.initialized = True

        self.wpa_cli("ATTACH", False)
        self.wpa_cli("SCAN", False)
        GLib.idle_add(self.read_wpa_supplicant)
        self.timeout = GLib.timeout_add_seconds(RESCAN_INTERVAL, self.rescan)

    def _find_network_id(self, ssid):
        """Return the supplicant network id configured for *ssid*, or None."""
        for netid, net in self.supplicant_networks.items():
            if net['ssid'] == ssid:
                return netid
        return None

    def _notify_connected(self, prev_ssid):
        """Queue the "connected" callbacks if the connected SSID changed."""
        if prev_ssid == self.connected_ssid:
            return
        for cb in self._callbacks['connected']:
            Gdk.threads_add_idle(
                GLib.PRIORITY_DEFAULT_IDLE,
                cb, self.connected_ssid, prev_ssid)

    def add_callback(self, name, callback):
        """Register *callback* for event *name*; unknown names and
        duplicates are ignored."""
        if name in self._callbacks and callback not in self._callbacks[name]:
            self._callbacks[name].append(callback)

    def add_network(self, ssid, psk):
        """Add a network to wpa_supplicant and save the configuration.

        Returns True when the network shows up in the supplicant list
        afterwards, False when it does not, and None (doing nothing) when
        a network with this SSID is already configured.
        """
        if self._find_network_id(ssid) is not None:
            # Modify network (not implemented)
            return None

        # TODO: Add wpa_cli error checking
        network_id = self.wpa_cli("ADD_NETWORK")
        # Values are sent verbatim inside double quotes.
        # NOTE(review): wpa_supplicant is believed to read a quoted value up
        # to its final quote, so embedded quotes are not escaped - confirm.
        commands = [
            'ENABLE_NETWORK %s' % (network_id),
            'SET_NETWORK %s ssid "%s"' % (network_id, ssid),
            'SET_NETWORK %s psk "%s"' % (network_id, psk)
        ]
        self.wpa_cli_batch(commands)

        # Re-read the list to verify that the network was really created.
        self.read_wpa_supplicant()
        if self._find_network_id(ssid) is None:
            logging.info("Error adding network")
            return False

        self.save_wpa_conf()
        return True

    def callback(self, type, msg):
        """Queue every callback registered for event *type* with *msg*."""
        if type in self._callbacks:
            for cb in self._callbacks[type]:
                Gdk.threads_add_idle(
                    GLib.PRIORITY_DEFAULT_IDLE,
                    cb, msg)

    def connect(self, ssid):
        """Select the configured network for *ssid* and save the config.

        Returns False when the SSID is not configured in wpa_supplicant;
        otherwise returns None after issuing the commands.
        """
        netid = self._find_network_id(ssid)
        if netid is None:
            logging.info("Wifi network is not defined in wpa_supplicant")
            return False

        logging.info("Attempting to connect to wifi: %s" % netid)
        self.connecting_info = ["Attempting to connect to %s" % ssid]
        self.wpa_cli("SELECT_NETWORK %s" % netid)
        self.save_wpa_conf()

    def delete_network(self, ssid):
        """Remove the configured network for *ssid* and save the config."""
        netid = self._find_network_id(ssid)
        if netid is None:
            logging.debug("Unable to find network in wpa_supplicant")
            return

        self.wpa_cli("REMOVE_NETWORK %s" % netid)
        del self.supplicant_networks[netid]
        self.save_wpa_conf()

    def get_connected_ssid(self):
        """Return the SSID recorded as connected, or None."""
        return self.connected_ssid

    def get_current_wifi(self, interface="wlan0"):
        """Query "STATUS" and refresh the connection state.

        Updates ``connected``, ``connected_ssid`` and the ``connected`` flag
        of every known network, and queues the "connected" callbacks when
        the SSID changed. *interface* is accepted but not used.

        Returns ``[ssid, bssid]`` when the status reports both fields,
        otherwise None.
        """
        status = {}
        for line in self.wpa_cli("STATUS").split('\n'):
            # Split on the first "=" only; values may contain "=" themselves.
            key, _, value = line.partition('=')
            status[key] = value

        prev_ssid = self.connected_ssid
        is_connected = "ssid" in status and "bssid" in status
        if not is_connected:
            logging.info("Resetting connected_ssid")

        self.connected = is_connected
        self.connected_ssid = status['ssid'] if is_connected else None
        for ssid, net in self.networks.items():
            net['connected'] = is_connected and ssid == self.connected_ssid

        self._notify_connected(prev_ssid)
        if is_connected:
            return [status['ssid'], status['bssid']]
        return None

    def get_current_wifi_idle_add(self):
        """Idle-callback wrapper around get_current_wifi.

        Returns False so the idle source is not rescheduled.
        """
        self.get_current_wifi()
        return False

    def get_network_info(self, ssid=None, mac=None):
        """Return the scan entry for *ssid*, or for *mac* when no SSID is
        given; None when nothing matches."""
        if ssid is not None and ssid in self.networks:
            return self.networks[ssid]
        if mac is not None and ssid is None:
            # Iterate the entries, not the SSID keys, to reach "mac".
            for net in self.networks.values():
                if mac == net['mac']:
                    return net
        return None

    def get_networks(self):
        """Return the SSIDs found by the last scan as a list."""
        return list(self.networks)

    def get_supplicant_networks(self):
        """Return the configured networks, keyed by supplicant network id."""
        return self.supplicant_networks

    def is_connected(self):
        """Return the connection flag set by get_current_wifi."""
        return self.connected

    def is_initialized(self):
        """Return True when the control socket was set up in __init__."""
        return self.initialized

    def read_wpa_supplicant(self):
        """Reload the configured networks from "LIST_NETWORKS".

        Rebuilds ``supplicant_networks`` and ``networks_in_supplicant``.
        Returns None, so it runs only once when used as an idle callback.
        """
        # The first line of the reply is the column header.
        rows = self.wpa_cli("LIST_NETWORKS").split('\n')[1:]
        supplicant_networks = {}
        networks_in_supplicant = []
        for fields in (row.split('\t') for row in rows):
            if len(fields) < 3:
                # Blank or malformed row: id, ssid and bssid are required.
                continue
            entry = {
                "ssid": fields[1],
                "bssid": fields[2],
                "flags": fields[3] if len(fields) == 4 else ""
            }
            supplicant_networks[fields[0]] = entry
            networks_in_supplicant.append(entry)
        self.supplicant_networks = supplicant_networks
        self.networks_in_supplicant = networks_in_supplicant

    def remove_callback(self, name, callback):
        """Unregister *callback* from event *name* if it is registered."""
        if name in self._callbacks and callback in self._callbacks[name]:
            self._callbacks[name].remove(callback)

    def rescan(self):
        """Request a scan; returns True so the GLib timeout keeps firing."""
        self.wpa_cli("SCAN", False)
        return True

    def save_wpa_conf(self):
        """Ask wpa_supplicant to save its configuration."""
        logging.info("Saving WPA config")
        self.wpa_cli("SAVE_CONFIG")

    def scan_results(self, interface='wlan0'):
        """Rebuild ``self.networks`` from "SCAN_RESULTS".

        Queues the "scan_results" callbacks with the lists of added and
        removed SSIDs when either is non-empty. *interface* is accepted but
        not used.
        """
        deleted_networks = list(self.networks)

        # The first line of the reply is the column header.
        rows = self.wpa_cli("SCAN_RESULTS").split('\n')[1:]
        aps = []
        for row in rows:
            match = _SCAN_RESULT_RE.match(row)
            if not match:
                continue
            mac, frequency, signal_level, flags, ssid = match.groups()
            # lookup() returns None for frequencies missing from its table;
            # show "?" instead of failing the whole scan.
            channel = WifiChannels.lookup(frequency)
            # "WPA2" is tested before "WPA" because the latter is a
            # substring of the former.
            encryption = next(
                (enc for enc in ("WPA2", "WPA", "WEP") if enc in flags),
                "off")
            aps.append({
                "mac": mac,
                "channel": channel[1] if channel is not None else "?",
                "connected": False,
                "configured": False,
                "frequency": frequency,
                "flags": flags,
                "signal_level_dBm": signal_level,
                "ssid": ssid,
                "encryption": encryption
            })

        cur_info = self.get_current_wifi()
        # Build the table locally and rebind it in one step.
        networks = {}
        for ap in aps:
            networks[ap['ssid']] = ap
            if (cur_info is not None
                    and cur_info[0] == ap['ssid']
                    and cur_info[1].lower() == ap['mac'].lower()):
                ap['connected'] = True
        self.networks = networks

        new_networks = []
        for ssid in networks:
            if ssid in deleted_networks:
                deleted_networks.remove(ssid)
            else:
                new_networks.append(ssid)

        if new_networks or deleted_networks:
            for cb in self._callbacks['scan_results']:
                Gdk.threads_add_idle(
                    GLib.PRIORITY_DEFAULT_IDLE,
                    cb, new_networks, deleted_networks)

    def wpa_cli(self, command, wait=True):
        """Send *command* over the control socket.

        With *wait* true, block until the reader thread queues the reply
        and return it. With *wait* false, tell the reader thread to discard
        the reply and return None.
        """
        if not wait:
            self.wpa_thread.skip_command()

        self.soc.send(command.encode())
        if wait:
            return self.queue.get()
        return None

    def wpa_cli_batch(self, commands):
        """Send each command in *commands*, waiting for every reply."""
        for cmd in commands:
            self.wpa_cli(cmd)


class WpaSocket(Thread):
    """Reader thread for the control socket.

    Messages starting with "<" are handled as events; anything else is
    treated as the reply to a command.
    """

    def __init__(self, wm, queue, callback):
        """Store the manager *wm*, its reply *queue* and event *callback*."""
        super().__init__()
        self.queue = queue
        self.callback = callback
        self.soc = wm.soc
        self._stop_loop = False
        self.skip_commands = 0
        self.wm = wm

    def run(self):
        """Receive messages until stop() has been called."""
        logging.debug("Setting up wifi event loop")
        while not self._stop_loop:
            try:
                msg = self.soc.recv(4096).decode().strip()
            except Exception:
                # TODO: Socket error
                continue
            if msg.startswith("<"):
                self._handle_event(msg)
            elif self.skip_commands > 0:
                # Reply to a command that was sent without waiting.
                self.skip_commands = self.skip_commands - 1
            else:
                self.queue.put(msg)

        logging.info("Wifi event loop ended")

    def _handle_event(self, msg):
        """Dispatch one unsolicited event message."""
        if "CTRL-EVENT-SCAN-RESULTS" in msg:
            Gdk.threads_add_idle(
                GLib.PRIORITY_DEFAULT_IDLE, self.wm.scan_results)
        elif "CTRL-EVENT-DISCONNECTED" in msg:
            self.callback("connecting_status", msg)
            match = _DISCONNECTED_RE.match(msg)
            if match:
                # Clear the flag on the first network with that bssid.
                for net in self.wm.networks.values():
                    if net['mac'] == match.group(1):
                        net['connected'] = False
                        break
        elif "Trying to associate" in msg:
            self.callback("connecting_status", msg)
        elif "CTRL-EVENT-REGDOM-CHANGE" in msg:
            self.callback("connecting_status", msg)
        elif "CTRL-EVENT-CONNECTED" in msg:
            Gdk.threads_add_idle(
                GLib.PRIORITY_DEFAULT_IDLE,
                self.wm.get_current_wifi_idle_add)
            self.callback("connecting_status", msg)

    def skip_command(self):
        """Mark one more upcoming command reply to be discarded."""
        self.skip_commands = self.skip_commands + 1

    def stop(self):
        """Ask the loop in run() to exit; it checks the flag after each
        receive attempt."""
        self._stop_loop = True


class WifiChannels:
    """Map channel centre frequencies in MHz to (band, channel) pairs."""

    _CHANNELS = {
        "2412": ("2.4", "1"),
        "2417": ("2.4", "2"),
        "2422": ("2.4", "3"),
        "2427": ("2.4", "4"),
        "2432": ("2.4", "5"),
        "2437": ("2.4", "6"),
        "2442": ("2.4", "7"),
        "2447": ("2.4", "8"),
        "2452": ("2.4", "9"),
        "2457": ("2.4", "10"),
        "2462": ("2.4", "11"),
        "2467": ("2.4", "12"),
        "2472": ("2.4", "13"),
        "2484": ("2.4", "14"),
        "5035": ("5", "7"),
        "5040": ("5", "8"),
        "5045": ("5", "9"),
        "5055": ("5", "11"),
        "5060": ("5", "12"),
        "5080": ("5", "16"),
        "5170": ("5", "34"),
        "5180": ("5", "36"),
        "5190": ("5", "38"),
        "5200": ("5", "40"),
        "5210": ("5", "42"),
        "5220": ("5", "44"),
        "5230": ("5", "46"),
        "5240": ("5", "48"),
        "5260": ("5", "52"),
        "5280": ("5", "56"),
        "5300": ("5", "60"),
        "5320": ("5", "64"),
        "5500": ("5", "100"),
        "5520": ("5", "104"),
        "5540": ("5", "108"),
        "5560": ("5", "112"),
        "5580": ("5", "116"),
        "5600": ("5", "120"),
        "5620": ("5", "124"),
        "5640": ("5", "128"),
        "5660": ("5", "132"),
        "5680": ("5", "136"),
        "5700": ("5", "140"),
        "5720": ("5", "144"),
        "5745": ("5", "149"),
        "5765": ("5", "153"),
        "5785": ("5", "157"),
        "5805": ("5", "161"),
        "5825": ("5", "165"),
        "4915": ("5", "183"),
        "4920": ("5", "184"),
        "4925": ("5", "185"),
        "4935": ("5", "187"),
        "4940": ("5", "188"),
        "4945": ("5", "189"),
        "4960": ("5", "192"),
        "4980": ("5", "196"),
    }

    @staticmethod
    def lookup(freq):
        """Return ``(band, channel)`` as strings for *freq*, or None.

        *freq* is normally the frequency as a string such as "2412"; it is
        passed through ``str`` so an integer works as well.
        """
        return WifiChannels._CHANNELS.get(str(freq))