From 0d6791a3204809fc3e651c699d340f4705dbbeb9 Mon Sep 17 00:00:00 2001 From: Eric Callahan Date: Thu, 13 Jan 2022 18:26:15 -0500 Subject: [PATCH] machine: refactor systemd cli implementation Move all systemd cli calls to its own provider class, inherted from a base provider class. This is in preparation for multiple provider implementations. Signed-off-by: Eric Callahan --- moonraker/components/machine.py | 291 ++++++++++++++++++-------------- 1 file changed, 163 insertions(+), 128 deletions(-) diff --git a/moonraker/components/machine.py b/moonraker/components/machine.py index aa2efc1..7d43ded 100644 --- a/moonraker/components/machine.py +++ b/moonraker/components/machine.py @@ -19,13 +19,15 @@ from typing import ( TYPE_CHECKING, Any, Dict, - List + List, + Optional ) + if TYPE_CHECKING: from confighelper import ConfigHelper from websockets import WebRequest - from . import shell_command - SCMDComp = shell_command.ShellCommandFactory + from .shell_command import ShellCommandFactory as SCMDComp + from .proc_stats import ProcStats ALLOWED_SERVICES = [ "moonraker", "klipper", "webcamd", "MoonCord", @@ -42,6 +44,8 @@ SD_MFGRS = { '74': "PNY" } IP_FAMILIES = {'inet': 'ipv4', 'inet6': 'ipv6'} +NETWORK_UPDATE_SEQUENCE = 10 + class Machine: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() @@ -50,7 +54,6 @@ class Machine: dist_info.update(distro.info()) dist_info['release_info'] = distro.distro_release_info() self.inside_container = False - self.virt_id = "none" self.system_info: Dict[str, Any] = { 'cpu_info': self._get_cpu_info(), 'sd_info': self._get_sdcard_info(), @@ -58,7 +61,7 @@ class Machine: 'virtualization': self._check_inside_container() } self._update_log_rollover(log=True) - self.available_services: Dict[str, Dict[str, str]] = {} + self.sys_provider = SystemdCliProvider(config) self.server.register_endpoint( "/machine/reboot", ['POST'], self._handle_machine_request) @@ -81,9 +84,9 @@ class Machine: # Register remote methods self.server.register_remote_method( - "shutdown_machine", self.shutdown_machine) + "shutdown_machine", self.sys_provider.shutdown) self.server.register_remote_method( - "reboot_machine", self.reboot_machine) + "reboot_machine", self.sys_provider.reboot) self.init_evt = asyncio.Event() @@ -105,38 +108,39 @@ class Machine: pass async def component_init(self): + await self.sys_provider.initialize() if not self.inside_container: - await self._check_virt_status() - await self._find_active_services() - await self.parse_network_interfaces(notify=False) - self._update_log_rollover() + virt_info = await self.sys_provider.check_virt_status() + self.system_info['virtualization'] = virt_info + await self._parse_network_interfaces(0, notify=False) + pstats: ProcStats = self.server.lookup_component('proc_stats') + pstats.register_stat_callback(self._parse_network_interfaces) + available_svcs = self.sys_provider.get_available_services() + avail_list = list(available_svcs.keys()) + self.system_info['available_services'] = avail_list + self.system_info['service_state'] = available_svcs + self.init_evt.set() async def _handle_machine_request(self, web_request: WebRequest) -> str: ep = web_request.get_endpoint() if self.inside_container: + virt_id = self.system_info['virtualization'].get('virt_id', "none") raise self.server.error( f"Cannot {ep.split('/')[-1]} from within a " - f"{self.virt_id} container") + f"{virt_id} container") if ep == "/machine/shutdown": - await self.shutdown_machine() + await self.sys_provider.shutdown() elif ep == "/machine/reboot": - await self.reboot_machine() + await self.sys_provider.reboot() else: raise self.server.error("Unsupported machine request") return "ok" - async def shutdown_machine(self) -> None: - await self._execute_cmd("sudo shutdown now") - - async def reboot_machine(self) -> None: - await self._execute_cmd("sudo shutdown -r now") - async def do_service_action(self, action: str, service_name: str ) -> None: - await self._execute_cmd( - f'sudo systemctl {action} {service_name}') + await self.sys_provider.do_service_action(action, service_name) async def _handle_service_request(self, web_request: WebRequest) -> str: name: str = web_request.get('service') @@ -147,11 +151,10 @@ class Machine: f"Service action '{action}' not available for moonraker") event_loop = self.server.get_event_loop() event_loop.register_callback(self.do_service_action, action, name) - elif name in self.available_services: + elif self.sys_provider.is_service_available(name): await self.do_service_action(action, name) else: - if name in ALLOWED_SERVICES and \ - name not in self.available_services: + if name in ALLOWED_SERVICES: raise self.server.error(f"Service '{name}' not installed") raise self.server.error( f"Service '{name}' not allowed") @@ -162,15 +165,6 @@ class Machine: ) -> Dict[str, Any]: return {'system_info': self.system_info} - async def _execute_cmd(self, cmd: str) -> None: - shell_cmd: SCMDComp = self.server.lookup_component('shell_command') - scmd = shell_cmd.build_shell_command(cmd, None) - try: - await scmd.run(timeout=2., verbose=False) - except Exception: - logging.exception(f"Error running cmd '{cmd}'") - raise - def get_system_info(self) -> Dict[str, Any]: return self.system_info @@ -282,31 +276,6 @@ class Machine: logging.info("Error Reading /proc/meminfo") return cpu_info - async def _find_active_services(self): - shell_cmd: SCMDComp = self.server.lookup_component('shell_command') - scmd = shell_cmd.build_shell_command( - "systemctl list-units --all --type=service --plain --no-legend") - try: - resp = await scmd.run_with_response() - lines = resp.split('\n') - services = [line.split()[0].strip() for line in lines - if ".service" in line.strip()] - except Exception: - services = [] - for svc in services: - sname = svc.rsplit('.', 1)[0] - for allowed in ALLOWED_SERVICES: - if sname.startswith(allowed): - self.available_services[sname] = { - 'active_state': "unknown", - 'sub_state': "unknown" - } - avail_list = list(self.available_services.keys()) - self.system_info['available_services'] = avail_list - self.system_info['service_state'] = self.available_services - await self.update_service_status(notify=False) - self.init_evt.set() - def _check_inside_container(self) -> Dict[str, Any]: cgroup_file = pathlib.Path(CGROUP_PATH) virt_type = virt_id = "none" @@ -341,81 +310,17 @@ class Machine: virt_id = "docker" except Exception: logging.exception(f"Error reading {SCHED_PATH}") - - self.virt_id = virt_id return { 'virt_type': virt_type, 'virt_identifier': virt_id } - async def _check_virt_status(self) -> None: - # Fallback virtualization check - virt_id = virt_type = "none" - - shell_cmd: SCMDComp = self.server.lookup_component('shell_command') - - # Check for any form of virtualization. This will report the innermost - # virtualization type in the event that nested virtualization is used - scmd = shell_cmd.build_shell_command("systemd-detect-virt") - try: - resp = await scmd.run_with_response() - except shell_cmd.error: - pass - else: - virt_id = resp.strip() - - if virt_id != "none": - # Check explicitly for container virtualization - scmd = shell_cmd.build_shell_command( - "systemd-detect-virt --container") - try: - resp = await scmd.run_with_response() - except shell_cmd.error: - virt_type = "vm" - else: - if virt_id == resp.strip(): - virt_type = "container" - else: - # Moonraker is run from within a VM inside a container - virt_type = "vm" - logging.info( - f"Virtualized Environment Detected, Type: {virt_type} " - f"id: {virt_id}") - else: - logging.info("No Virtualization Detected") - - self.virt_id = virt_id - self.system_info['virtualization'] = { - 'virt_type': virt_type, - 'virt_identifier': virt_id - } - - async def update_service_status(self, notify: bool = True) -> None: - if not self.available_services: + async def _parse_network_interfaces(self, + sequence: int, + notify: bool = True + ) -> None: + if sequence % NETWORK_UPDATE_SEQUENCE: return - svcs = self.system_info['available_services'] - shell_cmd: SCMDComp = self.server.lookup_component('shell_command') - scmd = shell_cmd.build_shell_command( - "systemctl show -p ActiveState,SubState --value " - f"{' '.join(svcs)}") - try: - resp = await scmd.run_with_response(log_complete=False) - for svc, state in zip(svcs, resp.strip().split('\n\n')): - active_state, sub_state = state.split('\n', 1) - new_state: Dict[str, str] = { - 'active_state': active_state, - 'sub_state': sub_state - } - if self.available_services[svc] != new_state: - self.available_services[svc] = new_state - if notify: - self.server.send_event( - "machine:service_state_changed", - {svc: new_state}) - except Exception: - logging.exception("Error processing service state update") - - async def parse_network_interfaces(self, notify: bool = True) -> None: shell_cmd: SCMDComp = self.server.lookup_component('shell_command') scmd = shell_cmd.build_shell_command("ip -json address") network: Dict[str, Any] = {} @@ -452,5 +357,135 @@ class Machine: self.server.send_event("machine:net_state_changed", network) self.system_info['network'] = network +class BaseProvider: + def __init__(self, config: ConfigHelper) -> None: + self.server = config.get_server() + self.available_services: Dict[str, Dict[str, str]] = {} + self.shell_cmd: SCMDComp = self.server.load_component( + config, 'shell_command') + + async def initialize(self) -> None: + pass + + async def shutdown(self) -> None: + await self.shell_cmd.exec_cmd(f"sudo shutdown now") + + async def reboot(self) -> None: + await self.shell_cmd.exec_cmd(f"sudo shutdown -r now") + + async def do_service_action(self, + action: str, + service_name: str + ) -> None: + pass + + async def check_virt_status(self) -> Dict[str, Any]: + pass + + def is_service_available(self, service: str) -> bool: + return service in self.available_services + + def get_available_services(self) -> Dict[str, Dict[str, str]]: + return self.available_services + +class SystemdCliProvider(BaseProvider): + async def initialize(self) -> None: + await self._detect_active_services() + if self.available_services: + svcs = list(self.available_services.keys()) + self.svc_cmd = self.shell_cmd.build_shell_command( + "systemctl show -p ActiveState,SubState --value " + f"{' '.join(svcs)}") + await self._update_service_status(0, notify=True) + pstats: ProcStats = self.server.lookup_component('proc_stats') + pstats.register_stat_callback(self._update_service_status) + + async def do_service_action(self, + action: str, + service_name: str + ) -> None: + await self.shell_cmd.exec_cmd( + f'sudo systemctl {action} {service_name}') + + async def check_virt_status(self) -> Dict[str, Any]: + # Fallback virtualization check + virt_id = virt_type = "none" + + # Check for any form of virtualization. This will report the innermost + # virtualization type in the event that nested virtualization is used + try: + resp: str = await self.shell_cmd.exec_cmd("systemd-detect-virt") + except self.shell_cmd.error: + pass + else: + virt_id = resp.strip() + + if virt_id != "none": + # Check explicitly for container virtualization + try: + resp = await self.shell_cmd.exec_cmd( + "systemd-detect-virt --container") + except self.shell_cmd.error: + virt_type = "vm" + else: + if virt_id == resp.strip(): + virt_type = "container" + else: + # Moonraker is run from within a VM inside a container + virt_type = "vm" + logging.info( + f"Virtualized Environment Detected, Type: {virt_type} " + f"id: {virt_id}") + else: + logging.info("No Virtualization Detected") + return { + 'virt_type': virt_type, + 'virt_identifier': virt_id + } + + async def _detect_active_services(self): + try: + resp: str = await self.shell_cmd.exec_cmd( + "systemctl list-units --all --type=service --plain" + " --no-legend") + lines = resp.split('\n') + services = [line.split()[0].strip() for line in lines + if ".service" in line.strip()] + except Exception: + services = [] + for svc in services: + sname = svc.rsplit('.', 1)[0] + for allowed in ALLOWED_SERVICES: + if sname.startswith(allowed): + self.available_services[sname] = { + 'active_state': "unknown", + 'sub_state': "unknown" + } + + async def _update_service_status(self, + sequence: int, + notify: bool = True + ) -> None: + if sequence % 2: + # Update every other sequence + return + svcs = list(self.available_services.keys()) + try: + resp = await self.svc_cmd.run_with_response(log_complete=False) + for svc, state in zip(svcs, resp.strip().split('\n\n')): + active_state, sub_state = state.split('\n', 1) + new_state: Dict[str, str] = { + 'active_state': active_state, + 'sub_state': sub_state + } + if self.available_services[svc] != new_state: + self.available_services[svc] = new_state + if notify: + self.server.send_event( + "machine:service_state_changed", + {svc: new_state}) + except Exception: + logging.exception("Error processing service state update") + def load_component(config: ConfigHelper) -> Machine: return Machine(config)