diff --git a/moonraker/components/update_manager/app_deploy.py b/moonraker/components/update_manager/app_deploy.py new file mode 100644 index 0000000..dcfd5d3 --- /dev/null +++ b/moonraker/components/update_manager/app_deploy.py @@ -0,0 +1,250 @@ +# Deploy updates for applications managed by Moonraker +# +# Copyright (C) 2021 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +from __future__ import annotations +import pathlib +import shutil +import hashlib +from concurrent.futures.thread import ThreadPoolExecutor +from tornado.ioloop import IOLoop +from .base_deploy import BaseDeploy + +# Annotation imports +from typing import ( + TYPE_CHECKING, + Any, + Optional, + Union, + Dict, + List, +) +if TYPE_CHECKING: + from confighelper import ConfigHelper + from .update_manager import CommandHelper + +CHANNEL_TO_TYPE = { + "stable": "zip", + "beta": "zip_beta", + "dev": "git_repo" +} +TYPE_TO_CHANNEL = { + "zip": "stable", + "zip_beta": "beta", + "dev": "git_repo" +} + +class AppDeploy(BaseDeploy): + def __init__(self, + config: ConfigHelper, + cmd_helper: CommandHelper, + app_params: Optional[Dict[str, Any]] + ) -> None: + super().__init__(config, cmd_helper) + self.config = config + self.app_params = app_params + self.debug = self.cmd_helper.is_debug_enabled() + if app_params is not None: + self.channel: str = app_params['channel'] + self.path: pathlib.Path = pathlib.Path( + app_params['path']).expanduser().resolve() + executable: Optional[str] = app_params['executable'] + self.type = CHANNEL_TO_TYPE[self.channel] + else: + self.type = config.get('type') + self.channel = TYPE_TO_CHANNEL[self.type] + self.path = pathlib.Path( + config.get('path')).expanduser().resolve() + executable = config.get('env', None) + if self.channel not in CHANNEL_TO_TYPE.keys(): + raise config.error( + f"Invalid Channel '{self.channel}' for config " + f"section [{config.get_name()}]") + self._verify_path(config, 'path', self.path) + self.executable: 
Optional[pathlib.Path] = None + self.venv_args: Optional[str] = None + if executable is not None: + self.executable = pathlib.Path(executable).expanduser().resolve() + self._verify_path(config, 'env', self.executable) + self.venv_args = config.get('venv_args', None) + + self.need_channel_update = False + self._is_valid = False + + # We need to fetch all potential options for an Application. Not + # all options apply to each subtype, however we can't limit the + # options in children if we want to switch between channels and + # satisfy the confighelper's requirements. + self.origin: str = config.get('origin') + self.primary_branch = config.get("primary_branch", "master") + self.npm_pkg_json: Optional[pathlib.Path] = None + if config.get("enable_node_updates", False): + self.npm_pkg_json = self.path.joinpath("package-lock.json") + self._verify_path(config, 'enable_node_updates', self.npm_pkg_json) + self.python_reqs: Optional[pathlib.Path] = None + if self.executable is not None: + self.python_reqs = self.path.joinpath(config.get("requirements")) + self._verify_path(config, 'requirements', self.python_reqs) + self.install_script: Optional[pathlib.Path] = None + install_script = config.get('install_script', None) + if install_script is not None: + self.install_script = self.path.joinpath(install_script).resolve() + self._verify_path(config, 'install_script', self.install_script) + + @staticmethod + def _is_git_repo(app_path: Union[str, pathlib.Path]) -> bool: + if isinstance(app_path, str): + app_path = pathlib.Path(app_path).expanduser() + return app_path.joinpath('.git').exists() + + def _verify_path(self, + config: ConfigHelper, + option: str, + file_path: pathlib.Path + ) -> None: + if not file_path.exists(): + raise config.error( + f"Invalid path for option `{option}` in section " + f"[{config.get_name()}]: Path `{file_path}` does not exist") + + def check_need_channel_swap(self) -> bool: + return self.need_channel_update + + def check_same_paths(self, + 
app_path: Union[str, pathlib.Path], + executable: Union[str, pathlib.Path] + ) -> bool: + if isinstance(app_path, str): + app_path = pathlib.Path(app_path) + if isinstance(executable, str): + executable = pathlib.Path(executable) + app_path = app_path.expanduser() + executable = executable.expanduser() + if self.executable is None: + return False + return self.path.samefile(app_path) and \ + self.executable.samefile(executable) + + async def recover(self, + hard: bool = False, + force_dep_update: bool = False + ) -> None: + raise NotImplementedError + + async def reinstall(self): + raise NotImplementedError + + async def restart_service(self): + if self.name == "moonraker": + # Launch restart async so the request can return + # before the server restarts + IOLoop.current().call_later( + .1, self._do_restart) # type: ignore + else: + await self._do_restart() + + async def _do_restart(self) -> None: + self.notify_status("Restarting Service...") + try: + await self.cmd_helper.run_cmd( + f"sudo systemctl restart {self.name}") + except Exception: + if self.name == "moonraker": + # We will always get an error when restarting moonraker + # from within the child process, so ignore it + return + raise self.log_exc("Error restarting service") + + def get_update_status(self) -> Dict[str, Any]: + return { + 'channel': self.channel, + 'debug_enabled': self.debug, + 'need_channel_update': self.need_channel_update, + 'is_valid': self._is_valid + } + + async def _get_file_hash(self, + filename: Optional[pathlib.Path] + ) -> Optional[str]: + if filename is None or not filename.is_file(): + return None + + def hash_func(f: pathlib.Path) -> str: + return hashlib.sha256(f.read_bytes()).hexdigest() + try: + with ThreadPoolExecutor(max_workers=1) as tpe: + return await IOLoop.current().run_in_executor( + tpe, hash_func, filename) + except Exception: + return None + + async def _check_need_update(self, + prev_hash: Optional[str], + filename: Optional[pathlib.Path] + ) -> bool: + cur_hash 
= await self._get_file_hash(filename) + if prev_hash is None or cur_hash is None: + return False + return prev_hash != cur_hash + + async def _install_packages(self, package_list: List[str]) -> None: + pkgs = " ".join(package_list) + apt_cmd = self.cmd_helper.get_system_update_command() + self.notify_status("Installing system dependencies...") + # Install packages with apt-get + try: + await self.cmd_helper.run_cmd( + f"{apt_cmd} update", timeout=300., notify=True) + await self.cmd_helper.run_cmd( + f"{apt_cmd} install --yes {pkgs}", timeout=3600., + notify=True) + except Exception: + self.log_exc("Error updating packages via apt-get") + return + + async def _update_virtualenv(self, + requirements: Union[pathlib.Path, List[str]] + ) -> None: + if self.executable is None: + return + # Update python dependencies + bin_dir = self.executable.parent + if isinstance(requirements, pathlib.Path): + if not requirements.is_file(): + self.log_info( + f"Invalid path to requirements_file '{requirements}'") + return + args = f"-r {requirements}" + else: + args = " ".join(requirements) + pip = bin_dir.joinpath("pip") + self.notify_status("Updating python packages...") + try: + # First attempt to update pip + await self.cmd_helper.run_cmd( + f"{pip} install -U pip", timeout=1200., notify=True, + retries=3) + await self.cmd_helper.run_cmd( + f"{pip} install {args}", timeout=1200., notify=True, + retries=3) + except Exception: + self.log_exc("Error updating python requirements") + + async def _build_virtualenv(self) -> None: + if self.executable is None or self.venv_args is None: + return + bin_dir = self.executable.parent + env_path = bin_dir.joinpath("..").resolve() + self.notify_status(f"Creating virtualenv at: {env_path}...") + if env_path.exists(): + shutil.rmtree(env_path) + try: + await self.cmd_helper.run_cmd( + f"virtualenv {self.venv_args} {env_path}", timeout=300.) 
+ except Exception: + self.log_exc(f"Error creating virtualenv") + return + if not self.executable.exists(): + raise self.log_exc("Failed to create new virtualenv", False) diff --git a/moonraker/components/update_manager/base_deploy.py b/moonraker/components/update_manager/base_deploy.py new file mode 100644 index 0000000..1ca993d --- /dev/null +++ b/moonraker/components/update_manager/base_deploy.py @@ -0,0 +1,56 @@ +# Base Deployment Interface +# +# Copyright (C) 2021 Eric Callahan +# +# This file may be distributed under the terms of the GNU GPLv3 license. + +from __future__ import annotations +import logging + +from typing import TYPE_CHECKING, Dict, Any +if TYPE_CHECKING: + from confighelper import ConfigHelper + from utils import ServerError + from .update_manager import CommandHelper + +class BaseDeploy: + def __init__(self, + config: ConfigHelper, + cmd_helper: CommandHelper + ) -> None: + name_parts = config.get_name().split() + self.name = name_parts[-1] + self.server = config.get_server() + self.cmd_helper = cmd_helper + if name_parts == 1: + self.prefix: str = "" + if config.get('type', "") == "web": + self.prefix = f"Web Client {self.name}: " + else: + self.prefix = f"Application {self.name}: " + + async def refresh(self) -> None: + pass + + async def update(self) -> None: + pass + + def get_update_status(self) -> Dict[str, Any]: + return {} + + def log_exc(self, msg: str, traceback: bool = True) -> ServerError: + log_msg = f"{self.prefix}{msg}" + if traceback: + logging.exception(log_msg) + else: + logging.info(log_msg) + return self.server.error(msg) + + def log_info(self, msg: str) -> None: + log_msg = f"{self.prefix}{msg}" + logging.info(log_msg) + + def notify_status(self, msg: str, is_complete: bool = False) -> None: + log_msg = f"{self.prefix}{msg}" + logging.debug(log_msg) + self.cmd_helper.notify_update_response(log_msg, is_complete) diff --git a/moonraker/components/update_manager/git_deploy.py 
# Git Deployment implementation
#
# Copyright (C) 2021 Eric Callahan
#
# This file may be distributed under the terms of the GNU GPLv3 license.

from __future__ import annotations
import os
import pathlib
import shutil
import re
import logging
import tornado
from concurrent.futures import ThreadPoolExecutor
from tornado.locks import Condition, Lock
from tornado.ioloop import IOLoop
from .app_deploy import AppDeploy

# Annotation imports
from typing import (
    TYPE_CHECKING,
    Any,
    Tuple,
    Optional,
    Dict,
    List,
)
if TYPE_CHECKING:
    from confighelper import ConfigHelper
    from components import database
    from components import shell_command
    from .update_manager import CommandHelper
    DBComp = database.MoonrakerDatabase


class GitDeploy(AppDeploy):
    """Application deploy backed by a git repository.

    Delegates all git operations to a GitRepo instance; this class
    sequences repo refresh/update/recovery with dependency updates and
    service restarts.
    """
    def __init__(self,
                 config: ConfigHelper,
                 cmd_helper: CommandHelper,
                 app_params: Optional[Dict[str, Any]] = None
                 ) -> None:
        super().__init__(config, cmd_helper, app_params)
        self.repo = GitRepo(cmd_helper, self.path, self.name, self.origin)
        # A non-git type with a git checkout on disk means the install
        # must be migrated to match the configured channel
        if self.type != 'git_repo':
            self.need_channel_update = True

    async def refresh(self) -> None:
        """Refresh repo state, swallowing errors so startup can proceed."""
        try:
            await self._update_repo_state()
        except Exception:
            logging.exception("Error Refreshing git state")

    async def _update_repo_state(self, need_fetch: bool = True) -> None:
        # Re-initialize the repo and recompute validity; debug mode
        # overrides failed validity checks so development repos still update
        self._is_valid = False
        await self.repo.initialize(need_fetch=need_fetch)
        invalids = self.repo.report_invalids(self.primary_branch)
        if invalids:
            msgs = '\n'.join(invalids)
            self.log_info(
                f"Repo validation checks failed:\n{msgs}")
            if self.debug:
                self._is_valid = True
                self.log_info(
                    "Repo debug enabled, overriding validity checks")
            else:
                self.log_info("Updates on repo disabled")
        else:
            self._is_valid = True
            self.log_info("Validity check for git repo passed")

    async def update(self) -> None:
        """Pull the latest commits and refresh dependencies if they changed.

        Dependency files are hashed before the pull so only files the
        pull actually modified are reprocessed afterward.
        """
        await self.repo.wait_for_init()
        if not self._is_valid:
            raise self.log_exc("Update aborted, repo not valid", False)
        if self.repo.is_dirty():
            raise self.log_exc(
                "Update aborted, repo has been modified", False)
        if self.repo.is_current():
            # No need to update
            return
        inst_hash = await self._get_file_hash(self.install_script)
        pyreqs_hash = await self._get_file_hash(self.python_reqs)
        npm_hash = await self._get_file_hash(self.npm_pkg_json)
        await self._pull_repo()
        # Check Semantic Versions
        await self._update_dependencies(inst_hash, pyreqs_hash, npm_hash)
        # Refresh local repo state
        await self._update_repo_state(need_fetch=False)
        await self.restart_service()
        self.notify_status("Update Finished...", is_complete=True)

    async def recover(self,
                      hard: bool = False,
                      force_dep_update: bool = False
                      ) -> None:
        """Restore the repo to a pristine state.

        hard: re-clone the repo instead of performing a reset.
        force_dep_update: reinstall dependencies even if unchanged.
        """
        self.notify_status("Attempting Repo Recovery...")
        inst_hash = await self._get_file_hash(self.install_script)
        pyreqs_hash = await self._get_file_hash(self.python_reqs)
        npm_hash = await self._get_file_hash(self.npm_pkg_json)

        if hard:
            await self.repo.clone()
            await self._update_repo_state()
        else:
            self.notify_status("Resetting Git Repo...")
            await self.repo.reset()
            await self._update_repo_state()

        if self.repo.is_dirty() or not self._is_valid:
            raise self.server.error(
                "Recovery attempt failed, repo state not pristine", 500)
        await self._update_dependencies(inst_hash, pyreqs_hash, npm_hash,
                                        force=force_dep_update)
        await self.restart_service()
        self.notify_status("Reinstall Complete", is_complete=True)

    async def reinstall(self):
        """Full reinstall: hard recovery plus forced dependency update."""
        await self.recover(True, True)

    def get_update_status(self) -> Dict[str, Any]:
        """Merge the base status with git-specific repo status."""
        status = super().get_update_status()
        status.update(self.repo.get_repo_status())
        return status

    async def _pull_repo(self) -> None:
        # A detached HEAD cannot be pulled; fetch then check out the
        # tracked remote branch instead
        self.notify_status("Updating Repo...")
        try:
            if self.repo.is_detached():
                await self.repo.fetch()
                await self.repo.checkout()
            else:
                await self.repo.pull()
        except Exception:
            raise self.log_exc("Error running 'git pull'")

    async def _update_dependencies(self,
                                   inst_hash: Optional[str],
                                   pyreqs_hash: Optional[str],
                                   npm_hash: Optional[str],
                                   force: bool = False
                                   ) -> None:
        # Each dependency source is refreshed only when its pre-pull hash
        # differs from the current file (or when force is set)
        ret = await self._check_need_update(inst_hash, self.install_script)
        if force or ret:
            package_list = await self._parse_install_script()
            if package_list is not None:
                await self._install_packages(package_list)
        ret = await self._check_need_update(pyreqs_hash, self.python_reqs)
        if force or ret:
            if self.python_reqs is not None:
                await self._update_virtualenv(self.python_reqs)
        ret = await self._check_need_update(npm_hash, self.npm_pkg_json)
        if force or ret:
            if self.npm_pkg_json is not None:
                self.notify_status("Updating Node Packages...")
                try:
                    await self.cmd_helper.run_cmd(
                        "npm ci --only=prod", notify=True, timeout=600.,
                        cwd=str(self.path))
                except Exception:
                    # Best effort; a failed npm update is reported but
                    # does not abort the update
                    self.notify_status("Node Package Update failed")

    async def _parse_install_script(self) -> Optional[List[str]]:
        """Extract the system package list from the app's install script.

        Returns the packages named in PKGLIST="..." assignments, or None
        when no script/packages are found.
        """
        if self.install_script is None:
            return None
        # Open the install file and read
        inst_path: pathlib.Path = self.install_script
        if not inst_path.is_file():
            self.log_info(f"Unable to open install script: {inst_path}")
            return None
        with ThreadPoolExecutor(max_workers=1) as tpe:
            data = await IOLoop.current().run_in_executor(
                tpe, inst_path.read_text)
        packages: List[str] = re.findall(r'PKGLIST="(.*)"', data)
        # NOTE(review): str.lstrip takes a *character set*, not a prefix —
        # this strips any leading chars drawn from "${PKGLIST}".  It works
        # for entries shaped like "${PKGLIST} pkg1 pkg2" but would mangle a
        # first package starting with one of those characters; confirm.
        packages = [p.lstrip("${PKGLIST}").strip() for p in packages]
        if not packages:
            self.log_info(f"No packages found in script: {inst_path}")
            return None
        logging.debug(f"Repo {self.name}: Detected Packages: {repr(packages)}")
        return packages


# Idle timeout (seconds) for long-running git commands (fetch/pull/clone);
# rescheduled while output is still being received
GIT_ASYNC_TIMEOUT = 300.
# Environment overrides applied to async git commands so stalled HTTP
# transfers abort instead of hanging indefinitely.
# NOTE(review): the second key has a trailing space
# ('GIT_HTTP_LOW_SPEED_TIME ') — git will not recognize the misspelled
# variable, so the low-speed timeout is likely never applied.  Looks like
# a bug; confirm and remove the space.
GIT_ENV_VARS = {
    'GIT_HTTP_LOW_SPEED_LIMIT': "1000",
    'GIT_HTTP_LOW_SPEED_TIME ': "20"
}
# Maximum number of commits fetched for the "commits behind" report
GIT_MAX_LOG_CNT = 100
# git log format: fields separated by 0x1D, records terminated by 0x1E
GIT_LOG_FMT = \
    "\"sha:%H%x1Dauthor:%an%x1Ddate:%ct%x1Dsubject:%s%x1Dmessage:%b%x1E\""
# Substring identifying a corrupt-object failure in git's stderr
GIT_OBJ_ERR = "fatal: loose object"

class GitRepo:
    """Async wrapper around a local git checkout.

    All git subprocess invocations are serialized through a single
    operation lock; long-running network commands get idle-timeout
    supervision and loose-object repair/retry handling.
    """
    def __init__(self,
                 cmd_helper: CommandHelper,
                 git_path: pathlib.Path,
                 alias: str,
                 origin_url: str
                 ) -> None:
        self.server = cmd_helper.get_server()
        self.cmd_helper = cmd_helper
        self.alias = alias
        self.git_path = git_path
        git_dir = git_path.parent
        git_base = git_path.name
        # Hidden sibling directory used as the clone target during recovery
        self.backup_path = git_dir.joinpath(f".{git_base}_repo_backup")
        self.origin_url = origin_url
        # Repo state, refreshed by initialize()/update_repo_status();
        # "?" marks values not yet determined
        self.valid_git_repo: bool = False
        self.git_owner: str = "?"
        self.git_remote: str = "?"
        self.git_branch: str = "?"
        self.current_version: str = "?"
        self.upstream_version: str = "?"
        self.current_commit: str = "?"
        self.upstream_commit: str = "?"
        self.upstream_url: str = "?"
        self.full_version_string: str = "?"
        self.branches: List[str] = []
        self.dirty: bool = False
        self.head_detached: bool = False
        # Accumulated stdout/stderr from the most recent git command(s)
        self.git_messages: List[str] = []
        self.commits_behind: List[Dict[str, Any]] = []
        self.recovery_message = \
            f"""
            Manually restore via SSH with the following commands:
            sudo service {self.alias} stop
            cd {git_dir}
            rm -rf {git_base}
            git clone {self.origin_url}
            sudo service {self.alias} start
            """

        # Non-None while an initialize() is in flight; concurrent callers
        # wait on it instead of re-initializing
        self.init_condition: Optional[Condition] = None
        self.initialized: bool = False
        self.git_operation_lock = Lock()
        self.fetch_timeout_handle: Optional[object] = None
        self.fetch_input_recd: bool = False

    async def initialize(self, need_fetch: bool = True) -> None:
        """Refresh all cached repo state (branches, commits, versions).

        Concurrent calls piggyback on the in-flight initialization via
        init_condition rather than starting a second one.
        """
        if self.init_condition is not None:
            # No need to initialize multiple requests
            await self.init_condition.wait()
            if self.initialized:
                return
        self.initialized = False
        self.init_condition = Condition()
        self.git_messages.clear()
        try:
            await self.update_repo_status()
            self._verify_repo()
            if not self.head_detached:
                # lookup remote via git config
                self.git_remote = await self.get_config_item(
                    f"branch.{self.git_branch}.remote")

            # Populate list of current branches
            blist = await self.list_branches()
            self.branches = []
            for branch in blist:
                branch = branch.strip()
                if branch[0] == "*":
                    # strip the "* " marker from the current branch
                    branch = branch[2:]
                if branch[0] == "(":
                    # skip "(HEAD detached at ...)" pseudo entries
                    continue
                self.branches.append(branch)

            if need_fetch:
                await self.fetch()

            self.upstream_url = await self.remote(f"get-url {self.git_remote}")
            self.current_commit = await self.rev_parse("HEAD")
            self.upstream_commit = await self.rev_parse(
                f"{self.git_remote}/{self.git_branch}")
            current_version = await self.describe(
                "--always --tags --long --dirty")
            self.full_version_string = current_version.strip()
            upstream_version = await self.describe(
                f"{self.git_remote}/{self.git_branch} "
                "--always --tags --long")

            # Store current remote in the database if in a detached state
            if self.head_detached:
                mrdb: DBComp = self.server.lookup_component("database")
                db_key = f"update_manager.git_repo_{self.alias}" \
                    ".detached_remote"
                mrdb.insert_item(
                    "moonraker", db_key,
                    [self.current_commit, self.git_remote, self.git_branch])

            # Parse GitHub Owner from URL
            owner_match = re.match(r"https?://[^/]+/([^/]+)", self.upstream_url)
            self.git_owner = "?"
            if owner_match is not None:
                self.git_owner = owner_match.group(1)
            self.dirty = current_version.endswith("dirty")

            # Parse Version Info
            versions = []
            for ver in [current_version, upstream_version]:
                tag_version = "?"
                # NOTE(review): `\.\d-` only matches a single-digit patch
                # component (v1.2.3-4 but not v1.2.10-4) — presumably this
                # should be `\.\d+-`; confirm against the project's tagging
                # scheme.
                ver_match = re.match(r"v\d+\.\d+\.\d-\d+", ver)
                if ver_match:
                    tag_version = ver_match.group()
                versions.append(tag_version)
            self.current_version, self.upstream_version = versions

            # Get Commits Behind
            self.commits_behind = []
            cbh = await self.get_commits_behind()
            if cbh:
                tagged_commits = await self.get_tagged_commits()
                debug_msg = '\n'.join([f"{k}: {v}" for k, v in
                                       tagged_commits.items()])
                logging.debug(f"Git Repo {self.alias}: Tagged Commits\n"
                              f"{debug_msg}")
                # Report the first 30 commits behind plus any tagged
                # commits beyond that window
                for i, commit in enumerate(cbh):
                    tag = tagged_commits.get(commit['sha'], None)
                    if i < 30 or tag is not None:
                        commit['tag'] = tag
                        self.commits_behind.append(commit)

            self.log_repo_info()
        except Exception:
            logging.exception(f"Git Repo {self.alias}: Initialization failure")
            raise
        else:
            self.initialized = True
        finally:
            # Wake any piggybacked waiters regardless of outcome
            self.init_condition.notify_all()
            self.init_condition = None

    async def wait_for_init(self) -> None:
        """Block until any in-flight initialization completes; raise if it
        failed."""
        if self.init_condition is not None:
            await self.init_condition.wait()
            if not self.initialized:
                raise self.server.error(
                    f"Git Repo {self.alias}: Initialization failure")

    async def update_repo_status(self) -> bool:
        """Determine repo validity, current branch and detached state.

        Returns True when the path holds a usable git repo.  Retries
        `git status` and attempts loose-object repair on corruption.
        """
        async with self.git_operation_lock:
            if not self.git_path.joinpath(".git").is_dir():
                logging.info(
                    f"Git Repo {self.alias}: path '{self.git_path}'"
                    " is not a valid git repo")
                return False
            await self._wait_for_lock_release()
            self.valid_git_repo = False
            retries = 3
            while retries:
                self.git_messages.clear()
                try:
                    resp: Optional[str] = await self._run_git_cmd(
                        "status -u no", retries=1)
                except Exception:
                    retries -= 1
                    resp = None
                    # Attempt to recover from "loose object" error
                    if retries and GIT_OBJ_ERR in "\n".join(self.git_messages):
                        ret = await self._repair_loose_objects()
                        if not ret:
                            # Since we are unable to recover, immediately
                            # return
                            return False
                else:
                    break
            if resp is None:
                return False
            # Only the first line of status output is needed
            resp = resp.strip().split('\n', 1)[0]
            self.head_detached = resp.startswith("HEAD detached")
            branch_info = resp.split()[-1]
            if self.head_detached:
                bparts = branch_info.split("/", 1)
                if len(bparts) == 2:
                    # Detached on a remote ref, e.g. "origin/master"
                    self.git_remote, self.git_branch = bparts
                else:
                    # Detached on a bare commit; try the remote previously
                    # stored in the database for this commit
                    mrdb: DBComp = self.server.lookup_component("database")
                    db_key = f"update_manager.git_repo_{self.alias}" \
                        ".detached_remote"
                    detached_remote: List[str] = mrdb.get_item(
                        "moonraker", db_key, ["", "?", "?"])
                    if detached_remote[0].startswith(branch_info):
                        self.git_remote = detached_remote[1]
                        self.git_branch = detached_remote[2]
                        msg = "Using remote stored in database:"\
                            f" {self.git_remote}/{self.git_branch}"
                    elif self.git_remote == "?":
                        msg = "Resolve by manually checking out" \
                            " a branch via SSH."
                    else:
                        msg = "Defaulting to previously tracked " \
                            f"{self.git_remote}/{self.git_branch}."
                    logging.info(
                        f"Git Repo {self.alias}: HEAD detached on untracked "
                        f"commit {branch_info}. {msg}")
            else:
                self.git_branch = branch_info
            self.valid_git_repo = True
            return True

    def log_repo_info(self) -> None:
        """Log a summary of the cached repo state."""
        logging.info(
            f"Git Repo {self.alias} Detected:\n"
            f"Owner: {self.git_owner}\n"
            f"Path: {self.git_path}\n"
            f"Remote: {self.git_remote}\n"
            f"Branch: {self.git_branch}\n"
            f"Remote URL: {self.upstream_url}\n"
            f"Current Commit SHA: {self.current_commit}\n"
            f"Upstream Commit SHA: {self.upstream_commit}\n"
            f"Current Version: {self.current_version}\n"
            f"Upstream Version: {self.upstream_version}\n"
            f"Is Dirty: {self.dirty}\n"
            f"Is Detached: {self.head_detached}\n"
            f"Commits Behind: {len(self.commits_behind)}")

    def report_invalids(self, primary_branch: str) -> List[str]:
        """Return a list of reasons this repo fails validity checks
        (unofficial remote, wrong branch, detached HEAD)."""
        invalids: List[str] = []
        upstream_url = self.upstream_url.lower()
        if upstream_url[-4:] != ".git":
            upstream_url += ".git"
        if upstream_url != self.origin_url:
            invalids.append(f"Unofficial remote url: {self.upstream_url}")
        if self.git_branch != primary_branch or self.git_remote != "origin":
            invalids.append(
                "Repo not on valid remote branch, expected: "
                f"origin/{primary_branch}, detected: "
                f"{self.git_remote}/{self.git_branch}")
        if self.head_detached:
            invalids.append("Detached HEAD detected")
        return invalids

    def _verify_repo(self, check_remote: bool = False) -> None:
        # Guard: raise unless the repo (and optionally its remote) has
        # been successfully detected
        if not self.valid_git_repo:
            raise self.server.error(
                f"Git Repo {self.alias}: repo not initialized")
        if check_remote:
            if self.git_remote == "?":
                raise self.server.error(
                    f"Git Repo {self.alias}: No valid git remote detected")

    async def reset(self) -> None:
        """Discard local changes: `git clean -d -f` then hard reset to the
        tracked remote branch."""
        if self.git_remote == "?" or self.git_branch == "?":
            raise self.server.error("Cannot reset, unknown remote/branch")
        async with self.git_operation_lock:
            await self._run_git_cmd("clean -d -f", retries=2)
            await self._run_git_cmd(
                f"reset --hard {self.git_remote}/{self.git_branch}",
                retries=2)

    async def fetch(self) -> None:
        """Fetch the tracked remote with pruning (async, supervised)."""
        self._verify_repo(check_remote=True)
        async with self.git_operation_lock:
            await self._run_git_cmd_async(
                f"fetch {self.git_remote} --prune --progress")


    async def pull(self) -> None:
        """Pull the current branch (async, supervised)."""
        self._verify_repo()
        if self.head_detached:
            raise self.server.error(
                f"Git Repo {self.alias}: Cannot perform pull on a "
                "detached HEAD")
        async with self.git_operation_lock:
            await self._run_git_cmd_async("pull --progress")

    async def list_branches(self) -> List[str]:
        """Return the raw lines of `git branch --list`."""
        self._verify_repo()
        async with self.git_operation_lock:
            resp = await self._run_git_cmd("branch --list")
            return resp.strip().split("\n")

    async def remote(self, command: str) -> str:
        """Run `git remote <command>` and return stripped output."""
        self._verify_repo(check_remote=True)
        async with self.git_operation_lock:
            resp = await self._run_git_cmd(
                f"remote {command}")
            return resp.strip()

    async def describe(self, args: str = "") -> str:
        """Run `git describe <args>` and return stripped output."""
        self._verify_repo()
        async with self.git_operation_lock:
            resp = await self._run_git_cmd(f"describe {args}".strip())
            return resp.strip()

    async def rev_parse(self, args: str = "") -> str:
        """Run `git rev-parse <args>` and return stripped output."""
        self._verify_repo()
        async with self.git_operation_lock:
            resp = await self._run_git_cmd(f"rev-parse {args}".strip())
            return resp.strip()

    async def get_config_item(self, item: str) -> str:
        """Return a single `git config --get` value."""
        self._verify_repo()
        async with self.git_operation_lock:
            resp = await self._run_git_cmd(f"config --get {item}")
            return resp.strip()

    async def checkout(self, branch: Optional[str] = None) -> None:
        """Check out `branch`, defaulting to the tracked remote branch."""
        self._verify_repo()
        async with self.git_operation_lock:
            branch = branch or f"{self.git_remote}/{self.git_branch}"
            await self._run_git_cmd(f"checkout {branch} -q")

    async def run_fsck(self) -> None:
        """Run a full `git fsck` integrity check."""
        async with self.git_operation_lock:
            await self._run_git_cmd("fsck --full", timeout=300., retries=1)

    async def clone(self) -> None:
        """Recovery clone: clone origin into a backup dir, then swap it
        into place, replacing the existing checkout."""
        async with self.git_operation_lock:
            self.cmd_helper.notify_update_response(
                f"Git Repo {self.alias}: Starting Clone Recovery...")
            if self.backup_path.exists():
                shutil.rmtree(self.backup_path)
            self._check_lock_file_exists(remove=True)
            git_cmd = f"clone {self.origin_url} {self.backup_path}"
            try:
                # need_git_path=False: clone runs outside the repo dir
                await self._run_git_cmd_async(git_cmd, 1, False, False)
            except Exception as e:
                self.cmd_helper.notify_update_response(
                    f"Git Repo {self.alias}: Git Clone Failed")
                raise self.server.error("Git Clone Error") from e
            if self.git_path.exists():
                shutil.rmtree(self.git_path)
            shutil.move(str(self.backup_path), str(self.git_path))
            self.cmd_helper.notify_update_response(
                f"Git Repo {self.alias}: Git Clone Complete")

    async def get_commits_behind(self) -> List[Dict[str, Any]]:
        """Return parsed log entries between HEAD and the remote branch.

        Each entry is a dict with keys such as 'sha', 'author', 'date',
        'subject' and 'message' as defined by GIT_LOG_FMT.
        """
        self._verify_repo()
        if self.is_current():
            return []
        async with self.git_operation_lock:
            branch = f"{self.git_remote}/{self.git_branch}"
            resp = await self._run_git_cmd(
                f"log {self.current_commit}..{branch} "
                f"--format={GIT_LOG_FMT} --max-count={GIT_MAX_LOG_CNT}")
            commits_behind: List[Dict[str, Any]] = []
            # Records are 0x1E-terminated; fields within are 0x1D-separated
            for log_entry in resp.split('\x1E'):
                log_entry = log_entry.strip()
                if not log_entry:
                    continue
                log_items = [li.strip() for li in log_entry.split('\x1D')
                             if li.strip()]
                cbh = [li.split(':', 1) for li in log_items]
                commits_behind.append(dict(cbh))  # type: ignore
            return commits_behind

    async def get_tagged_commits(self) -> Dict[str, Any]:
        """Return a mapping of commit SHA -> tag name for all repo tags."""
        self._verify_repo()
        async with self.git_operation_lock:
            resp = await self._run_git_cmd(f"show-ref --tags -d")
            tagged_commits: Dict[str, Any] = {}
            tags = [tag.strip() for tag in resp.split('\n') if tag.strip()]
            for tag in tags:
                sha, ref = tag.split(' ', 1)
                ref = ref.split('/')[-1]
                if ref[-3:] == "^{}":
                    # Dereference this commit and overwrite any existing tag
                    ref = ref[:-3]
                    tagged_commits[ref] = sha
                elif ref not in tagged_commits:
                    # This could be a lightweight tag pointing to a commit.  If
                    # it is an annotated tag it will be overwritten by the
                    # dereferenced tag
                    tagged_commits[ref] = sha
            # Return tagged commits as SHA keys mapped to tag values
            return {v: k for k, v in tagged_commits.items()}

    def get_repo_status(self) -> Dict[str, Any]:
        """Return the cached repo state as a client-facing status dict."""
        return {
            'remote_alias': self.git_remote,
            'branch': self.git_branch,
            'owner': self.git_owner,
            'version': self.current_version,
            'remote_version': self.upstream_version,
            'current_hash': self.current_commit,
            'remote_hash': self.upstream_commit,
            'is_dirty': self.dirty,
            'detached': self.head_detached,
            'commits_behind': self.commits_behind,
            'git_messages': self.git_messages,
            'full_version_string': self.full_version_string
        }

    def get_version(self, upstream: bool = False) -> Tuple[Any, ...]:
        """Return the version as a tuple of numeric strings for comparison."""
        version = self.upstream_version if upstream else self.current_version
        return tuple(re.findall(r"\d+", version))

    def is_detached(self) -> bool:
        # True when HEAD is not on a local branch
        return self.head_detached

    def is_dirty(self) -> bool:
        # True when the working tree has uncommitted modifications
        return self.dirty

    def is_current(self) -> bool:
        # True when HEAD matches the tracked remote branch tip
        return self.current_commit == self.upstream_commit

    def _check_lock_file_exists(self, remove: bool = False) -> bool:
        """Return True if git's index.lock exists; optionally remove it
        (only safe after the git process has exited)."""
        lock_path = self.git_path.joinpath(".git/index.lock")
        if lock_path.is_file():
            if remove:
                logging.info(f"Git Repo {self.alias}: Git lock file found "
                             "after git process exited, removing")
                try:
                    os.remove(lock_path)
                except Exception:
                    pass
            return True
        return False

    async def _wait_for_lock_release(self, timeout: int = 60) -> None:
        """Poll up to `timeout` seconds for index.lock to clear, then
        force-remove it as presumed stale."""
        while timeout:
            if self._check_lock_file_exists():
                if not timeout % 10:
                    logging.info(f"Git Repo {self.alias}: Git lock file "
                                 f"exists, {timeout} seconds remaining "
                                 "before removal.")
                await tornado.gen.sleep(1.)
                timeout -= 1
            else:
                return
        self._check_lock_file_exists(remove=True)

    async def _repair_loose_objects(self) -> bool:
        """Attempt automatic repair of a corrupt object store.

        Deletes empty object files, refetches all refs and verifies with
        fsck.  Returns True on success.
        """
        try:
            await self.cmd_helper.run_cmd_with_response(
                "find .git/objects/ -type f -empty | xargs rm",
                timeout=10., retries=1, cwd=str(self.git_path))
            await self._run_git_cmd_async(
                "fetch --all -p", retries=1, fix_loose=False)
            await self._run_git_cmd("fsck --full", timeout=300., retries=1)
        except Exception:
            logging.exception("Attempt to repair loose objects failed")
            return False
        return True

    async def _run_git_cmd_async(self,
                                 cmd: str,
                                 retries: int = 5,
                                 need_git_path: bool = True,
                                 fix_loose: bool = True
                                 ) -> None:
        """Run a long/network git command with idle-timeout supervision.

        The command is only killed after GIT_ASYNC_TIMEOUT seconds with
        no output; output received reschedules the timeout.  Retries on
        failure and optionally attempts loose-object repair.
        """
        # Fetch and pull require special handling.  If the request
        # gets delayed we do not want to terminate it while the command
        # is processing.
        await self._wait_for_lock_release()
        env = os.environ.copy()
        env.update(GIT_ENV_VARS)
        if need_git_path:
            git_cmd = f"git -C {self.git_path} {cmd}"
        else:
            git_cmd = f"git {cmd}"
        scmd = self.cmd_helper.build_shell_command(
            git_cmd, callback=self._handle_process_output,
            env=env)
        while retries:
            self.git_messages.clear()
            ioloop = IOLoop.current()
            self.fetch_input_recd = False
            self.fetch_timeout_handle = ioloop.call_later(
                GIT_ASYNC_TIMEOUT, self._check_process_active,  # type: ignore
                scmd, cmd)
            try:
                # timeout=0: supervision is handled by the idle timer above
                await scmd.run(timeout=0)
            except Exception:
                pass
            ioloop.remove_timeout(self.fetch_timeout_handle)
            ret = scmd.get_return_code()
            if ret == 0:
                self.git_messages.clear()
                return
            elif fix_loose:
                if GIT_OBJ_ERR in "\n".join(self.git_messages):
                    ret = await self._repair_loose_objects()
                    # NOTE(review): on successful repair this `break` falls
                    # through to the final "Git Command failed" raise rather
                    # than retrying the original command — confirm whether a
                    # retry (continue) was intended here.
                    if ret:
                        break
                    # since the attempt to repair failed, bypass retries
                    # and immediately raise an exception
                    raise self.server.error(
                        f"Unable to repair loose objects, use hard recovery")
            retries -= 1
            await tornado.gen.sleep(.5)
            self._check_lock_file_exists(remove=True)
        raise self.server.error(f"Git Command '{cmd}' failed")

    def _handle_process_output(self, output: bytes) -> None:
        # Shell-command callback: record output, forward it to clients,
        # and mark activity so the idle timeout is rescheduled
        self.fetch_input_recd = True
        out = output.decode().strip()
        if out:
            self.git_messages.append(out)
            self.cmd_helper.notify_update_response(out)
            logging.debug(
                f"Git Repo {self.alias}: {out}")

    async def _check_process_active(self,
                                    scmd: shell_command.ShellCommand,
                                    cmd_name: str
                                    ) -> None:
        """Idle-timeout callback: reschedule if output arrived since the
        last check, otherwise kill the stalled git process."""
        ret = scmd.get_return_code()
        if ret is not None:
            logging.debug(f"Git Repo {self.alias}: {cmd_name} returned")
            return
        if self.fetch_input_recd:
            # Received some input, reschedule timeout
            logging.debug(
                f"Git Repo {self.alias}: {cmd_name} active, rescheduling")
            ioloop = IOLoop.current()
            self.fetch_input_recd = False
            self.fetch_timeout_handle = ioloop.call_later(
                GIT_ASYNC_TIMEOUT, self._check_process_active,  # type: ignore
                scmd, cmd_name)
        else:
            # Request has timed out with no input, terminate it
            logging.debug(f"Git Repo {self.alias}: {cmd_name} timed out")
            # Cancel with SIGKILL
            await scmd.cancel(2)

    async def _run_git_cmd(self,
                           git_args: str,
                           timeout: float = 20.,
                           retries: int = 5,
                           env: Optional[Dict[str, str]] = None
                           ) -> str:
        """Run a short git command and return its stdout.

        On failure, stdout/stderr are appended to git_messages before the
        error is re-raised so callers can inspect them.
        """
        try:
            return await self.cmd_helper.run_cmd_with_response(
                f"git -C {self.git_path} {git_args}",
                timeout=timeout, retries=retries, env=env, sig_idx=2)
        except self.cmd_helper.scmd_error as e:
            stdout = e.stdout.decode().strip()
            stderr = e.stderr.decode().strip()
            if stdout:
                self.git_messages.append(stdout)
            if stderr:
                self.git_messages.append(stderr)
            raise

# NOTE(review): the remainder of the original patch (modification hunks to
# update_manager.py) is truncated in this view and is not reproduced here.
SUPPLEMENTAL_CFG_PATH = os.path.join( os.path.dirname(__file__), "update_manager.conf") +KLIPPER_DEFAULT_PATH = os.path.expanduser("~/klipper") +KLIPPER_DEFAULT_EXEC = os.path.expanduser("~/klippy-env/bin/python") APT_CMD = "sudo DEBIAN_FRONTEND=noninteractive apt-get" # Check To see if Updates are necessary each hour @@ -58,22 +59,47 @@ MIN_REFRESH_TIME = 43200 # Perform auto refresh no later than 4am MAX_PKG_UPDATE_HOUR = 4 +def get_deploy_class(app_path: str) -> Type: + if AppDeploy._is_git_repo(app_path): + return GitDeploy + else: + # TODO: This will be Zip deploy after implementation + return GitDeploy + class UpdateManager: def __init__(self, config: ConfigHelper) -> None: self.server = config.get_server() - self.config = config - self.config.read_supplemental_config(SUPPLEMENTAL_CFG_PATH) + self.app_config = config.read_supplemental_config( + SUPPLEMENTAL_CFG_PATH) auto_refresh_enabled = config.getboolean('enable_auto_refresh', False) enable_sys_updates = config.get('enable_system_updates', True) + self.channel = config.get('channel', "dev") self.cmd_helper = CommandHelper(config) - env = sys.executable - mooncfg = self.config[f"update_manager moonraker"] - self.updaters: Dict[str, BaseUpdater] = { - "moonraker": GitUpdater(mooncfg, self.cmd_helper, - MOONRAKER_PATH, env) - } + self.updaters: Dict[str, BaseDeploy] = {} if enable_sys_updates: - self.updaters['system'] = PackageUpdater(config, self.cmd_helper) + self.updaters['system'] = PackageDeploy(config, self.cmd_helper) + if ( + os.path.exists(KLIPPER_DEFAULT_PATH) and + os.path.exists(KLIPPER_DEFAULT_EXEC) + ): + self.updaters['klipper'] = get_deploy_class(KLIPPER_DEFAULT_PATH)( + self.app_config[f"update_manager klipper"], self.cmd_helper, + { + 'channel': self.channel, + 'path': KLIPPER_DEFAULT_PATH, + 'executable': KLIPPER_DEFAULT_EXEC + }) + else: + self.updaters['klipper'] = BaseDeploy( + self.app_config[f"update_manager klipper"], self.cmd_helper) + self.updaters['moonraker'] = 
get_deploy_class(MOONRAKER_PATH)( + self.app_config[f"update_manager moonraker"], self.cmd_helper, + { + 'channel': self.channel, + 'path': MOONRAKER_PATH, + 'executable': sys.executable + }) + # TODO: The below check may be removed when invalid config options # raise a config error. if config.get("client_repo", None) is not None or \ @@ -82,18 +108,18 @@ class UpdateManager: "The deprecated 'client_repo' and 'client_path' options\n" "have been removed. See Moonraker's configuration docs\n" "for details on client configuration.") - client_sections = self.config.get_prefix_sections("update_manager") + client_sections = config.get_prefix_sections("update_manager ") for section in client_sections: - cfg = self.config[section] + cfg = config[section] name = section.split()[-1] if name in self.updaters: raise config.error("Client repo named %s already added" % (name,)) client_type = cfg.get("type") if client_type == "git_repo": - self.updaters[name] = GitUpdater(cfg, self.cmd_helper) + self.updaters[name] = GitDeploy(cfg, self.cmd_helper) elif client_type == "web": - self.updaters[name] = WebUpdater(cfg, self.cmd_helper) + self.updaters[name] = WebClientDeploy(cfg, self.cmd_helper) else: raise config.error("Invalid type '%s' for section [%s]" % (client_type, section)) @@ -140,13 +166,13 @@ class UpdateManager: self._initalize_updaters, list(self.updaters.values())) async def _initalize_updaters(self, - initial_updaters: List[BaseUpdater] + initial_updaters: List[BaseDeploy] ) -> None: async with self.cmd_request_lock: self.is_refreshing = True await self.cmd_helper.init_api_rate_limit() for updater in initial_updaters: - if isinstance(updater, PackageUpdater): + if isinstance(updater, PackageDeploy): ret = updater.refresh(False) else: ret = updater.refresh() @@ -160,17 +186,22 @@ class UpdateManager: logging.info("No valid klippy info received") return kpath: str = kinfo['klipper_path'] - env: str = kinfo['python_path'] - kupdater = self.updaters.get('klipper', None) - 
if kupdater is not None: - assert isinstance(kupdater, GitUpdater) - if kupdater.repo_path == kpath and \ - kupdater.env == env: - # Current Klipper Updater is valid - return - kcfg = self.config[f"update_manager klipper"] - need_notification = "klipper" not in self.updaters - self.updaters['klipper'] = GitUpdater(kcfg, self.cmd_helper, kpath, env) + executable: str = kinfo['python_path'] + kupdater = self.updaters.get('klipper') + if ( + isinstance(kupdater, AppDeploy) and + kupdater.check_same_paths(kpath, executable) + ): + # Current Klipper Updater is valid + return + need_notification = not isinstance(kupdater, AppDeploy) + self.updaters['klipper'] = get_deploy_class(kpath)( + self.app_config[f"update_manager klipper"], self.cmd_helper, + { + 'channel': self.channel, + 'path': kpath, + 'executable': executable + }) async with self.cmd_request_lock: await self.updaters['klipper'].refresh() if need_notification: @@ -298,7 +329,7 @@ class UpdateManager: updater = self.updaters.get(app, None) if updater is None: raise self.server.error(f"Updater {app} not available", 404) - elif not isinstance(updater, GitUpdater): + elif not isinstance(updater, GitDeploy): raise self.server.error(f"Upater {app} is not a Git Repo Type") async with self.cmd_request_lock: self.cmd_helper.set_update_info(f"recover_{app}", id(web_request)) @@ -534,876 +565,7 @@ class CommandHelper: def close(self) -> None: self.http_client.close() -class BaseUpdater: - def __init__(self, - config: ConfigHelper, - cmd_helper: CommandHelper - ) -> None: - self.server = config.get_server() - self.cmd_helper = cmd_helper - - def refresh(self) -> Coroutine: - raise NotImplementedError - - def update(self) -> Coroutine: - raise NotImplementedError - - def get_update_status(self) -> Dict[str, Any]: - raise NotImplementedError - -class GitUpdater(BaseUpdater): - def __init__(self, - config: ConfigHelper, - cmd_helper: CommandHelper, - path: Optional[str] = None, - env: Optional[str] = None - ) -> None: - 
super().__init__(config, cmd_helper) - self.name = config.get_name().split()[-1] - self.is_valid: bool = False - if path is None: - path = os.path.expanduser(config.get('path')) - self.primary_branch = config.get("primary_branch", "master") - self.repo_path: str = path - origin: str = config.get("origin").lower() - self.repo = GitRepo(cmd_helper, path, self.name, origin) - self.debug = self.cmd_helper.is_debug_enabled() - self.env = config.get("env", env) - self.npm_pkg_json: Optional[str] = None - if config.get("enable_node_updates", False): - self.npm_pkg_json = os.path.join( - self.repo_path, "package-lock.json") - if not os.path.isfile(self.npm_pkg_json): - raise config.error( - f"Cannot enable node updates, no file " - f"{self.npm_pkg_json}") - self.python_reqs: Optional[str] = None - if self.env is not None: - self.env = os.path.expanduser(self.env) - self.python_reqs = os.path.join( - self.repo_path, config.get("requirements")) - self.install_script = config.get('install_script', None) - if self.install_script is not None: - self.install_script = os.path.abspath(os.path.join( - self.repo_path, self.install_script)) - self.venv_args: Optional[str] = config.get('venv_args', None) - for opt in ["repo_path", "env", "python_reqs", - "install_script"]: - val = getattr(self, opt) - if val is None: - continue - if not os.path.exists(val): - raise config.error("Invalid path for option '%s': %s" - % (val, opt)) - - def _get_version_info(self) -> Dict[str, Any]: - ver_path = os.path.join(self.repo_path, "scripts/version.txt") - vinfo: Dict[str, Any] = {} - if os.path.isfile(ver_path): - data = "" - with open(ver_path, 'r') as f: - data = f.read() - try: - entries = [e.strip() for e in data.split('\n') if e.strip()] - vinfo = dict([i.split('=') for i in entries]) # type: ignore - vinfo = {k: tuple(re.findall(r"\d+", v)) for k, v in - vinfo.items()} - except Exception: - pass - else: - self._log_info(f"Version Info Found: {vinfo}") - vinfo['version'] = 
self.repo.get_version() - return vinfo - - def _log_exc(self, msg: str, traceback: bool = True) -> ServerError: - log_msg = f"Repo {self.name}: {msg}" - if traceback: - logging.exception(log_msg) - else: - logging.info(log_msg) - return self.server.error(msg) - - def _log_info(self, msg: str) -> None: - log_msg = f"Repo {self.name}: {msg}" - logging.info(log_msg) - - def _notify_status(self, msg: str, is_complete: bool = False) -> None: - log_msg = f"Git Repo {self.name}: {msg}" - logging.debug(log_msg) - self.cmd_helper.notify_update_response(log_msg, is_complete) - - async def refresh(self) -> None: - try: - await self._update_repo_state() - except Exception: - logging.exception("Error Refreshing git state") - - async def _update_repo_state(self, need_fetch: bool = True) -> None: - self.is_valid = False - await self.repo.initialize(need_fetch=need_fetch) - invalids = self.repo.report_invalids(self.primary_branch) - if invalids: - msgs = '\n'.join(invalids) - self._log_info( - f"Repo validation checks failed:\n{msgs}") - if self.debug: - self.is_valid = True - self._log_info( - "Repo debug enabled, overriding validity checks") - else: - self._log_info("Updates on repo disabled") - else: - self.is_valid = True - self._log_info("Validity check for git repo passed") - - async def update(self) -> None: - await self.repo.wait_for_init() - if not self.is_valid: - raise self._log_exc("Update aborted, repo not valid", False) - if self.repo.is_dirty(): - raise self._log_exc( - "Update aborted, repo has been modified", False) - if self.repo.is_current(): - # No need to update - return - inst_hash = self._get_file_hash(self.install_script) - pyreqs_hash = self._get_file_hash(self.python_reqs) - npm_hash = self._get_file_hash(self.npm_pkg_json) - await self._pull_repo() - # Check Semantic Versions - await self._update_dependencies(inst_hash, pyreqs_hash, npm_hash) - # Refresh local repo state - await self._update_repo_state(need_fetch=False) - if self.name == "moonraker": - # 
Launch restart async so the request can return - # before the server restarts - self._notify_status("Update Finished...", - is_complete=True) - IOLoop.current().call_later( - .1, self.restart_service) # type: ignore - else: - await self.restart_service() - self._notify_status("Update Finished...", is_complete=True) - - async def _pull_repo(self) -> None: - self._notify_status("Updating Repo...") - try: - if self.repo.is_detached(): - await self.repo.fetch() - await self.repo.checkout() - else: - await self.repo.pull() - except Exception: - raise self._log_exc("Error running 'git pull'") - - async def _update_dependencies(self, - inst_hash: Optional[str], - pyreqs_hash: Optional[str], - npm_hash: Optional[str], - force: bool = False - ) -> None: - if force or self._check_need_update(inst_hash, self.install_script): - await self._install_packages() - if force or self._check_need_update(pyreqs_hash, self.python_reqs): - await self._update_virtualenv() - if force or self._check_need_update(npm_hash, self.npm_pkg_json): - if self.npm_pkg_json is not None: - self._notify_status("Updating Node Packages...") - try: - await self.cmd_helper.run_cmd( - "npm ci --only=prod", notify=True, timeout=600., - cwd=self.repo_path) - except Exception: - self._notify_status("Node Package Update failed") - - def _get_file_hash(self, filename: Optional[str]) -> Optional[str]: - if filename is None or not os.path.isfile(filename): - return None - try: - with open(filename, "rb") as f: - data = f.read() - except Exception: - return None - return hashlib.sha256(data).hexdigest() - - def _check_need_update(self, - prev_hash: Optional[str], - filename: Optional[str] - ) -> bool: - cur_hash = self._get_file_hash(filename) - if prev_hash is None or cur_hash is None: - return False - return prev_hash != cur_hash - - async def _install_packages(self) -> None: - if self.install_script is None: - return - # Open install file file and read - inst_path: str = self.install_script - if not 
os.path.isfile(inst_path): - self._log_info(f"Unable to open install script: {inst_path}") - return - with open(inst_path, 'r') as f: - data = f.read() - packages: List[str] = re.findall(r'PKGLIST="(.*)"', data) - packages = [p.lstrip("${PKGLIST}").strip() for p in packages] - if not packages: - self._log_info(f"No packages found in script: {inst_path}") - return - pkgs = " ".join(packages) - logging.debug(f"Repo {self.name}: Detected Packages: {pkgs}") - self._notify_status("Installing system dependencies...") - # Install packages with apt-get - try: - await self.cmd_helper.run_cmd( - f"{APT_CMD} update", timeout=300., notify=True) - await self.cmd_helper.run_cmd( - f"{APT_CMD} install --yes {pkgs}", timeout=3600., - notify=True) - except Exception: - self._log_exc("Error updating packages via apt-get") - return - - async def _update_virtualenv(self) -> None: - if self.env is None: - return - # Update python dependencies - bin_dir = os.path.dirname(self.env) - reqs = self.python_reqs - if reqs is None or not os.path.isfile(reqs): - self._log_exc(f"Invalid path to requirements_file '{reqs}'") - return - pip = os.path.join(bin_dir, "pip") - self._notify_status("Updating python packages...") - try: - # First attempt to update pip - await self.cmd_helper.run_cmd( - f"{pip} install -U pip", timeout=1200., notify=True, - retries=3) - await self.cmd_helper.run_cmd( - f"{pip} install -r {reqs}", timeout=1200., notify=True, - retries=3) - except Exception: - self._log_exc("Error updating python requirements") - - async def _build_virtualenv(self) -> None: - if self.env is None: - return - bin_dir = os.path.dirname(self.env) - env_path = os.path.normpath(os.path.join(bin_dir, "..")) - self._notify_status(f"Creating virtualenv at: {env_path}...") - if os.path.exists(env_path): - shutil.rmtree(env_path) - try: - await self.cmd_helper.run_cmd( - f"virtualenv {self.venv_args} {env_path}", timeout=300.) 
- except Exception: - self._log_exc(f"Error creating virtualenv") - return - if not os.path.exists(self.env): - raise self._log_exc("Failed to create new virtualenv", False) - - async def restart_service(self) -> None: - self._notify_status("Restarting Service...") - try: - await self.cmd_helper.run_cmd( - f"sudo systemctl restart {self.name}") - except Exception: - if self.name == "moonraker": - # We will always get an error when restarting moonraker - # from within the child process, so ignore it - return - raise self._log_exc("Error restarting service") - - async def recover(self, - hard: bool = False, - force_dep_update: bool = False - ) -> None: - self._notify_status("Attempting Repo Recovery...") - inst_hash = self._get_file_hash(self.install_script) - pyreqs_hash = self._get_file_hash(self.python_reqs) - npm_hash = self._get_file_hash(self.npm_pkg_json) - - if hard: - await self.repo.clone() - await self._update_repo_state() - else: - self._notify_status("Resetting Git Repo...") - await self.repo.reset() - await self._update_repo_state() - - if self.repo.is_dirty() or not self.is_valid: - raise self.server.error( - "Recovery attempt failed, repo state not pristine", 500) - await self._update_dependencies(inst_hash, pyreqs_hash, npm_hash, - force=force_dep_update) - if self.name == "moonraker": - IOLoop.current().call_later( - .1, self.restart_service) # type: ignore - else: - await self.restart_service() - self._notify_status("Recovery Complete", is_complete=True) - - def get_update_status(self) -> Dict[str, Any]: - status = self.repo.get_repo_status() - status['is_valid'] = self.is_valid - status['debug_enabled'] = self.debug - return status - - -GIT_ASYNC_TIMEOUT = 300. 
-GIT_ENV_VARS = { - 'GIT_HTTP_LOW_SPEED_LIMIT': "1000", - 'GIT_HTTP_LOW_SPEED_TIME ': "20" -} -GIT_MAX_LOG_CNT = 100 -GIT_LOG_FMT = \ - "\"sha:%H%x1Dauthor:%an%x1Ddate:%ct%x1Dsubject:%s%x1Dmessage:%b%x1E\"" -GIT_OBJ_ERR = "fatal: loose object" - -class GitRepo: - def __init__(self, - cmd_helper: CommandHelper, - git_path: str, - alias: str, - origin_url: str - ) -> None: - self.server = cmd_helper.get_server() - self.cmd_helper = cmd_helper - self.alias = alias - self.git_path = git_path - git_dir, git_base = os.path.split(self.git_path) - self.backup_path = os.path.join(git_dir, f".{git_base}_repo_backup") - self.origin_url = origin_url - self.valid_git_repo: bool = False - self.git_owner: str = "?" - self.git_remote: str = "?" - self.git_branch: str = "?" - self.current_version: str = "?" - self.upstream_version: str = "?" - self.current_commit: str = "?" - self.upstream_commit: str = "?" - self.upstream_url: str = "?" - self.full_version_string: str = "?" - self.branches: List[str] = [] - self.dirty: bool = False - self.head_detached: bool = False - self.git_messages: List[str] = [] - self.commits_behind: List[Dict[str, Any]] = [] - self.recovery_message = \ - f""" - Manually restore via SSH with the following commands: - sudo service {self.alias} stop - cd {git_dir} - rm -rf {git_base} - git clone {self.origin_url} - sudo service {self.alias} start - """ - - self.init_condition: Optional[Condition] = None - self.initialized: bool = False - self.git_operation_lock = Lock() - self.fetch_timeout_handle: Optional[object] = None - self.fetch_input_recd: bool = False - - async def initialize(self, need_fetch: bool = True) -> None: - if self.init_condition is not None: - # No need to initialize multiple requests - await self.init_condition.wait() - if self.initialized: - return - self.initialized = False - self.init_condition = Condition() - self.git_messages.clear() - try: - await self.update_repo_status() - self._verify_repo() - if not self.head_detached: - # lookup 
remote via git config - self.git_remote = await self.get_config_item( - f"branch.{self.git_branch}.remote") - - # Populate list of current branches - blist = await self.list_branches() - self.branches = [] - for branch in blist: - branch = branch.strip() - if branch[0] == "*": - branch = branch[2:] - if branch[0] == "(": - continue - self.branches.append(branch) - - if need_fetch: - await self.fetch() - - self.upstream_url = await self.remote(f"get-url {self.git_remote}") - self.current_commit = await self.rev_parse("HEAD") - self.upstream_commit = await self.rev_parse( - f"{self.git_remote}/{self.git_branch}") - current_version = await self.describe( - "--always --tags --long --dirty") - self.full_version_string = current_version.strip() - upstream_version = await self.describe( - f"{self.git_remote}/{self.git_branch} " - "--always --tags --long") - - # Store current remote in the database if in a detached state - if self.head_detached: - mrdb: DBComp = self.server.lookup_component("database") - db_key = f"update_manager.git_repo_{self.alias}" \ - ".detached_remote" - mrdb.insert_item( - "moonraker", db_key, - [self.current_commit, self.git_remote, self.git_branch]) - - # Parse GitHub Owner from URL - owner_match = re.match(r"https?://[^/]+/([^/]+)", self.upstream_url) - self.git_owner = "?" - if owner_match is not None: - self.git_owner = owner_match.group(1) - self.dirty = current_version.endswith("dirty") - - # Parse Version Info - versions = [] - for ver in [current_version, upstream_version]: - tag_version = "?" 
- ver_match = re.match(r"v\d+\.\d+\.\d-\d+", ver) - if ver_match: - tag_version = ver_match.group() - versions.append(tag_version) - self.current_version, self.upstream_version = versions - - # Get Commits Behind - self.commits_behind = [] - cbh = await self.get_commits_behind() - if cbh: - tagged_commits = await self.get_tagged_commits() - debug_msg = '\n'.join([f"{k}: {v}" for k, v in - tagged_commits.items()]) - logging.debug(f"Git Repo {self.alias}: Tagged Commits\n" - f"{debug_msg}") - for i, commit in enumerate(cbh): - tag = tagged_commits.get(commit['sha'], None) - if i < 30 or tag is not None: - commit['tag'] = tag - self.commits_behind.append(commit) - - self.log_repo_info() - except Exception: - logging.exception(f"Git Repo {self.alias}: Initialization failure") - raise - else: - self.initialized = True - finally: - self.init_condition.notify_all() - self.init_condition = None - - async def wait_for_init(self) -> None: - if self.init_condition is not None: - await self.init_condition.wait() - if not self.initialized: - raise self.server.error( - f"Git Repo {self.alias}: Initialization failure") - - async def update_repo_status(self) -> bool: - async with self.git_operation_lock: - if not os.path.isdir(os.path.join(self.git_path, ".git")): - logging.info( - f"Git Repo {self.alias}: path '{self.git_path}'" - " is not a valid git repo") - return False - await self._wait_for_lock_release() - self.valid_git_repo = False - retries = 3 - while retries: - self.git_messages.clear() - try: - resp: Optional[str] = await self._run_git_cmd( - "status -u no", retries=1) - except Exception: - retries -= 1 - resp = None - # Attempt to recover from "loose object" error - if retries and GIT_OBJ_ERR in "\n".join(self.git_messages): - ret = await self._repair_loose_objects() - if not ret: - # Since we are unable to recover, immediately - # return - return False - else: - break - if resp is None: - return False - resp = resp.strip().split('\n', 1)[0] - self.head_detached = 
resp.startswith("HEAD detached") - branch_info = resp.split()[-1] - if self.head_detached: - bparts = branch_info.split("/", 1) - if len(bparts) == 2: - self.git_remote, self.git_branch = bparts - else: - mrdb: DBComp = self.server.lookup_component("database") - db_key = f"update_manager.git_repo_{self.alias}" \ - ".detached_remote" - detached_remote: List[str] = mrdb.get_item( - "moonraker", db_key, ["", "?", "?"]) - if detached_remote[0].startswith(branch_info): - self.git_remote = detached_remote[1] - self.git_branch = detached_remote[2] - msg = "Using remote stored in database:"\ - f" {self.git_remote}/{self.git_branch}" - elif self.git_remote == "?": - msg = "Resolve by manually checking out" \ - " a branch via SSH." - else: - msg = "Defaulting to previously tracked " \ - f"{self.git_remote}/{self.git_branch}." - logging.info( - f"Git Repo {self.alias}: HEAD detached on untracked " - f"commit {branch_info}. {msg}") - else: - self.git_branch = branch_info - self.valid_git_repo = True - return True - - def log_repo_info(self) -> None: - logging.info( - f"Git Repo {self.alias} Detected:\n" - f"Owner: {self.git_owner}\n" - f"Path: {self.git_path}\n" - f"Remote: {self.git_remote}\n" - f"Branch: {self.git_branch}\n" - f"Remote URL: {self.upstream_url}\n" - f"Current Commit SHA: {self.current_commit}\n" - f"Upstream Commit SHA: {self.upstream_commit}\n" - f"Current Version: {self.current_version}\n" - f"Upstream Version: {self.upstream_version}\n" - f"Is Dirty: {self.dirty}\n" - f"Is Detached: {self.head_detached}\n" - f"Commits Behind: {len(self.commits_behind)}") - - def report_invalids(self, primary_branch: str) -> List[str]: - invalids: List[str] = [] - upstream_url = self.upstream_url.lower() - if upstream_url[-4:] != ".git": - upstream_url += ".git" - if upstream_url != self.origin_url: - invalids.append(f"Unofficial remote url: {self.upstream_url}") - if self.git_branch != primary_branch or self.git_remote != "origin": - invalids.append( - "Repo not on valid 
remote branch, expected: " - f"origin/{primary_branch}, detected: " - f"{self.git_remote}/{self.git_branch}") - if self.head_detached: - invalids.append("Detached HEAD detected") - return invalids - - def _verify_repo(self, check_remote: bool = False) -> None: - if not self.valid_git_repo: - raise self.server.error( - f"Git Repo {self.alias}: repo not initialized") - if check_remote: - if self.git_remote == "?": - raise self.server.error( - f"Git Repo {self.alias}: No valid git remote detected") - - async def reset(self) -> None: - if self.git_remote == "?" or self.git_branch == "?": - raise self.server.error("Cannot reset, unknown remote/branch") - async with self.git_operation_lock: - await self._run_git_cmd("clean -d -f", retries=2) - await self._run_git_cmd( - f"reset --hard {self.git_remote}/{self.git_branch}", - retries=2) - - async def fetch(self) -> None: - self._verify_repo(check_remote=True) - async with self.git_operation_lock: - await self._run_git_cmd_async( - f"fetch {self.git_remote} --prune --progress") - - - async def pull(self) -> None: - self._verify_repo() - if self.head_detached: - raise self.server.error( - f"Git Repo {self.alias}: Cannot perform pull on a " - "detached HEAD") - async with self.git_operation_lock: - await self._run_git_cmd_async("pull --progress") - - async def list_branches(self) -> List[str]: - self._verify_repo() - async with self.git_operation_lock: - resp = await self._run_git_cmd("branch --list") - return resp.strip().split("\n") - - async def remote(self, command: str) -> str: - self._verify_repo(check_remote=True) - async with self.git_operation_lock: - resp = await self._run_git_cmd( - f"remote {command}") - return resp.strip() - - async def describe(self, args: str = "") -> str: - self._verify_repo() - async with self.git_operation_lock: - resp = await self._run_git_cmd(f"describe {args}".strip()) - return resp.strip() - - async def rev_parse(self, args: str = "") -> str: - self._verify_repo() - async with 
self.git_operation_lock: - resp = await self._run_git_cmd(f"rev-parse {args}".strip()) - return resp.strip() - - async def get_config_item(self, item: str) -> str: - self._verify_repo() - async with self.git_operation_lock: - resp = await self._run_git_cmd(f"config --get {item}") - return resp.strip() - - async def checkout(self, branch: Optional[str] = None) -> None: - self._verify_repo() - async with self.git_operation_lock: - branch = branch or f"{self.git_remote}/{self.git_branch}" - await self._run_git_cmd(f"checkout {branch} -q") - - async def run_fsck(self) -> None: - async with self.git_operation_lock: - await self._run_git_cmd("fsck --full", timeout=300., retries=1) - - async def clone(self) -> None: - async with self.git_operation_lock: - self.cmd_helper.notify_update_response( - f"Git Repo {self.alias}: Starting Clone Recovery...") - if os.path.exists(self.backup_path): - shutil.rmtree(self.backup_path) - self._check_lock_file_exists(remove=True) - git_cmd = f"clone {self.origin_url} {self.backup_path}" - try: - await self._run_git_cmd_async(git_cmd, 1, False, False) - except Exception as e: - self.cmd_helper.notify_update_response( - f"Git Repo {self.alias}: Git Clone Failed") - raise self.server.error("Git Clone Error") from e - if os.path.exists(self.git_path): - shutil.rmtree(self.git_path) - shutil.move(self.backup_path, self.git_path) - self.cmd_helper.notify_update_response( - f"Git Repo {self.alias}: Git Clone Complete") - - async def get_commits_behind(self) -> List[Dict[str, Any]]: - self._verify_repo() - if self.is_current(): - return [] - async with self.git_operation_lock: - branch = f"{self.git_remote}/{self.git_branch}" - resp = await self._run_git_cmd( - f"log {self.current_commit}..{branch} " - f"--format={GIT_LOG_FMT} --max-count={GIT_MAX_LOG_CNT}") - commits_behind: List[Dict[str, Any]] = [] - for log_entry in resp.split('\x1E'): - log_entry = log_entry.strip() - if not log_entry: - continue - log_items = [li.strip() for li in 
log_entry.split('\x1D') - if li.strip()] - cbh = [li.split(':', 1) for li in log_items] - commits_behind.append(dict(cbh)) # type: ignore - return commits_behind - - async def get_tagged_commits(self) -> Dict[str, Any]: - self._verify_repo() - async with self.git_operation_lock: - resp = await self._run_git_cmd(f"show-ref --tags -d") - tagged_commits: Dict[str, Any] = {} - tags = [tag.strip() for tag in resp.split('\n') if tag.strip()] - for tag in tags: - sha, ref = tag.split(' ', 1) - ref = ref.split('/')[-1] - if ref[-3:] == "^{}": - # Dereference this commit and overwrite any existing tag - ref = ref[:-3] - tagged_commits[ref] = sha - elif ref not in tagged_commits: - # This could be a lightweight tag pointing to a commit. If - # it is an annotated tag it will be overwritten by the - # dereferenced tag - tagged_commits[ref] = sha - # Return tagged commits as SHA keys mapped to tag values - return {v: k for k, v in tagged_commits.items()} - - def get_repo_status(self) -> Dict[str, Any]: - return { - 'remote_alias': self.git_remote, - 'branch': self.git_branch, - 'owner': self.git_owner, - 'version': self.current_version, - 'remote_version': self.upstream_version, - 'current_hash': self.current_commit, - 'remote_hash': self.upstream_commit, - 'is_dirty': self.dirty, - 'detached': self.head_detached, - 'commits_behind': self.commits_behind, - 'git_messages': self.git_messages, - 'full_version_string': self.full_version_string - } - - def get_version(self, upstream: bool = False) -> Tuple[Any, ...]: - version = self.upstream_version if upstream else self.current_version - return tuple(re.findall(r"\d+", version)) - - def is_detached(self) -> bool: - return self.head_detached - - def is_dirty(self) -> bool: - return self.dirty - - def is_current(self) -> bool: - return self.current_commit == self.upstream_commit - - def _check_lock_file_exists(self, remove: bool = False) -> bool: - lock_path = os.path.join(self.git_path, ".git/index.lock") - if 
os.path.isfile(lock_path): - if remove: - logging.info(f"Git Repo {self.alias}: Git lock file found " - "after git process exited, removing") - try: - os.remove(lock_path) - except Exception: - pass - return True - return False - - async def _wait_for_lock_release(self, timeout: int = 60) -> None: - while timeout: - if self._check_lock_file_exists(): - if not timeout % 10: - logging.info(f"Git Repo {self.alias}: Git lock file " - f"exists, {timeout} seconds remaining " - "before removal.") - await tornado.gen.sleep(1.) - timeout -= 1 - else: - return - self._check_lock_file_exists(remove=True) - - async def _repair_loose_objects(self) -> bool: - try: - await self.cmd_helper.run_cmd_with_response( - "find .git/objects/ -type f -empty | xargs rm", - timeout=10., retries=1, cwd=self.git_path) - await self._run_git_cmd_async( - "fetch --all -p", retries=1, fix_loose=False) - await self._run_git_cmd("fsck --full", timeout=300., retries=1) - except Exception: - logging.exception("Attempt to repair loose objects failed") - return False - return True - - async def _run_git_cmd_async(self, - cmd: str, - retries: int = 5, - need_git_path: bool = True, - fix_loose: bool = True - ) -> None: - # Fetch and pull require special handling. If the request - # gets delayed we do not want to terminate it while the command - # is processing. 
- await self._wait_for_lock_release() - env = os.environ.copy() - env.update(GIT_ENV_VARS) - if need_git_path: - git_cmd = f"git -C {self.git_path} {cmd}" - else: - git_cmd = f"git {cmd}" - scmd = self.cmd_helper.build_shell_command( - git_cmd, callback=self._handle_process_output, - env=env) - while retries: - self.git_messages.clear() - ioloop = IOLoop.current() - self.fetch_input_recd = False - self.fetch_timeout_handle = ioloop.call_later( - GIT_ASYNC_TIMEOUT, self._check_process_active, # type: ignore - scmd, cmd) - try: - await scmd.run(timeout=0) - except Exception: - pass - ioloop.remove_timeout(self.fetch_timeout_handle) - ret = scmd.get_return_code() - if ret == 0: - self.git_messages.clear() - return - elif fix_loose: - if GIT_OBJ_ERR in "\n".join(self.git_messages): - ret = await self._repair_loose_objects() - if ret: - break - # since the attept to repair failed, bypass retries - # and immediately raise an exception - raise self.server.error( - f"Unable to repair loose objects, use hard recovery") - retries -= 1 - await tornado.gen.sleep(.5) - self._check_lock_file_exists(remove=True) - raise self.server.error(f"Git Command '{cmd}' failed") - - def _handle_process_output(self, output: bytes) -> None: - self.fetch_input_recd = True - out = output.decode().strip() - if out: - self.git_messages.append(out) - self.cmd_helper.notify_update_response(out) - logging.debug( - f"Git Repo {self.alias}: {out}") - - async def _check_process_active(self, - scmd: shell_command.ShellCommand, - cmd_name: str - ) -> None: - ret = scmd.get_return_code() - if ret is not None: - logging.debug(f"Git Repo {self.alias}: {cmd_name} returned") - return - if self.fetch_input_recd: - # Received some input, reschedule timeout - logging.debug( - f"Git Repo {self.alias}: {cmd_name} active, rescheduling") - ioloop = IOLoop.current() - self.fetch_input_recd = False - self.fetch_timeout_handle = ioloop.call_later( - GIT_ASYNC_TIMEOUT, self._check_process_active, # type: ignore - scmd, 
cmd_name) - else: - # Request has timed out with no input, terminate it - logging.debug(f"Git Repo {self.alias}: {cmd_name} timed out") - # Cancel with SIGKILL - await scmd.cancel(2) - - async def _run_git_cmd(self, - git_args: str, - timeout: float = 20., - retries: int = 5, - env: Optional[Dict[str, str]] = None - ) -> str: - try: - return await self.cmd_helper.run_cmd_with_response( - f"git -C {self.git_path} {git_args}", - timeout=timeout, retries=retries, env=env, sig_idx=2) - except self.cmd_helper.scmd_error as e: - stdout = e.stdout.decode().strip() - stderr = e.stderr.decode().strip() - if stdout: - self.git_messages.append(stdout) - if stderr: - self.git_messages.append(stderr) - raise - -class PackageUpdater(BaseUpdater): +class PackageDeploy(BaseDeploy): def __init__(self, config: ConfigHelper, cmd_helper: CommandHelper @@ -1460,7 +622,7 @@ class PackageUpdater(BaseUpdater): 'package_list': self.available_packages } -class WebUpdater(BaseUpdater): +class WebClientDeploy(BaseDeploy): def __init__(self, config: ConfigHelper, cmd_helper: CommandHelper @@ -1468,7 +630,6 @@ class WebUpdater(BaseUpdater): super().__init__(config, cmd_helper) self.repo = config.get('repo').strip().strip("/") self.owner = self.repo.split("/", 1)[0] - self.name = config.get_name().split()[-1] self.path: str = os.path.realpath(os.path.expanduser( config.get("path"))) self.persistent_files: List[str] = []