# Git Deployment implementation
#
# Copyright (C) 2021 Eric Callahan
#
# This file may be distributed under the terms of the GNU GPLv3 license.

from __future__ import annotations
import asyncio
import os
import pathlib
import shutil
import re
import logging
from .app_deploy import AppDeploy
from .common import Channel
from ...utils.versions import GitVersion

# Annotation imports
from typing import (
    TYPE_CHECKING,
    Any,
    Optional,
    Dict,
    List,
)
if TYPE_CHECKING:
    from ...confighelper import ConfigHelper
    from ..shell_command import ShellCommand
    from ..machine import Machine
    from .update_manager import CommandHelper
    from ..http_client import HttpClient


class GitDeploy(AppDeploy):
    """Deployer for an application that lives in a git repository.

    All git operations are delegated to a ``GitRepo`` instance stored in
    ``self.repo``.  Dependency handling, state persistence, notifications
    and service restarts go through helpers that are not defined in this
    class (presumably inherited from ``AppDeploy`` - confirm there).
    """

    def __init__(self, config: ConfigHelper, cmd_helper: CommandHelper) -> None:
        """Read the git related options and create the ``GitRepo`` wrapper.

        Options read from ``config``: ``origin`` (no default is passed),
        ``moved_origin`` (defaults to ``None``) and ``primary_branch``
        (defaults to ``"master"``).
        """
        super().__init__(config, cmd_helper, "Git Repo")
        self._configure_path(config)
        self._configure_virtualenv(config)
        self._configure_dependencies(config)
        self.origin: str = config.get('origin')
        self.moved_origin: Optional[str] = config.get('moved_origin', None)
        self.primary_branch = config.get("primary_branch", "master")
        self.repo = GitRepo(
            cmd_helper, self.path, self.name, self.origin,
            self.moved_origin, self.primary_branch, self.channel
        )

    async def initialize(self) -> Dict[str, Any]:
        """Restore the repo state from the storage the parent class returns.

        Returns:
            The storage dict produced by ``super().initialize()``.
        """
        storage = await super().initialize()
        await self.repo.restore_state(storage)
        self._is_valid = self.repo.is_valid()
        if not self.needs_refresh():
            # No refresh is pending, so the restored state is what will be
            # used; log it now.
            self.repo.log_repo_info()
        return storage

    async def refresh(self) -> None:
        """Refresh the repo state; failures are logged and not propagated."""
        try:
            await self._update_repo_state()
        except Exception:
            logging.exception("Error Refreshing git state")

    async def _update_repo_state(self, need_fetch: bool = True) -> None:
        """Re-read the repository state, update ``_is_valid`` and save it.

        Args:
            need_fetch: Forwarded to ``GitRepo.refresh_repo_state``.
        """
        # Cleared first so that an exception raised by the refresh leaves
        # the deployer marked invalid.
        self._is_valid = False
        await self.repo.refresh_repo_state(need_fetch=need_fetch)
        self.log_info(f"Channel: {self.channel}")
        self._is_valid = self.repo.is_valid()
        if not self._is_valid:
            self.log_info("Repo validation check failed, updates disabled")
        else:
            self.log_info("Validity check for git repo passed")
        self._save_state()

    async def update(self) -> bool:
        """Bring the repository up to date and install new dependencies.

        Returns:
            ``False`` when the repo is already current, ``True`` once the
            update completed.

        Raises:
            The exception built by ``self.log_exc`` when the repo is not
            valid or is dirty.
        """
        await self.repo.wait_for_init()
        if not self._is_valid:
            raise self.log_exc("Update aborted, repo not valid", False)
        if self.repo.is_dirty():
            raise self.log_exc(
                "Update aborted, repo has been modified", False)
        if self.repo.is_current():
            # No need to update
            return False
        self.cmd_helper.notify_update_response(
            f"Updating Application {self.name}...")
        # Snapshot the dependencies before the working tree changes so the
        # post-update step can tell what was added.
        dep_info = await self._collect_dependency_info()
        await self._pull_repo()
        # Install dependencies introduced by the update
        await self._update_dependencies(dep_info)
        # Refresh local repo state
        await self._update_repo_state(need_fetch=False)
        await self.restart_service()
        self.notify_status("Update Finished...", is_complete=True)
        return True

    async def recover(self,
                      hard: bool = False,
                      force_dep_update: bool = False
                      ) -> None:
        """Attempt to return the repository to a pristine state.

        Args:
            hard: When true the repo is re-cloned, otherwise it is only
                reset.
            force_dep_update: Forwarded as ``force`` to
                ``_update_dependencies``.

        Raises:
            ``self.server.error``: if a hard recovery is requested for a
                submodule or worktree, or if the repo is still dirty or
                invalid after the recovery attempt.
        """
        self.notify_status("Attempting Repo Recovery...")
        dep_info = await self._collect_dependency_info()
        if hard:
            if self.repo.is_submodule_or_worktree():
                raise self.server.error(
                    f"Cannot re-clone git repo {self.name}, it is either "
                    "a submodule or worktree."
                )
            await self.repo.clone()
            if self.channel != Channel.DEV:
                if self.repo.upstream_commit != "?":
                    # If on beta or stable reset to the latest tagged
                    # upstream commit
                    await self.repo.reset()
                else:
                    self.notify_status(
                        f"No upstream commit for repo on {self.channel} channel, "
                        "skipping reset."
                    )
            await self._update_repo_state()
        else:
            self.notify_status("Resetting Git Repo...")
            await self.repo.reset()
            await self._update_repo_state()
        # Passing None makes the repo capture its current state as the new
        # rollback point.
        self.repo.set_rollback_state(None)

        if self.repo.is_dirty() or not self._is_valid:
            raise self.server.error(
                "Recovery attempt failed, repo state not pristine", 500)
        await self._update_dependencies(dep_info, force=force_dep_update)
        await self.restart_service()
        self.notify_status("Reinstall Complete", is_complete=True)

    async def rollback(self) -> bool:
        """Roll the repository back to the stored rollback state.

        Returns:
            The result of ``GitRepo.rollback()``; dependencies, repo state
            and the service are only touched when it is truthy.
        """
        dep_info = await self._collect_dependency_info()
        ret = await self.repo.rollback()
        if ret:
            await self._update_dependencies(dep_info)
            await self._update_repo_state(need_fetch=False)
            await self.restart_service()
            msg = "Rollback Complete"
        else:
            msg = "Rollback not performed"
        self.notify_status(msg, is_complete=True)
        return ret

    def get_update_status(self) -> Dict[str, Any]:
        """Return the parent status dict extended with the repo status."""
        status = super().get_update_status()
        status.update(self.repo.get_repo_status())
        return status

    def get_persistent_data(self) -> Dict[str, Any]:
        """Return the parent persistent data extended with the repo data."""
        storage = super().get_persistent_data()
        storage.update(self.repo.get_persistent_data())
        return storage

    async def _pull_repo(self) -> None:
        """Fetch and then checkout, reset or pull depending on repo state.

        The rollback state is captured before the repo is touched and is
        only stored once the operation succeeded.

        Raises:
            The exception built by ``self.log_exc``, chained to the
            original error, when any git step fails.
        """
        self.notify_status("Updating Repo...")
        rb_state = self.repo.capture_state_for_rollback()
        try:
            await self.repo.fetch()
            if self.repo.is_detached():
                await self.repo.checkout()
            elif await self.repo.check_diverged():
                self.notify_status(
                    "Repo has diverged, attempting git reset"
                )
                await self.repo.reset()
            else:
                await self.repo.pull()
        except Exception as e:
            if self.repo.repo_corrupt:
                # Persist the invalid state and schedule a refresh
                # notification shortly afterwards.
                self._is_valid = False
                self._save_state()
                event_loop = self.server.get_event_loop()
                event_loop.delay_callback(
                    .2, self.cmd_helper.notify_update_refreshed
                )
            # Chain explicitly so the original failure is kept as the cause
            raise self.log_exc(str(e)) from e
        else:
            self.repo.set_rollback_state(rb_state)

    async def _collect_dependency_info(self) -> Dict[str, Any]:
        """Snapshot the currently declared dependencies.

        Returns:
            A dict with the keys ``system_packages``, ``python_modules``
            and ``npm_hash``.
        """
        pkg_deps = await self._read_system_dependencies()
        pyreqs = await self._read_python_reqs()
        npm_hash = await self._get_file_hash(self.npm_pkg_json)
        logging.debug(
            f"\nApplication {self.name}: Pre-update dependencies:\n"
            f"Packages: {pkg_deps}\n"
            f"Python Requirements: {pyreqs}"
        )
        return {
            "system_packages": pkg_deps,
            "python_modules": pyreqs,
            "npm_hash": npm_hash
        }

    async def _update_dependencies(
        self, dep_info: Dict[str, Any], force: bool = False
    ) -> None:
        """Install dependencies that are new compared to ``dep_info``.

        Args:
            dep_info: Snapshot produced by ``_collect_dependency_info``
                before the repo was modified.
            force: When true everything currently declared is installed
                and the node packages are always updated.
        """
        packages = await self._read_system_dependencies()
        modules = await self._read_python_reqs()
        logging.debug(
            f"\nApplication {self.name}: Post-update dependencies:\n"
            f"Packages: {packages}\n"
            f"Python Requirements: {modules}"
        )
        if not force:
            # Keep only the entries that were not present before.  The
            # filter preserves the order in which the entries were read and
            # drops duplicates, so the log and install order are stable.
            prev_packages = set(dep_info["system_packages"])
            prev_modules = set(dep_info["python_modules"])
            packages = list(dict.fromkeys(
                pkg for pkg in packages if pkg not in prev_packages
            ))
            modules = list(dict.fromkeys(
                mod for mod in modules if mod not in prev_modules
            ))
        logging.debug(
            f"\nApplication {self.name}: Dependencies to install:\n"
            f"Packages: {packages}\n"
            f"Python Requirements: {modules}\n"
            f"Force All: {force}"
        )
        if packages:
            await self._install_packages(packages)
        if modules:
            # self.python_reqs takes precedence over the computed list when
            # it is truthy.
            await self._update_python_requirements(self.python_reqs or modules)
        npm_hash: Optional[str] = dep_info["npm_hash"]
        ret = await self._check_need_update(npm_hash, self.npm_pkg_json)
        if (force or ret) and self.npm_pkg_json is not None:
            self.notify_status("Updating Node Packages...")
            try:
                await self.cmd_helper.run_cmd(
                    "npm ci --only=prod", notify=True, timeout=600.,
                    cwd=str(self.path))
            except Exception:
                # Best effort: a failed npm install does not abort the
                # caller, but the cause is logged for diagnosis.
                logging.exception(
                    f"Application {self.name}: Node Package Update failed"
                )
                self.notify_status("Node Package Update failed")

    async def close(self) -> None:
        """Unregister this instance from the repository."""
        await self.repo.unset_current_instance()


# Delay handed to the event loop before an asynchronous git command is
# checked for activity (presumably seconds - confirm against delay_callback).
GIT_ASYNC_TIMEOUT = 300.
GIT_ENV_VARS = { 'GIT_HTTP_LOW_SPEED_LIMIT': "1000", 'GIT_HTTP_LOW_SPEED_TIME ': "20" } GIT_MAX_LOG_CNT = 100 GIT_LOG_FMT = ( "\"sha:%H%x1Dauthor:%an%x1Ddate:%ct%x1Dsubject:%s%x1Dmessage:%b%x1E\"" ) GIT_REF_FMT = ( "'%(if)%(*objecttype)%(then)%(*objecttype) %(*objectname)" "%(else)%(objecttype) %(objectname)%(end) %(refname)'" ) class GitRepo: def __init__( self, cmd_helper: CommandHelper, src_path: pathlib.Path, alias: str, origin_url: str, moved_origin_url: Optional[str], primary_branch: str, channel: Channel ) -> None: self.server = cmd_helper.get_server() self.cmd_helper = cmd_helper self.alias = alias self.src_path = src_path git_dir = src_path.parent git_base = src_path.name self.backup_path = git_dir.joinpath(f".{git_base}_repo_backup") self.git_folder_path = src_path.joinpath(".git") self.origin_url = origin_url if not self.origin_url.endswith(".git"): self.origin_url += ".git" self.moved_origin_url = moved_origin_url self.primary_branch = primary_branch self.recovery_message = \ f""" Manually restore via SSH with the following commands: sudo service {self.alias} stop cd {git_dir} rm -rf {git_base} git clone {self.origin_url} sudo service {self.alias} start """ self.repo_warnings: List[str] = [] self.managing_instances: List[str] = [] self.init_evt: Optional[asyncio.Event] = None self.initialized: bool = False self.git_operation_lock = asyncio.Lock() self.fetch_timeout_handle: Optional[asyncio.Handle] = None self.fetch_input_recd: bool = False self.channel = channel self.is_shallow = False async def restore_state(self, storage: Dict[str, Any]) -> None: self.valid_git_repo: bool = storage.get('repo_valid', False) self.git_owner: str = storage.get('git_owner', "?") self.git_repo_name: str = storage.get('git_repo_name', "?") self.git_remote: str = storage.get('git_remote', "?") self.git_branch: str = storage.get('git_branch', "?") if "full_version_string" in storage: self.current_version = GitVersion(storage["full_version_string"]) else: self.current_version = 
GitVersion(storage.get('current_version', "?")) self.upstream_version = GitVersion(storage.get('upstream_version', "?")) self.current_commit: str = storage.get('current_commit', "?") self.upstream_commit: str = storage.get('upstream_commit', "?") self.upstream_url: str = storage.get('upstream_url', "?") self.recovery_url: str = storage.get( 'recovery_url', self.upstream_url if self.git_remote == "origin" else "?" ) self.branches: List[str] = storage.get('branches', []) self.head_detached: bool = storage.get('head_detached', False) self.git_messages: List[str] = storage.get('git_messages', []) self.commits_behind: List[Dict[str, Any]] = storage.get('commits_behind', []) self.commits_behind_count: int = storage.get('cbh_count', 0) self.diverged: bool = storage.get("diverged", False) self.repo_corrupt: bool = storage.get('corrupt', False) def_rbs = self.capture_state_for_rollback() self.rollback_commit: str = storage.get('rollback_commit', self.current_commit) self.rollback_branch: str = storage.get('rollback_branch', def_rbs["branch"]) rbv = storage.get('rollback_version', self.current_version) self.rollback_version = GitVersion(str(rbv)) if not await self._detect_git_dir(): self.valid_git_repo = False if self.valid_git_repo: await self.set_current_instance() self._check_warnings() def get_persistent_data(self) -> Dict[str, Any]: return { 'repo_valid': self.valid_git_repo, 'git_owner': self.git_owner, 'git_repo_name': self.git_repo_name, 'git_remote': self.git_remote, 'git_branch': self.git_branch, 'current_version': self.current_version.full_version, 'upstream_version': self.upstream_version.full_version, 'current_commit': self.current_commit, 'upstream_commit': self.upstream_commit, 'rollback_commit': self.rollback_commit, 'rollback_branch': self.rollback_branch, 'rollback_version': self.rollback_version.full_version, 'upstream_url': self.upstream_url, 'recovery_url': self.recovery_url, 'branches': self.branches, 'head_detached': self.head_detached, 'git_messages': 
self.git_messages, 'commits_behind': self.commits_behind, 'cbh_count': self.commits_behind_count, 'diverged': self.diverged, 'corrupt': self.repo_corrupt } async def refresh_repo_state(self, need_fetch: bool = True) -> None: if self.init_evt is not None: # No need to initialize multiple requests await self.init_evt.wait() if self.initialized: return self.initialized = False self.init_evt = asyncio.Event() self.git_messages.clear() try: await self._check_repo_status() self._verify_repo() await self._find_current_branch() # Fetch the upstream url. If the repo has been moved, # set the new url self.upstream_url = await self.remote(f"get-url {self.git_remote}") if await self._check_moved_origin(): need_fetch = True if self.git_remote == "origin": self.recovery_url = self.upstream_url else: remote_list = (await self.remote("")).splitlines() logging.debug( f"Git Repo {self.alias}: Detected Remotes - {remote_list}" ) if "origin" in remote_list: self.recovery_url = await self.remote("get-url origin") else: logging.info( f"Git Repo {self.alias}: Unable to detect recovery URL, " "Hard Recovery not available" ) self.recovery_url = "?" if need_fetch: await self.fetch() self.diverged = await self.check_diverged() # Parse GitHub Owner from URL owner_match = re.match(r"https?://[^/]+/([^/]+)", self.upstream_url) self.git_owner = "?" if owner_match is not None: self.git_owner = owner_match.group(1) # Parse GitHub Repository Name from URL repo_match = re.match(r".*\/([^\.]*).*", self.upstream_url) self.git_repo_name = "?" 
if repo_match is not None: self.git_repo_name = repo_match.group(1) self.current_commit = await self.rev_parse("HEAD") git_desc = await self.describe("--always --tags --long --dirty --abbrev=8") cur_ver = GitVersion(git_desc.strip()) upstream_ver = await self._get_upstream_version() await self._set_versions(cur_ver, upstream_ver) # Get Commits Behind self.commits_behind = [] if self.commits_behind_count > 0: cbh = await self.get_commits_behind() tagged_commits = await self.get_tagged_commits() debug_msg = '\n'.join([f"{k}: {v}" for k, v in tagged_commits.items()]) logging.debug(f"Git Repo {self.alias}: Tagged Commits\n{debug_msg}") for i, commit in enumerate(cbh): tag = tagged_commits.get(commit['sha'], None) if i < 30 or tag is not None: commit['tag'] = tag self.commits_behind.append(commit) self._check_warnings() except Exception: logging.exception(f"Git Repo {self.alias}: Initialization failure") raise else: self.initialized = True # If no exception was raised assume the repo is not corrupt self.repo_corrupt = False if self.rollback_commit == "?" 
or self.rollback_branch == "?": # Reset Rollback State self.set_rollback_state(None) self.log_repo_info() finally: self.init_evt.set() self.init_evt = None async def _check_repo_status(self) -> bool: async with self.git_operation_lock: self.valid_git_repo = False if not await self._detect_git_dir(): logging.info( f"Git Repo {self.alias}: path '{self.src_path}'" " is not a valid git repo") return False await self._wait_for_lock_release() retries = 3 while retries: self.git_messages.clear() try: cmd = "status --porcelain -b" resp: Optional[str] = await self._run_git_cmd(cmd, retries=1) except Exception: retries -= 1 resp = None # Attempt to recover from "loose object" error if retries and self.repo_corrupt: if not await self._repair_loose_objects(): # Since we are unable to recover, immediately # return return False else: break if resp is None: return False self.valid_git_repo = True await self.set_current_instance() return True async def _detect_git_dir(self) -> bool: if self.git_folder_path.is_file(): # Submodules have a file that contain the path to # the .git folder eventloop = self.server.get_event_loop() data = await eventloop.run_in_thread(self.git_folder_path.read_text) ident, _, gitdir = data.partition(":") if ident.strip() != "gitdir" or not gitdir.strip(): return False self.git_folder_path = pathlib.Path(gitdir).expanduser().resolve() if self.git_folder_path.is_dir(): self.is_shallow = self.git_folder_path.joinpath("shallow").is_file() return True return False async def _find_current_branch(self) -> None: # Populate list of current branches blist = await self.list_branches() current_branch = "" self.branches = [] for branch in blist: branch = branch.strip() if not branch: continue if branch[0] == "*": branch = branch[2:].strip() current_branch = branch if branch[0] == "(": continue self.branches.append(branch) if current_branch.startswith("(HEAD detached"): self.head_detached = True ref_name = current_branch.split()[-1][:-1] remote_list = (await 
self.remote("")).splitlines() for remote in remote_list: remote = remote.strip() if not remote: continue if ref_name.startswith(remote): self.git_branch = ref_name[len(remote)+1:] self.git_remote = remote break else: if self.git_remote == "?": msg = "Resolve by manually checking out a branch via SSH." else: prev = f"{self.git_remote}/{self.git_branch}" msg = f"Defaulting to previously tracked {prev}." logging.info(f"Git Repo {self.alias}: {current_branch} {msg}") else: self.head_detached = False self.git_branch = current_branch rkey = f"branch.{self.git_branch}.remote" self.git_remote = (await self.config_get(rkey)) or "?" async def _check_moved_origin(self) -> bool: detected_origin = self.upstream_url.lower().strip() if not detected_origin.endswith(".git"): detected_origin += ".git" if ( self.server.is_debug_enabled() or not detected_origin.startswith("http") or detected_origin == self.origin_url.lower() ): # Skip the moved origin check if: # Repo Debug is enabled # The detected origin url is not http(s) # The detected origin matches the expected origin url return False moved = False client: HttpClient = self.server.lookup_component("http_client") check_url = detected_origin[:-4] logging.info( f"Git repo {self.alias}: Performing moved origin check - " f"{check_url}" ) resp = await client.get(check_url, enable_cache=False) if not resp.has_error(): final_url = resp.final_url.lower() if not final_url.endswith(".git"): final_url += ".git" logging.debug(f"Git repo {self.alias}: Resolved url - {final_url}") if final_url == self.origin_url.lower(): logging.info( f"Git Repo {self.alias}: Moved Repo Detected, Moving " f"from {self.upstream_url} to {self.origin_url}") moved = True await self.remote( f"set-url {self.git_remote} {self.origin_url}") self.upstream_url = self.origin_url if self.moved_origin_url is not None: moved_origin = self.moved_origin_url.lower().strip() if not moved_origin.endswith(".git"): moved_origin += ".git" if moved_origin != detected_origin: 
self.server.add_warning( f"Git Repo {self.alias}: Origin URL does not " "not match configured 'moved_origin'option. " f"Expected: {detected_origin}" ) else: logging.debug(f"Move Request Failed: {resp.error}") return moved async def _get_upstream_version(self) -> GitVersion: self.commits_behind_count = 0 if self.channel == Channel.DEV: self.upstream_commit = await self.rev_parse( f"{self.git_remote}/{self.git_branch}" ) upstream_ver_str = await self.describe( f"{self.git_remote}/{self.git_branch} --always --tags --long --abbrev=8" ) else: tagged_commits = await self.get_tagged_commits() upstream_commit = upstream_ver_str = "?" for sha, tag in tagged_commits.items(): ver = GitVersion(tag) if not ver.is_valid_version(): continue if ( (self.channel == Channel.STABLE and ver.is_final_release()) or (self.channel == Channel.BETA and not ver.is_alpha_release()) ): upstream_commit = sha upstream_ver_str = tag break self.upstream_commit = upstream_commit if self.upstream_commit != "?": rl_args = f"HEAD..{self.upstream_commit} --count" self.commits_behind_count = int(await self.rev_list(rl_args)) return GitVersion(upstream_ver_str) async def _set_versions( self, current_version: GitVersion, upstream_version: GitVersion ) -> None: if not current_version.is_valid_version(): log_msg = ( f"Git repo {self.alias}: Failed to detect current version, got " f"'{current_version}'. " ) tag = upstream_version.infer_last_tag() count = await self.rev_list("HEAD --count") sha_part = "" if current_version.is_fallback(): sha_part = f"-g{current_version}" elif self.current_commit not in ("?", ""): sha_part = f"-g{self.current_commit[:8]}" current_version = GitVersion(f"{tag}-{count}{sha_part}-inferred") log_msg += f"Falling back to inferred version: {current_version}" logging.info(log_msg) if self.channel == Channel.DEV: if not upstream_version.is_valid_version(): log_msg = ( f"Git repo {self.alias}: Failed to detect upstream version, got " f"'{upstream_version}'. 
" ) tag = current_version.tag if current_version.inferred: count = await self.rev_list(f"{self.upstream_commit} --count") else: log_msg += "\nRemote has diverged, approximating dev count. " count = str(self.commits_behind_count + current_version.dev_count) upstream_version = GitVersion(f"{tag}-{count}-inferred") log_msg += f"Falling back to inferred version: {upstream_version}" logging.info(log_msg) else: if not upstream_version.is_valid_version(): self.upstream_commit = self.current_commit upstream_version = current_version elif upstream_version <= current_version: self.upstream_commit = self.current_commit self.current_version = current_version self.upstream_version = upstream_version async def wait_for_init(self) -> None: if self.init_evt is not None: await self.init_evt.wait() if not self.initialized: raise self.server.error( f"Git Repo {self.alias}: Initialization failure") async def check_diverged(self) -> bool: self._verify_repo(check_remote=True) if self.head_detached: return False async with self.git_operation_lock: cmd = f"merge-base --is-ancestor HEAD {self.git_remote}/{self.git_branch}" for _ in range(3): try: await self._run_git_cmd(cmd, retries=1, corrupt_msg="error: ") except self.cmd_helper.scmd_error as err: if err.return_code == 1: return True if self.repo_corrupt: raise else: break await asyncio.sleep(.5) return False def log_repo_info(self) -> None: warnings = "" if self.repo_warnings: warnings = "\nRepo Warnings:\n" warnings += '\n'.join([f" {warn}" for warn in self.repo_warnings]) logging.info( f"Git Repo {self.alias} Detected:\n" f"Owner: {self.git_owner}\n" f"Repository Name: {self.git_repo_name}\n" f"Path: {self.src_path}\n" f"Remote: {self.git_remote}\n" f"Branch: {self.git_branch}\n" f"Remote URL: {self.upstream_url}\n" f"Recovery URL: {self.recovery_url}\n" f"Current Commit SHA: {self.current_commit}\n" f"Upstream Commit SHA: {self.upstream_commit}\n" f"Current Version: {self.current_version}\n" f"Upstream Version: 
{self.upstream_version}\n" f"Rollback Commit: {self.rollback_commit}\n" f"Rollback Branch: {self.rollback_branch}\n" f"Rollback Version: {self.rollback_version}\n" f"Is Dirty: {self.current_version.dirty}\n" f"Is Detached: {self.head_detached}\n" f"Is Shallow: {self.is_shallow}\n" f"Commits Behind Count: {self.commits_behind_count}\n" f"Diverged: {self.diverged}" f"{warnings}" ) def _check_warnings(self) -> None: if self.upstream_url == "?": self.repo_warnings.append("Failed to detect repo url") return self.repo_warnings.clear() upstream_url = self.upstream_url.lower() if upstream_url[-4:] != ".git": upstream_url += ".git" if upstream_url != self.origin_url.lower(): self.repo_warnings.append(f"Unofficial remote url: {self.upstream_url}") if self.git_branch != self.primary_branch or self.git_remote != "origin": self.repo_warnings.append( "Repo not on offical remote/branch, expected: " f"origin/{self.primary_branch}, detected: " f"{self.git_remote}/{self.git_branch}") if self.head_detached: self.repo_warnings.append("Detached HEAD detected") if self.diverged: self.repo_warnings.append("Repo has diverged from remote") if self.is_dirty(): self.repo_warnings.append("Repo has modified files (dirty)") if len(self.managing_instances) > 1: instances = "\n".join([f" {ins}" for ins in self.managing_instances]) self.repo_warnings.append( f"Multiple instances of Moonraker managing this repo:\n" f"{instances}" ) ro_msg = f"Git Repo {self.alias}: No warnings detected" if self.repo_warnings: ro_msg = f"Git Repo {self.alias} Warnings Detected:\n" ro_msg += "\n".join(self.repo_warnings) self.server.add_log_rollover_item(f"umgr_{self.alias}_warn", ro_msg, log=False) def _verify_repo(self, check_remote: bool = False) -> None: if not self.valid_git_repo: raise self.server.error( f"Git Repo {self.alias}: repo not initialized") if check_remote: if self.git_remote == "?": raise self.server.error( f"Git Repo {self.alias}: No valid git remote detected") async def reset(self, ref: 
Optional[str] = None) -> None: async with self.git_operation_lock: if ref is None: if self.channel != Channel.DEV: ref = self.upstream_commit else: if self.git_remote == "?" or self.git_branch == "?": raise self.server.error("Cannot reset, unknown remote/branch") ref = f"{self.git_remote}/{self.git_branch}" await self._run_git_cmd(f"reset --hard {ref}", retries=2) self.repo_corrupt = False async def fetch(self) -> None: self._verify_repo(check_remote=True) async with self.git_operation_lock: await self._run_git_cmd_async( f"fetch {self.git_remote} --prune --progress") async def clean(self) -> None: self._verify_repo() async with self.git_operation_lock: await self._run_git_cmd("clean -d -f", retries=2) async def pull(self) -> None: self._verify_repo() if self.head_detached: raise self.server.error( f"Git Repo {self.alias}: Cannot perform pull on a " "detached HEAD") cmd = "pull --progress" if self.server.is_debug_enabled(): cmd = f"{cmd} --rebase" if self.channel != Channel.DEV: cmd = f"{cmd} {self.git_remote} {self.upstream_commit}" async with self.git_operation_lock: await self._run_git_cmd_async(cmd) async def list_branches(self) -> List[str]: self._verify_repo() async with self.git_operation_lock: resp = await self._run_git_cmd("branch --list --no-color") return resp.strip().split("\n") async def remote(self, command: str) -> str: self._verify_repo(check_remote=True) async with self.git_operation_lock: resp = await self._run_git_cmd( f"remote {command}") return resp.strip() async def describe(self, args: str = "") -> str: self._verify_repo() async with self.git_operation_lock: resp = await self._run_git_cmd(f"describe {args}".strip()) return resp.strip() async def rev_parse(self, args: str = "") -> str: self._verify_repo() async with self.git_operation_lock: resp = await self._run_git_cmd(f"rev-parse {args}".strip()) return resp.strip() async def rev_list(self, args: str = "") -> str: self._verify_repo() async with self.git_operation_lock: resp = await 
self._run_git_cmd(f"rev-list {args}".strip()) return resp.strip() async def config_get( self, key: str, pattern: str = "", get_all: bool = False, local_only: bool = False ) -> Optional[str]: local = "--local " if local_only else "" cmd = f"{local}--get-all" if get_all else f"{local}--get" args = f"{cmd} {key} '{pattern}'" if pattern else f"{cmd} {key}" try: return await self.config_cmd(args) except self.cmd_helper.scmd_error as e: if e.return_code == 1: return None raise async def config_set(self, key: str, value: str) -> None: await self.config_cmd(f"{key} '{value}'") async def config_add(self, key: str, value: str) -> None: await self.config_cmd(f"--add {key} '{value}'") async def config_unset( self, key: str, pattern: str = "", unset_all: bool = False ) -> None: cmd = "--unset-all" if unset_all else "--unset" args = f"{cmd} {key} '{pattern}'" if pattern else f"{cmd} {key}" await self.config_cmd(args) async def config_cmd(self, args: str) -> str: self._verify_repo() verbose = self.server.is_verbose_enabled() async with self.git_operation_lock: for attempt in range(3): try: return await self._run_git_cmd( f"config {args}", retries=1, log_complete=verbose ) except self.cmd_helper.scmd_error as e: if 1 <= (e.return_code or 10) <= 6 or attempt == 2: raise raise self.server.error("Failed to run git-config") async def checkout(self, branch: Optional[str] = None) -> None: self._verify_repo() reset_commit: Optional[str] = None async with self.git_operation_lock: if branch is None: # No branch is specifed so we are checking out detached if self.channel != Channel.DEV: reset_commit = self.upstream_commit branch = f"{self.git_remote}/{self.git_branch}" await self._run_git_cmd(f"checkout -q {branch}") if reset_commit is not None: await self.reset(reset_commit) async def run_fsck(self) -> None: async with self.git_operation_lock: await self._run_git_cmd("fsck --full", timeout=300., retries=1) async def clone(self) -> None: async with self.git_operation_lock: if 
self.recovery_url == "?": raise self.server.error( "Recovery url has not been detected, clone aborted" ) self.cmd_helper.notify_update_response( f"Git Repo {self.alias}: Starting Clone Recovery...") event_loop = self.server.get_event_loop() if self.backup_path.exists(): await event_loop.run_in_thread(shutil.rmtree, self.backup_path) await self._check_lock_file_exists(remove=True) cmd = f"clone --filter=blob:none {self.recovery_url} {self.backup_path}" try: await self._run_git_cmd_async(cmd, 1, False, False) except Exception as e: self.cmd_helper.notify_update_response( f"Git Repo {self.alias}: Git Clone Failed") raise self.server.error("Git Clone Error") from e if self.src_path.exists(): await event_loop.run_in_thread(shutil.rmtree, self.src_path) await event_loop.run_in_thread( shutil.move, str(self.backup_path), str(self.src_path)) self.repo_corrupt = False self.cmd_helper.notify_update_response( f"Git Repo {self.alias}: Git Clone Complete") async def rollback(self) -> bool: if self.rollback_commit == "?" or self.rollback_branch == "?": raise self.server.error("Incomplete rollback data stored, cannot rollback") if self.rollback_branch != self.git_branch: await self.checkout(self.rollback_branch) elif self.rollback_commit == self.current_commit: return False await self.reset(self.rollback_commit) return True def capture_state_for_rollback(self) -> Dict[str, Any]: branch = self.git_branch if self.head_detached: valid = "?" not in (self.git_remote, self.git_branch) branch = f"{self.git_remote}/{self.git_branch}" if valid else "?" 
return { "commit": self.current_commit, "branch": branch, "version": self.current_version } def set_rollback_state(self, rb_state: Optional[Dict[str, Any]]) -> None: if rb_state is None: rb_state = self.capture_state_for_rollback() self.rollback_commit = rb_state["commit"] self.rollback_branch = rb_state["branch"] self.rollback_version = rb_state["version"] async def get_commits_behind(self) -> List[Dict[str, Any]]: self._verify_repo() if self.is_current(): return [] async with self.git_operation_lock: if self.channel != Channel.DEV: ref = self.upstream_commit else: ref = f"{self.git_remote}/{self.git_branch}" resp = await self._run_git_cmd( f"log {self.current_commit}..{ref} " f"--format={GIT_LOG_FMT} --max-count={GIT_MAX_LOG_CNT}") commits_behind: List[Dict[str, Any]] = [] for log_entry in resp.split('\x1E'): log_entry = log_entry.strip() if not log_entry: continue log_items = [li.strip() for li in log_entry.split('\x1D') if li.strip()] cbh = [li.split(':', 1) for li in log_items] commits_behind.append(dict(cbh)) # type: ignore return commits_behind async def get_tagged_commits(self, count: int = 100) -> Dict[str, str]: self._verify_repo() async with self.git_operation_lock: resp = await self._run_git_cmd( f"for-each-ref --count={count} --sort='-creatordate' " f"--contains=HEAD --format={GIT_REF_FMT} 'refs/tags'" ) tagged_commits: Dict[str, str] = {} for line in resp.split('\n'): parts = line.strip().split() if len(parts) != 3 or parts[0] != "commit": continue sha, ref = parts[1:] tag = ref.split('/')[-1] tagged_commits[sha] = tag # Return tagged commits as SHA keys mapped to tag values return tagged_commits async def set_current_instance(self) -> None: # Check to see if multiple instances of Moonraker are configured # to manage this repo full_id = self._get_instance_id() self.managing_instances.clear() try: instances = await self.config_get( "moonraker.instance", get_all=True, local_only=True ) if instances is None: await self.config_set("moonraker.instance", 
full_id) self.managing_instances = [full_id] else: det_instances = [ ins.strip() for ins in instances.split("\n") if ins.strip() ] if full_id not in det_instances: await self.config_add("moonraker.instance", full_id) det_instances.append(full_id) self.managing_instances = det_instances except asyncio.CancelledError: raise except Exception as e: logging.info( f"Git Repo {self.alias}: Moonraker Instance Validation Error, {e}" ) async def unset_current_instance(self) -> None: full_id = self._get_instance_id() if full_id not in self.managing_instances: return try: await self.config_unset("moonraker.instance", pattern=full_id) except asyncio.CancelledError: raise except Exception as e: logging.info(f"Git repo {self.alias}: Error removing instance, {e}") def _get_instance_id(self) -> str: machine: Machine = self.server.lookup_component("machine") cur_name = machine.unit_name cur_uuid: str = self.server.get_app_args()["instance_uuid"] return f"{cur_name}@{cur_uuid}" def get_repo_status(self) -> Dict[str, Any]: return { 'detected_type': "git_repo", 'remote_alias': self.git_remote, 'branch': self.git_branch, 'owner': self.git_owner, 'repo_name': self.git_repo_name, 'remote_url': self.upstream_url, 'recovery_url': self.recovery_url, 'version': self.current_version.short_version, 'remote_version': self.upstream_version.short_version, 'rollback_version': self.rollback_version.short_version, 'current_hash': self.current_commit, 'remote_hash': self.upstream_commit, 'is_dirty': self.current_version.dirty, 'detached': self.head_detached, 'commits_behind': self.commits_behind, 'commits_behind_count': self.commits_behind_count, 'git_messages': self.git_messages, 'full_version_string': self.current_version.full_version, 'pristine': not self.current_version.dirty, 'corrupt': self.repo_corrupt, 'warnings': self.repo_warnings } def get_version(self, upstream: bool = False) -> GitVersion: return self.upstream_version if upstream else self.current_version def is_detached(self) -> bool: 
return self.head_detached def is_dirty(self) -> bool: return self.current_version.dirty def is_current(self) -> bool: return self.current_commit == self.upstream_commit def is_submodule_or_worktree(self): return ( self.src_path.joinpath(".git").is_file() and self.git_folder_path.parent.name in ("modules", "worktrees") ) def is_valid(self) -> bool: return ( not self.is_damaged() and not self.has_recoverable_errors() ) def is_damaged(self) -> bool: # A damaged repo requires a clone to recover return not self.valid_git_repo or self.repo_corrupt def has_recoverable_errors(self) -> bool: # These errors should be recoverable using a git reset detached_err = False if self.server.is_debug_enabled() else self.head_detached return ( self.diverged or self.is_dirty() or detached_err ) async def _check_lock_file_exists(self, remove: bool = False) -> bool: lock_path = self.git_folder_path.joinpath("index.lock") if lock_path.is_file(): if remove: logging.info(f"Git Repo {self.alias}: Git lock file found " "after git process exited, removing") try: event_loop = self.server.get_event_loop() await event_loop.run_in_thread(os.remove, lock_path) except Exception: pass return True return False async def _wait_for_lock_release(self, timeout: int = 60) -> None: while timeout: if await self._check_lock_file_exists(): if not timeout % 10: logging.info(f"Git Repo {self.alias}: Git lock file " f"exists, {timeout} seconds remaining " "before removal.") await asyncio.sleep(1.) timeout -= 1 else: return await self._check_lock_file_exists(remove=True) async def _repair_loose_objects(self, notify: bool = False) -> bool: if notify: self.cmd_helper.notify_update_response( "Attempting to repair loose objects..." 
) try: await self.cmd_helper.run_cmd_with_response( "find .git/objects/ -type f -empty | xargs rm", timeout=10., retries=1, cwd=str(self.src_path)) await self._run_git_cmd_async( "fetch --all -p", retries=1, fix_loose=False) await self._run_git_cmd("fsck --full", timeout=300., retries=1) except Exception: msg = ( "Attempt to repair loose objects failed, " "hard recovery is required" ) logging.exception(msg) if notify: self.cmd_helper.notify_update_response(msg) return False if notify: self.cmd_helper.notify_update_response("Loose objects repaired") self.repo_corrupt = False return True async def _run_git_cmd_async(self, cmd: str, retries: int = 5, need_git_path: bool = True, fix_loose: bool = True ) -> None: # Fetch and pull require special handling. If the request # gets delayed we do not want to terminate it while the command # is processing. await self._wait_for_lock_release() event_loop = self.server.get_event_loop() env = os.environ.copy() env.update(GIT_ENV_VARS) if need_git_path: git_cmd = f"git -C {self.src_path} {cmd}" else: git_cmd = f"git {cmd}" scmd = self.cmd_helper.build_shell_command( git_cmd, callback=self._handle_process_output, env=env) while retries: self.git_messages.clear() self.fetch_input_recd = False self.fetch_timeout_handle = event_loop.delay_callback( GIT_ASYNC_TIMEOUT, self._check_process_active, scmd, cmd) try: await scmd.run(timeout=0) except Exception: pass self.fetch_timeout_handle.cancel() ret = scmd.get_return_code() if ret == 0: self.git_messages.clear() return elif self.repo_corrupt and fix_loose: if await self._repair_loose_objects(notify=True): # Only attempt to repair loose objects once. Re-run # the command once. 
fix_loose = False retries = 2 else: # since the attept to repair failed, bypass retries # and immediately raise an exception raise self.server.error( f"Unable to repair loose objects, use hard recovery") retries -= 1 await asyncio.sleep(.5) await self._check_lock_file_exists(remove=True) raise self.server.error(f"Git Command '{cmd}' failed") def _handle_process_output(self, output: bytes) -> None: self.fetch_input_recd = True out = output.decode().strip() if out: if out.startswith("fatal: "): self.repo_corrupt = True self.git_messages.append(out) self.cmd_helper.notify_update_response(out) logging.debug( f"Git Repo {self.alias}: {out}") async def _check_process_active( self, scmd: ShellCommand, cmd_name: str ) -> None: ret = scmd.get_return_code() if ret is not None: logging.debug(f"Git Repo {self.alias}: {cmd_name} returned") return if self.fetch_input_recd: # Received some input, reschedule timeout logging.debug( f"Git Repo {self.alias}: {cmd_name} active, rescheduling") event_loop = self.server.get_event_loop() self.fetch_input_recd = False self.fetch_timeout_handle = event_loop.delay_callback( GIT_ASYNC_TIMEOUT, self._check_process_active, scmd, cmd_name) else: # Request has timed out with no input, terminate it logging.debug(f"Git Repo {self.alias}: {cmd_name} timed out") # Cancel with SIGKILL await scmd.cancel(2) async def _run_git_cmd( self, git_args: str, timeout: float = 20., retries: int = 5, env: Optional[Dict[str, str]] = None, corrupt_msg: str = "fatal: ", log_complete: bool = True ) -> str: try: return await self.cmd_helper.run_cmd_with_response( f"git -C {self.src_path} {git_args}", timeout=timeout, retries=retries, env=env, sig_idx=2, log_complete=log_complete ) except self.cmd_helper.scmd_error as e: stdout = e.stdout.decode().strip() stderr = e.stderr.decode().strip() msg_lines: List[str] = [] if stdout: msg_lines.extend(stdout.split("\n")) self.git_messages.append(stdout) if stderr: msg_lines.extend(stdout.split("\n")) 
self.git_messages.append(stderr) for line in msg_lines: line = line.strip().lower() if line.startswith(corrupt_msg): self.repo_corrupt = True break raise