Arksine 9994e1eb84 machine: Add annotations
Signed-off-by:  Eric Callahan <arksine.code@gmail.com>
2021-05-18 19:16:49 -04:00

226 lines
8.8 KiB
Python

# Machine manipulation request handlers
#
# Copyright (C) 2020 Eric Callahan <arksine.code@gmail.com>
#
# This file may be distributed under the terms of the GNU GPLv3 license.
from __future__ import annotations
import os
import re
import pathlib
import logging
import platform
import distro
from tornado.ioloop import IOLoop
# Annotation imports
from typing import (
TYPE_CHECKING,
Any,
Dict,
)
if TYPE_CHECKING:
from confighelper import ConfigHelper
from websockets import WebRequest
from . import shell_command
SCMDComp = shell_command.ShellCommandFactory
ALLOWED_SERVICES = ["moonraker", "klipper", "webcamd"]
SD_CID_PATH = "/sys/block/mmcblk0/device/cid"
SD_CSD_PATH = "/sys/block/mmcblk0/device/csd"
SD_MFGRS = {
'1b': "Samsung",
'03': "Sandisk"
}
class Machine:
def __init__(self, config: ConfigHelper) -> None:
self.server = config.get_server()
dist_info: Dict[str, Any]
dist_info = {'name': distro.name(pretty=True)}
dist_info.update(distro.info())
self.system_info: Dict[str, Dict[str, Any]] = {
'cpu_info': self._get_cpu_info(),
'sd_info': self._get_sdcard_info(),
'distribution': dist_info
}
# Add system info to log rollover
sys_info_msg = "\nSystem Info:"
for header, info in self.system_info.items():
sys_info_msg += f"\n\n***{header}***"
for key, val in info.items():
sys_info_msg += f"\n {key}: {val}"
self.server.add_log_rollover_item('system_info', sys_info_msg)
self.server.register_endpoint(
"/machine/reboot", ['POST'], self._handle_machine_request)
self.server.register_endpoint(
"/machine/shutdown", ['POST'], self._handle_machine_request)
self.server.register_endpoint(
"/machine/services/restart", ['POST'],
self._handle_service_request)
self.server.register_endpoint(
"/machine/services/stop", ['POST'],
self._handle_service_request)
self.server.register_endpoint(
"/machine/services/start", ['POST'],
self._handle_service_request)
self.server.register_endpoint(
"/machine/system_info", ['GET'],
self._handle_sysinfo_request)
# Register remote methods
self.server.register_remote_method(
"shutdown_machine", self.shutdown_machine)
self.server.register_remote_method(
"reboot_machine", self.reboot_machine)
async def _handle_machine_request(self, web_request: WebRequest) -> str:
ep = web_request.get_endpoint()
if ep == "/machine/shutdown":
await self.shutdown_machine()
elif ep == "/machine/reboot":
await self.reboot_machine()
else:
raise self.server.error("Unsupported machine request")
return "ok"
async def shutdown_machine(self) -> None:
await self._execute_cmd("sudo shutdown now")
async def reboot_machine(self) -> None:
await self._execute_cmd("sudo shutdown -r now")
async def do_service_action(self,
action: str,
service_name: str
) -> None:
await self._execute_cmd(
f'sudo systemctl {action} {service_name}')
async def _handle_service_request(self, web_request: WebRequest) -> str:
name: str = web_request.get('service').lower()
action = web_request.get_endpoint().split('/')[-1]
if name == "moonraker":
if action != "restart":
raise self.server.error(
f"Service action '{action}' not available for moonraker")
IOLoop.current().spawn_callback(
self.do_service_action, action, name)
elif name in ALLOWED_SERVICES:
await self.do_service_action(action, name)
else:
raise self.server.error(
f"Invalid argument recevied for 'name': {name}")
return "ok"
async def _handle_sysinfo_request(self,
web_request: WebRequest
) -> Dict[str, Any]:
return {'system_info': self.system_info}
async def _execute_cmd(self, cmd: str) -> None:
shell_cmd: SCMDComp = self.server.lookup_component('shell_command')
scmd = shell_cmd.build_shell_command(cmd, None)
try:
await scmd.run(timeout=2., verbose=False)
except Exception:
logging.exception(f"Error running cmd '{cmd}'")
raise
def get_system_info(self) -> Dict[str, Dict[str, Any]]:
return self.system_info
def _get_sdcard_info(self) -> Dict[str, Any]:
sd_info: Dict[str, Any] = {}
cid_file = pathlib.Path(SD_CID_PATH)
if not cid_file.exists():
# No SDCard detected at mmcblk0
return {}
try:
cid_text = cid_file.read_text().strip().lower()
mid = cid_text[:2]
sd_info['manufacturer_id'] = mid
sd_info['manufacturer'] = SD_MFGRS.get(mid, "Unknown")
sd_info['oem_id'] = cid_text[2:6]
sd_info['product_name'] = bytes.fromhex(cid_text[6:16]).decode(
encoding="ascii", errors="ignore")
sd_info['product_revision'] = \
f"{int(cid_text[16], 16)}.{int(cid_text[17], 16)}"
sd_info['serial_number'] = cid_text[18:26]
mfg_year = int(cid_text[27:29], 16) + 2000
mfg_month = int(cid_text[29], 16)
sd_info['manufacturer_date'] = f"{mfg_month}/{mfg_year}"
except Exception:
logging.info("Error reading SDCard CID Register")
return {}
sd_info['capacity'] = "Unknown"
sd_info['total_bytes'] = 0
csd_file = pathlib.Path(SD_CSD_PATH)
# Read CSD Register
try:
csd_reg = bytes.fromhex(csd_file.read_text().strip())
csd_type = (csd_reg[0] >> 6) & 0x3
if csd_type == 0:
# Standard Capacity (CSD Version 1.0)
max_block_len: int = 2**(csd_reg[5] & 0xF)
c_size = ((csd_reg[6] & 0x3) << 10) | (csd_reg[7] << 2) | \
((csd_reg[8] >> 6) & 0x3)
c_mult_reg = ((csd_reg[9] & 0x3) << 1) | (csd_reg[10] >> 7)
c_mult = 2**(c_mult_reg + 2)
total_bytes: int = (c_size + 1) * c_mult * max_block_len
sd_info['capacity'] = f"{(total_bytes / (1024.0**2)):.1f} MiB"
elif csd_type == 1:
# High Capacity (CSD Version 2.0)
c_size = ((csd_reg[7] & 0x3F) << 16) | (csd_reg[8] << 8) | \
csd_reg[9]
total_bytes = (c_size + 1) * 512 * 1024
sd_info['capacity'] = f"{(total_bytes / (1024.0**3)):.1f} GiB"
elif csd_type == 2:
# Ultra Capacity (CSD Version 3.0)
c_size = ((csd_reg[6]) & 0xF) << 24 | (csd_reg[7] << 16) | \
(csd_reg[8] << 8) | csd_reg[9]
total_bytes = (c_size + 1) * 512 * 1024
sd_info['capacity'] = f"{(total_bytes / (1024.0**4)):.1f} TiB"
else:
# Invalid CSD, skip capacity check
return sd_info
sd_info['total_bytes'] = total_bytes
except Exception:
logging.info("Error Reading SDCard CSD Register")
return sd_info
def _get_cpu_info(self) -> Dict[str, Any]:
cpu_file = pathlib.Path("/proc/cpuinfo")
cpu_info = {
'cpu_count': os.cpu_count(),
'bits': platform.architecture()[0],
'processor': platform.processor() or platform.machine(),
'cpu_desc': "",
'hardware_desc': "",
'model': ""
}
if not cpu_file.exists():
return cpu_info
try:
cpu_text = cpu_file.read_text().strip()
cpu_items = [item.strip() for item in cpu_text.split("\n\n")
if item.strip()]
for item in cpu_items:
cpu_desc_match = re.search(r"model name\s+:\s+(.+)", item)
if cpu_desc_match is not None:
cpu_info['cpu_desc'] = cpu_desc_match.group(1).strip()
continue
hw_match = re.search(r"Hardware\s+:\s+(.+)", cpu_items[-1])
if hw_match is not None:
cpu_info['hardware_desc'] = hw_match.group(1).strip()
model_match = re.search(r"Model\s+:\s+(.+)", cpu_items[-1])
if model_match is not None:
cpu_info['model'] = model_match.group(1).strip()
except Exception:
logging.info("Error Reading /proc/cpuinfo")
return cpu_info
def load_component(config: ConfigHelper) -> Machine:
return Machine(config)