Use f-strings Avoid unnecessary casts to str()bool()int() Ensure file closure Merge nested ifs Simplify for-assigns-appends with comprehensions and internal functions Avoid shadowing internal function names Initialize variables Return value directly instead of assign then return Make some methods static
187 lines
6.0 KiB
Python
187 lines
6.0 KiB
Python
import logging
|
|
import logging.handlers
|
|
import os
|
|
import re
|
|
import subprocess
|
|
import sys
|
|
import threading
|
|
import time
|
|
import traceback
|
|
from queue import SimpleQueue as Queue
|
|
|
|
import ctypes
|
|
import struct
|
|
|
|
dpms_loaded = False
|
|
try:
|
|
ctypes.cdll.LoadLibrary('libXext.so.6')
|
|
libXext = ctypes.CDLL('libXext.so.6')
|
|
|
|
|
|
class DPMS_State:
|
|
Fail = -1
|
|
On = 0
|
|
Standby = 1
|
|
Suspend = 2
|
|
Off = 3
|
|
|
|
|
|
def get_DPMS_state(display_name_in_byte_string=b':0'):
|
|
state = DPMS_State.Fail
|
|
if not isinstance(display_name_in_byte_string, bytes):
|
|
raise TypeError
|
|
display_name = ctypes.c_char_p()
|
|
display_name.value = display_name_in_byte_string
|
|
libXext.XOpenDisplay.restype = ctypes.c_void_p
|
|
display = ctypes.c_void_p(libXext.XOpenDisplay(display_name))
|
|
dummy1_i_p = ctypes.create_string_buffer(8)
|
|
dummy2_i_p = ctypes.create_string_buffer(8)
|
|
if display.value:
|
|
if libXext.DPMSQueryExtension(display, dummy1_i_p, dummy2_i_p) \
|
|
and libXext.DPMSCapable(display):
|
|
onoff_p = ctypes.create_string_buffer(1)
|
|
state_p = ctypes.create_string_buffer(2)
|
|
if libXext.DPMSInfo(display, state_p, onoff_p):
|
|
onoff = struct.unpack('B', onoff_p.raw)[0]
|
|
if onoff:
|
|
state = struct.unpack('H', state_p.raw)[0]
|
|
libXext.XCloseDisplay(display)
|
|
return state
|
|
|
|
|
|
dpms_loaded = True
|
|
except Exception:
|
|
pass
|
|
|
|
|
|
def get_network_interfaces():
|
|
stream = os.popen("ip addr | grep ^'[0-9]' | cut -d ' ' -f 2 | grep -o '[a-zA-Z0-9\\.]*'")
|
|
return [i for i in stream.read().strip().split('\n') if not i.startswith('lo')]
|
|
|
|
|
|
def get_wireless_interfaces():
|
|
p = subprocess.Popen(["which", "iwconfig"], stdout=subprocess.PIPE)
|
|
|
|
while p.poll() is None:
|
|
time.sleep(.1)
|
|
if p.poll() != 0:
|
|
return None
|
|
|
|
try:
|
|
p = subprocess.Popen(["iwconfig"], stdout=subprocess.PIPE, stderr=subprocess.PIPE)
|
|
result = p.stdout.read().decode('ascii').split('\n')
|
|
except Exception as e:
|
|
logging.critical(e, exc_info=True)
|
|
logging.info("Error with running iwconfig command")
|
|
return None
|
|
interfaces = []
|
|
for line in result:
|
|
match = re.search('^(\\S+)\\s+.*$', line)
|
|
if match:
|
|
interfaces.append(match[1])
|
|
|
|
return interfaces
|
|
|
|
|
|
def get_software_version():
|
|
prog = ('git', '-C', os.path.dirname(__file__), 'describe', '--always',
|
|
'--tags', '--long', '--dirty')
|
|
try:
|
|
process = subprocess.Popen(prog, stdout=subprocess.PIPE,
|
|
stderr=subprocess.PIPE)
|
|
ver, err = process.communicate()
|
|
retcode = process.wait()
|
|
if retcode == 0:
|
|
version = ver.strip()
|
|
if isinstance(version, bytes):
|
|
version = version.decode()
|
|
return version
|
|
else:
|
|
logging.debug(f"Error getting git version: {err}")
|
|
except OSError:
|
|
logging.exception("Error runing git describe")
|
|
return "?"
|
|
|
|
|
|
def patch_threading_excepthook():
|
|
"""Installs our exception handler into the threading modules Thread object
|
|
Inspired by https://bugs.python.org/issue1230540
|
|
"""
|
|
old_init = threading.Thread.__init__
|
|
|
|
def new_init(self, *args, **kwargs):
|
|
old_init(self, *args, **kwargs)
|
|
old_run = self.run
|
|
|
|
def run_with_excepthook(*args, **kwargs):
|
|
try:
|
|
old_run(*args, **kwargs)
|
|
except (KeyboardInterrupt, SystemExit):
|
|
raise
|
|
except Exception:
|
|
sys.excepthook(*sys.exc_info(), thread_identifier=threading.get_ident())
|
|
|
|
self.run = run_with_excepthook
|
|
|
|
threading.Thread.__init__ = new_init
|
|
|
|
|
|
# Timed rotating file handler based on Klipper and Moonraker's implementation
|
|
class KlipperScreenLoggingHandler(logging.handlers.TimedRotatingFileHandler):
|
|
def __init__(self, software_version, filename, **kwargs):
|
|
super(KlipperScreenLoggingHandler, self).__init__(filename, **kwargs)
|
|
self.rollover_info = {
|
|
'header': f"{'-' * 20}KlipperScreen Log Start{'-' * 20}",
|
|
'version': f"Git Version: {software_version}",
|
|
}
|
|
lines = [line for line in self.rollover_info.values() if line]
|
|
if self.stream is not None:
|
|
self.stream.write("\n".join(lines) + "\n")
|
|
|
|
def set_rollover_info(self, name, item):
|
|
self.rollover_info[name] = item
|
|
|
|
def doRollover(self):
|
|
super(KlipperScreenLoggingHandler, self).doRollover()
|
|
lines = [line for line in self.rollover_info.values() if line]
|
|
if self.stream is not None:
|
|
self.stream.write("\n".join(lines) + "\n")
|
|
|
|
|
|
# Logging based on Arksine's logging setup
|
|
def setup_logging(log_file, software_version):
|
|
root_logger = logging.getLogger()
|
|
queue = Queue()
|
|
queue_handler = logging.handlers.QueueHandler(queue)
|
|
root_logger.addHandler(queue_handler)
|
|
root_logger.setLevel(logging.DEBUG)
|
|
|
|
stdout_hdlr = logging.StreamHandler(sys.stdout)
|
|
stdout_fmt = logging.Formatter(
|
|
'%(asctime)s [%(filename)s:%(funcName)s()] - %(message)s')
|
|
stdout_hdlr.setFormatter(stdout_fmt)
|
|
|
|
fh = None
|
|
if log_file:
|
|
fh = KlipperScreenLoggingHandler(software_version, log_file, when='midnight', backupCount=2)
|
|
formatter = logging.Formatter(
|
|
'%(asctime)s [%(filename)s:%(funcName)s()] - %(message)s')
|
|
fh.setFormatter(formatter)
|
|
listener = logging.handlers.QueueListener(
|
|
queue, fh, stdout_hdlr)
|
|
else:
|
|
listener = logging.handlers.QueueListener(
|
|
queue, stdout_hdlr)
|
|
listener.start()
|
|
|
|
def logging_exception_handler(ex_type, value, tb, thread_identifier=None):
|
|
logging.exception(
|
|
f'Uncaught exception {ex_type}: {value}\n'
|
|
f'Traceback: {traceback.format_tb(tb)}'
|
|
)
|
|
|
|
sys.excepthook = logging_exception_handler
|
|
logging.captureWarnings(True)
|
|
|
|
return listener, fh
|